Executors

Executors control how experiment runs are dispatched. Choose from thread-based execution for debugging, process-based for local parallelism, or Slurm for HPC clusters.

Local Executors

metalab.ThreadExecutor

ThreadExecutor(max_workers: int = 4)

Executor using ThreadPoolExecutor.

Since threads share memory, we can:

- Pass operation directly (no need for string reference)
- Share store instance
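The sketch below illustrates the shared-memory point using only the standard library's `concurrent.futures.ThreadPoolExecutor`, not metalab itself. The `run_all` helper, the dict-based `store`, and the `square` operation are hypothetical stand-ins for illustration, not part of the metalab API.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


def square(params: dict) -> int:
    """Hypothetical operation: any callable works, no import string needed."""
    return params["x"] ** 2


def run_all(payloads: list, operation, max_workers: int = 4) -> dict:
    """Run `operation` on each payload in a thread pool, writing to one shared store."""
    store = {}      # stand-in for a shared Store instance
    lock = Lock()   # guards concurrent writes to the shared dict

    def worker(index: int, payload: dict) -> None:
        result = operation(payload)   # the callable itself is passed to the thread
        with lock:
            store[index] = result     # every thread writes to the same object

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker, i, p) for i, p in enumerate(payloads)]
        for future in futures:
            future.result()           # re-raise any exception from a worker
    return store


results = run_all([{"x": 1}, {"x": 2}, {"x": 3}], square)
```

Because nothing crosses a process boundary here, neither the operation nor the store has to be picklable or importable by name.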

Initialize the thread executor.

Parameters:

max_workers (int, default: 4): Maximum number of worker threads.

__enter__

__enter__() -> 'ThreadExecutor'

Enter context manager.

__exit__

__exit__(exc_type: object, exc_val: object, exc_tb: object) -> None

Exit context manager, ensuring shutdown is called.

shutdown

shutdown(wait: bool = True) -> None

Shutdown the thread pool.
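As a rough illustration of the context-manager contract described above, the following sketch wraps a standard-library thread pool so that leaving the `with` block always calls `shutdown`. `PoolOwner` and its `run` method are hypothetical names invented for this example. They mirror the documented method signatures but are not metalab code.

```python
from concurrent.futures import ThreadPoolExecutor


class PoolOwner:
    """Hypothetical stand-in that owns a thread pool and shuts it down on exit."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self.closed = False

    def __enter__(self) -> "PoolOwner":
        return self

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        # Called on normal exit and when the body raises, so the pool never leaks.
        self.shutdown(wait=True)

    def shutdown(self, wait: bool = True) -> None:
        # wait=True blocks until all submitted work has finished.
        self._pool.shutdown(wait=wait)
        self.closed = True

    def run(self, fn, *args):
        return self._pool.submit(fn, *args)


with PoolOwner(max_workers=2) as owner:
    future = owner.run(pow, 2, 10)

value = future.result()
```

Using the executor in a `with` block is usually the simplest way to guarantee that worker threads are released, even when a run fails.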

submit

submit(payloads: list[RunPayload], store: Store, operation: OperationWrapper, run_ids: list[str] | None = None) -> RunHandle

Submit payloads for execution and return a handle.

Parameters:

payloads (list[RunPayload], required): List of run payloads to execute.

store (Store, required): Store for persisting results.

operation (OperationWrapper, required): The operation to run.

run_ids (list[str] | None, default: None): All run IDs including skipped (for status tracking).

Returns:

RunHandle: A LocalRunHandle for tracking and awaiting results.

metalab.ProcessExecutor

ProcessExecutor(max_workers: int = 4)

Executor using ProcessPoolExecutor.

Each worker process imports the operation from its operation_ref string.
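To make the idea of importing an operation from a string concrete, here is a minimal sketch built on `importlib`. The `module:attribute` reference format and the `resolve_ref` helper are assumptions made for this example. The actual format of `operation_ref` is not specified here.

```python
import importlib
from typing import Any, Callable


def resolve_ref(ref: str) -> Callable[..., Any]:
    """Import and return the object named by a 'module:attribute' string.

    The 'module:attribute' format is an assumption for this sketch.
    """
    module_name, sep, attr_path = ref.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module:attribute', got {ref!r}")
    obj: Any = importlib.import_module(module_name)
    for part in attr_path.split("."):   # walk nested attributes, e.g. 'path.basename'
        obj = getattr(obj, part)
    return obj


sqrt = resolve_ref("math:sqrt")
basename = resolve_ref("os:path.basename")
```

A string reference like this can be sent to another process cheaply, which is why process-based execution typically needs the operation to live at an importable module path rather than in a notebook cell or a closure.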

Initialize the process executor.

Parameters:

max_workers (int, default: 4): Maximum number of worker processes.

__enter__

__enter__() -> 'ProcessExecutor'

Enter context manager.

__exit__

__exit__(exc_type: object, exc_val: object, exc_tb: object) -> None

Exit context manager, ensuring shutdown is called.

shutdown

shutdown(wait: bool = True) -> None

Shutdown the process pool.

submit

submit(payloads: list[RunPayload], store: Store, operation: OperationWrapper, run_ids: list[str] | None = None) -> RunHandle

Submit payloads for execution and return a handle.

Parameters:

payloads (list[RunPayload], required): List of run payloads to execute.

store (Store, required): Store for persisting results.

operation (OperationWrapper, required): The operation to run (used to set operation_ref).

run_ids (list[str] | None, default: None): All run IDs including skipped (for status tracking).

Returns:

RunHandle: A LocalRunHandle for tracking and awaiting results.

SLURM Executor

For HPC cluster execution via direct sbatch submission.

metalab.executor.slurm.SlurmExecutor

SlurmExecutor(config: SlurmConfig | None = None)

Executor that submits index-addressed SLURM arrays.

Instead of serializing each task individually (like submitit's map_array), this executor:

1. Writes a single array spec file with experiment configuration
2. Submits one or more SLURM array jobs via sbatch
3. Has each array task reconstruct its parameters from SLURM_ARRAY_TASK_ID

This approach scales to hundreds of thousands of tasks without the filesystem overhead of per-task serialization files.
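The index-addressing idea can be sketched as follows: a flat task index is decoded into one point of a parameter grid, so no per-task file is needed. This assumes the parameters form a Cartesian grid. The `params_for_index` and `current_task_id` helpers and the example `grid` are hypothetical, and metalab's own spec format and decoding may differ. `SLURM_ARRAY_TASK_ID` is the environment variable SLURM sets inside each array task.

```python
import os


def params_for_index(grid: dict, index: int) -> dict:
    """Decode a flat run index into one point of a Cartesian parameter grid.

    The last key varies fastest, matching itertools.product ordering.
    """
    total = 1
    for values in grid.values():
        total *= len(values)
    if not 0 <= index < total:
        raise IndexError(f"index {index} out of range for {total} runs")

    params = {}
    for name in reversed(list(grid)):
        values = grid[name]
        index, offset = divmod(index, len(values))   # mixed-radix digit for this key
        params[name] = values[offset]
    return dict(reversed(list(params.items())))      # restore the grid's key order


def current_task_id(default: int = 0) -> int:
    """Read the array index SLURM assigns to this task (falls back when run locally)."""
    return int(os.environ.get("SLURM_ARRAY_TASK_ID", str(default)))


# Hypothetical grid; in a real job a shared spec file would supply it.
grid = {"lr": [0.1, 0.01], "seed": [0, 1, 2]}
example = params_for_index(grid, 4)
```

Every task reads the same small spec and derives its own parameters arithmetically, so the cost of submission stays roughly constant as the number of runs grows.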

Initialize the SLURM executor.

Parameters:

config (SlurmConfig | None, default: None): SLURM configuration. Uses defaults if not provided.

__enter__

__enter__() -> 'SlurmExecutor'

Enter context manager.

__exit__

__exit__(exc_type: object, exc_val: object, exc_tb: object) -> None

Exit context manager.

shutdown

shutdown(wait: bool = True) -> None

No-op for SLURM executor (jobs run independently).

submit

submit(payloads: list[RunPayload], store: 'Store', operation: 'OperationWrapper', run_ids: list[str] | None = None) -> 'SlurmRunHandle'

Legacy submit method - redirects to submit_indexed when possible.

This method exists for backward compatibility. New code should use submit_indexed() directly via the runner.

For large experiments, this will raise an error directing users to use the indexed submission path.

submit_indexed

submit_indexed(experiment: 'Experiment', store: 'Store', context_fingerprint: str, total_runs: int, skipped_count: int = 0, derived_metric_refs: list[str] | None = None) -> 'SlurmRunHandle'

Submit an experiment as index-addressed SLURM arrays.

Parameters:

experiment ('Experiment', required): The experiment to run.

store ('Store', required): Store for persisting results.

context_fingerprint (str, required): Precomputed context fingerprint.

total_runs (int, required): Total number of runs (P * R).

skipped_count (int, default: 0): Number of runs already completed (for resume).

derived_metric_refs (list[str] | None, default: None): Optional derived metric function references.

Returns:

'SlurmRunHandle': A SlurmRunHandle for tracking and awaiting results.

Raises:

ValueError: If param source doesn't support indexing.

RuntimeError: If sbatch submission fails.

metalab.executor.slurm.SlurmConfig dataclass

SlurmConfig(partition: str = 'default', time: str = '1:00:00', cpus: int = 1, memory: str = '4G', gpus: int = 0, max_concurrent: int | None = None, max_array_size: int = DEFAULT_MAX_ARRAY_SIZE, chunk_size: int = 1, modules: list[str] = list(), conda_env: str | None = None, setup: list[str] = list(), extra_sbatch: dict[str, str] = dict())

Configuration for SLURM job submission.

Attributes:

partition (str): SLURM partition/queue name.

time (str): Maximum walltime (e.g., "1:00:00" for 1 hour).

cpus (int): Number of CPUs per task.

memory (str): Memory per task (e.g., "4G", "16GB").

gpus (int): Number of GPUs per task (0 for CPU-only).

max_concurrent (int | None): Maximum number of array tasks running at the same time (maps to the %N suffix of sbatch --array, e.g., --array=0-99%10).

max_array_size (int): Maximum tasks per array job (for sharding).

chunk_size (int): Number of runs per array task. Higher values reduce scheduler load for large experiments (e.g., 100k runs with chunk_size=100 submits 1k array tasks instead of 100k).

modules (list[str]): Shell modules to load before execution.

conda_env (str | None): Conda environment to activate.

setup (list[str]): List of bash commands to run before each task.

extra_sbatch (dict[str, str]): Additional sbatch directives as key-value pairs.
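The interaction between chunk_size, max_array_size, and max_concurrent can be sketched with a little arithmetic. The helpers below (`plan_arrays`, `runs_for_task`) are hypothetical and only illustrate one plausible scheme, in which each shard restarts its array indices at 0. How metalab actually numbers shards is not documented here.

```python
from __future__ import annotations

import math


def plan_arrays(total_runs: int, chunk_size: int, max_array_size: int,
                max_concurrent: int | None = None) -> list[str]:
    """Return one `--array=` spec per shard for the given run count."""
    if total_runs <= 0:
        return []
    n_tasks = math.ceil(total_runs / chunk_size)     # runs are grouped into chunks
    n_shards = math.ceil(n_tasks / max_array_size)   # tasks are split across array jobs
    throttle = f"%{max_concurrent}" if max_concurrent else ""
    specs = []
    for shard in range(n_shards):
        size = min(max_array_size, n_tasks - shard * max_array_size)
        specs.append(f"--array=0-{size - 1}{throttle}")   # assumed: indices restart per shard
    return specs


def runs_for_task(task_index: int, chunk_size: int, total_runs: int) -> range:
    """Run indices handled by one array task, given its global task index."""
    start = task_index * chunk_size
    return range(start, min(start + chunk_size, total_runs))


# 100k runs in chunks of 100 give 1,000 array tasks, sharded into jobs of at most 400.
specs = plan_arrays(total_runs=100_000, chunk_size=100, max_array_size=400, max_concurrent=50)
```

With these numbers the plan is three array jobs of 400, 400, and 200 tasks, each throttled to 50 concurrent tasks. Raising chunk_size trades scheduler load for coarser-grained retries, since a failed task then covers more runs.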