API Reference

This section provides the API documentation for the experimaestro module.

Core Classes

The core classes form the foundation of experimaestro’s configuration and task system.

Config

XPM Config experimaestro.Config

Bases: object

Base type for all objects in the Python interface

XPMConfig

alias of XPMConfig

XPMValue

alias of Config

__post_init__()

Called after the object's __init__(), once its properties have been set

register_task_output(method, *args, **kwargs)

Register a task output for dynamic callbacks.

This method is used to register outputs that can trigger callbacks when produced during task execution.

classmethod value_class()

Decorator to register an external value class for this configuration.

This allows declaring a separate class that will be used when creating instances, which is useful to avoid initializing resources (e.g., PyTorch) when only configuring.

import torch.nn as nn

from experimaestro import Config, Param

class Model(Config):
    hidden_size: Param[int]

@Model.value_class()
class TorchModel(Model, nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(self.hidden_size, self.hidden_size)

The value class must be a subclass of the configuration class and a subclass of parent configuration value classes (if any).

ConfigMixin

class experimaestro.core.objects.ConfigMixin(**kwargs)

Class for configuration objects

__validate__()

Validate the values

add_dependencies(*dependencies)

Adds tokens to the task

copy()

Returns a copy of this configuration (ignores non-parameter attributes)

copy_dependencies(other: ConfigMixin)

Add all the dependencies from other configuration

instance(context: ConfigWalkContext | None = None, *, objects: ObjectStore | None = None, keep: bool = True) → T

Return an instance with the current values

Parameters:
  • context – The context when computing the instance

  • objects – The previously built objects (so that we avoid re-creating instances of past configurations)

  • keep – register a configuration in the __config__ field of the instance

submit(*, workspace=None, launcher=None, run_mode: experimaestro.scheduler.workspace.RunMode = None, init_tasks: List[LightweightTask] = [], max_retries: int | None = None, transient: TransientMode = None)

Submit this task

Parameters:
  • workspace – the workspace, defaults to None

  • launcher – The launcher, defaults to None

  • run_mode – Run mode (if None, uses the workspace default)

  • max_retries – Maximum number of retries for resumable tasks that timeout (default: from workspace settings or 3)

  • transient – Transient mode for intermediary tasks (see TransientMode)

Returns:

an object object

tags()

Returns the tags associated with this object and the configurations below it

Task

XPM Task experimaestro.Task

Bases: LightweightTask

Base class for tasks

watch_output(method, callback)

Sets up a callback

Parameters:
  • method – a method within a configuration

  • callback – the callback

ResumableTask

XPM Task experimaestro.ResumableTask

Bases: Task

Base class for resumable/checkpointable tasks

Resumable tasks can be restarted if they are stopped by a time limit (e.g., SLURM job timeout). The task directory and dynamic outputs are preserved across restarts to allow checkpoint recovery.

remaining_time() → float | None

Returns the remaining time in seconds before the job times out.

This is useful for checkpointing before hitting a time limit (e.g., SLURM walltime).

Returns:

The remaining time in seconds, or None if:

  • There is no time limit

  • The launcher doesn’t support querying remaining time

  • The task is not running

Return type:

float | None

LightweightTask

XPM Config experimaestro.LightweightTask

Bases: Config

A task that can be run before or after a real task to modify its behaviour

InstanceConfig

XPM Config experimaestro.InstanceConfig

Bases: Config

Base class for configurations where instance identity matters.

When a Config class derives from InstanceConfig instead of Config, instances are distinguished based on their object identity when used in containers. This enables distinguishing between shared and separate instances even when all parameters are identical.

Example

>>> class SubModel(InstanceConfig):
...     value: Param[int] = 100
>>> class MainModel(Config):
...     m1: Param[SubModel]
...     m2: Param[SubModel]
>>>
>>> sm1 = SubModel.C()
>>> sm2 = SubModel.C()  # Same params, different instance
>>>
>>> # Shared instance (same object used twice)
>>> shared = MainModel.C(m1=sm1, m2=sm1)
>>>
>>> # Separate instances (different objects)
>>> separate = MainModel.C(m1=sm1, m2=sm2)
>>>
>>> # Different identifiers: shared vs separate
>>> shared.__identifier__() != separate.__identifier__()

The instance order is determined by the traversal order during identifier computation, ensuring reproducibility.
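To illustrate why shared and separate instances can end up with different identifiers, here is a minimal sketch in plain Python. It does not use experimaestro and is not the library's algorithm: the function `sketch_identifier`, the `instance` field, and the choice of SHA-256 over JSON are assumptions made for illustration. Each distinct object is numbered in traversal order, and that number is mixed into the hash.

```python
import hashlib
import json


def sketch_identifier(params, identity_sensitive):
    """Hash a mapping of field name -> sub-configuration (a plain dict here).

    When identity_sensitive is True, every distinct object receives an index
    in order of first visit, and the index becomes part of the hashed data.
    """
    seen = {}  # id(object) -> index of first visit
    payload = []
    for name in sorted(params):  # fixed traversal order for reproducibility
        sub = params[name]
        entry = {"name": name, "values": sub}
        if identity_sensitive:
            entry["instance"] = seen.setdefault(id(sub), len(seen))
        payload.append(entry)
    data = json.dumps(payload, sort_keys=True).encode()
    return hashlib.sha256(data).hexdigest()


sm1 = {"value": 100}
sm2 = {"value": 100}  # same parameters, different object

shared = sketch_identifier({"m1": sm1, "m2": sm1}, identity_sensitive=True)
separate = sketch_identifier({"m1": sm1, "m2": sm2}, identity_sensitive=True)
print(shared != separate)  # prints: True
```

With `identity_sensitive=False` the two calls hash the same data, which mirrors the behavior of ordinary Config parameters whose values are identical.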

Type Annotations

Type annotations are used to declare parameters in configurations and tasks.

Param

experimaestro.Param

Type annotation for configuration parameters.

Parameters annotated with Param[T] are included in the configuration identifier computation and must be set before the configuration is sealed.

Meta

experimaestro.Meta

Type annotation for meta-parameters (ignored in identifier computation).

Use Meta[T] for parameters that should not affect the task identity, such as output paths or runtime configuration.

Constant

experimaestro.Constant

Type annotation for constant (read-only) parameters.

Constants must have a default value and cannot be modified after creation.

DataPath

experimaestro.DataPath

Type annotation for data paths that should be serialized.

Use DataPath for paths that point to data files that should be preserved when serializing/deserializing a configuration.

DependentMarker

experimaestro.DependentMarker

Type alias for dependency marker functions used in task_outputs() and dynamic output methods.

Experiment Management

experiment

class experimaestro.experiment(env: Path | str | WorkspaceSettings, name: str, *, host: str | None = None, port: int | None = None, token: str | None = None, run_mode: RunMode | None = None, launcher=None, register_signals: bool = True, project_paths: list[Path] | None = None, wait_for_quit: bool = False, dirty_git: DirtyGitAction = DirtyGitAction.WARN, no_db: bool = False, no_environmental_impact: bool = False)

Bases: BaseExperiment

Context manager for running experiments.

Creates a workspace, manages task submission, and optionally starts a web server for monitoring.

Implements BaseExperiment interface for use with StateProvider and TUI.

Example:

from experimaestro import experiment

with experiment("./workdir", "my-experiment", port=12345) as xp:
    task = MyTask.C(param=42).submit()
    result = task.wait()

add_service(service: ServiceClass) → ServiceClass

Adds a service (e.g. tensorboard viewer) to the experiment

Parameters:

service – A service instance

Returns:

The same service instance (or existing service if already added)

static current() → experiment

Returns the current experiment, first checking that one is set.

If there is no current experiment, raises an AssertionError.

wait()

Wait until the running processes have finished

Workspace

class experimaestro.Workspace(settings: Settings, workspace_settings: WorkspaceSettings, launcher=None, run_mode: RunMode | None = None)

Bases: object

Workspace environment for experiments

This is a simple container for workspace settings, environment, and configuration. Multiple Workspace instances can exist for the same path - the singleton pattern is handled by WorkspaceStateProvider which manages the database per workspace path.

property alt_jobspaths

Yield <folder>/jobs for every attached folder.

Used for read-through job lookups across all modes.

cleanup_old_scheduler_runs(retention_days: int = 15, force: bool = False) → tuple[int, int]

Clean up old scheduler run directories

Parameters:
  • retention_days – Delete run directories older than this many days

  • force – Force cleanup even if recently run

Returns:

Tuple of (directories_deleted, errors_count)

property configcachepath

Folder for the configuration cache

property connector

Returns the default connector

property experimentspath

Folder for experiments

property folders: List[FolderSettings]

Auxiliary folders attached to this workspace.

Includes both the new folders setting and the deprecated alt_workspaces field (treated as mode=use). Beta.

property jobspath

Folder for jobs

property partialspath

Folder for partial job directories (shared checkpoints, etc.)

property scheduler_lock_path: Path

Lock file for scheduler directory operations

property scheduler_run_path: Path

Folder for this run’s scheduler data

property scheduler_services_path: Path

Folder for service logs

property schedulerpath: Path

Folder for scheduler metadata (base .scheduler directory)

classmethod set_launcher(launcher) → None

Set the launcher for the current workspace

Parameters:

launcher – The launcher to use for task execution

RunMode

class experimaestro.RunMode(value)

An enumeration.

DRY_RUN = 'dry-run'

Do not run

GENERATE_ONLY = 'generate'

Do not run, but generate the params.json file

NORMAL = 'normal'

Normal run

PREPARE = 'prepare'

Only run discovered Prepare configs; skip Task execution

Tagging

tag

experimaestro.tag(value)

Tag a parameter value for tracking in experiments.

Tagged values appear in experiment logs and can be used for filtering and organizing results. Tags are included in the task’s __tags__ dictionary.

Example:

task = MyTask.C(
    learning_rate=tag(0.001),  # Will appear in task tags
    batch_size=32,
).submit()

Parameters:

value – The value to tag (str, int, float, or bool)

Returns:

A tagged value wrapper that preserves the original value

tags

experimaestro.tags(value) → TagDict

Return the tags associated with a configuration.

Returns a dictionary of all tagged parameter values from this configuration and its nested configurations.

Example:

config = MyTask.C(learning_rate=tag(0.001), epochs=tag(100))
task_tags = tags(config)  # {"learning_rate": 0.001, "epochs": 100}

Parameters:

value – A configuration object

Returns:

A TagDict with tag names as keys and tagged values as values

tagspath

experimaestro.tagspath(value: Config) → str

Generate a unique path string from a configuration’s tags.

Useful for creating tag-based directory structures. Tags are sorted alphabetically and joined with underscores.

Example:

config = MyTask.C(learning_rate=tag(0.001), epochs=tag(100))
path = tagspath(config)  # "epochs=100_learning_rate=0.001"

Parameters:

value – A configuration object

Returns:

A string with sorted tags in key=value format, joined by _
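The formatting rule described above (sort by tag name, render each tag as key=value, join with underscores) can be sketched in plain Python. The helper `tags_to_path` is hypothetical and works on an ordinary dictionary rather than a configuration; it only mirrors the documented output format and is not the library's implementation.

```python
def tags_to_path(tags):
    """Render tags as key=value pairs, sorted by key and joined with '_'."""
    return "_".join(f"{key}={value}" for key, value in sorted(tags.items()))


path = tags_to_path({"learning_rate": 0.001, "epochs": 100})
print(path)  # prints: epochs=100_learning_rate=0.001
```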

Utilities

setmeta

experimaestro.setmeta(config: Config, flag: bool)

Force a configuration to be treated as a meta-parameter.

When a configuration is marked as meta, it is excluded from the identifier computation of its parent configuration.

Example:

class Ensemble(Config):
    model1: Param[Model]
    model2: Param[Model]

# Mark model2 as meta - it won't affect the ensemble's identifier
model2 = setmeta(Model.C(...), True)
ensemble = Ensemble.C(model1=model1, model2=model2)

Parameters:
  • config – The configuration to mark

  • flag – True to mark as meta, False to include in identifier

Returns:

The same configuration (for chaining)

sealed_set

experimaestro.sealed_set(*elements: Config) → set[Config]

Create a set of sealed Config objects.

Each element is sealed (its identifier computed and cached) before being added to the set. This makes Config objects hashable by their identifier, enabling use in Python sets.

Non-Config elements (primitives, enums, etc.) are passed through as-is.

Example:

model1 = Model.C(lr=0.01)
model2 = Model.C(lr=0.02)
ensemble = Ensemble.C(models=sealed_set(model1, model2))

Parameters:

elements – Config objects to seal and add to the set

Returns:

A set containing the sealed elements

Raises:

TypeError – If a Config element cannot be sealed
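The idea of making objects hashable through a cached identifier can be sketched without experimaestro. Everything below is an assumption made for illustration: the class `SealedSketch`, its `seal()` method, and the helper `sealed_set_sketch` are hypothetical stand-ins, and the real library may compute and compare identifiers differently.

```python
import hashlib
import json


class SealedSketch:
    """Toy configuration whose hash is only available once it is sealed."""

    def __init__(self, **params):
        self.params = dict(params)
        self._identifier = None

    def seal(self):
        # Compute the identifier once and cache it
        if self._identifier is None:
            data = json.dumps(self.params, sort_keys=True).encode()
            self._identifier = hashlib.sha256(data).hexdigest()
        return self

    def __hash__(self):
        if self._identifier is None:
            raise TypeError("unsealed configuration is not hashable")
        return hash(self._identifier)

    def __eq__(self, other):
        return (
            isinstance(other, SealedSketch)
            and self._identifier is not None
            and self._identifier == other._identifier
        )


def sealed_set_sketch(*elements):
    """Seal toy configurations; pass any other element through unchanged."""
    return {e.seal() if isinstance(e, SealedSketch) else e for e in elements}


models = sealed_set_sketch(
    SealedSketch(lr=0.01), SealedSketch(lr=0.02), SealedSketch(lr=0.01), 3
)
print(len(models))  # prints: 3
```

In this sketch the two `lr=0.01` objects collapse into one set element because they share an identifier, while the integer `3` is kept as-is.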

cache

experimaestro.cache(name: str)

Decorator for caching method results to disk.

The cache is stored in the workspace’s config directory, keyed by the configuration’s identifier.

Example:

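import pickle
from pathlib import Path

from experimaestro import Config, Param, cache

class MyConfig(Config):
    data_path: Param[Path]

    @cache("processed.pkl")
    def process(self, cache_path: Path):
        # Reuse the cached result when it already exists
        if cache_path.exists():
            with cache_path.open("rb") as fp:
                return pickle.load(fp)
        result = expensive_computation(self.data_path)
        with cache_path.open("wb") as fp:
            pickle.dump(result, fp)
        return result

Parameters: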

name – Filename for the cache file

Returns:

A decorator that wraps the method with caching logic

initializer

experimaestro.initializer(method)

Decorator for methods that should only execute once.

After the first call, subsequent calls return the cached result. This is useful for lazy initialization of expensive resources.

Example:

class MyConfig(Config):
    @initializer
    def model(self):
        return load_expensive_model()

Parameters:

method – The method to wrap

Returns:

A wrapper that caches the result after first execution
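The run-once behavior can be illustrated with a small decorator written in plain Python. The name `run_once` and the per-object attribute used for storage are assumptions made for this sketch; it shows the general technique and is not how experimaestro implements initializer.

```python
import functools


def run_once(method):
    """Call `method` at most once per object, then return the stored result."""
    attr = f"_run_once_{method.__name__}"

    @functools.wraps(method)
    def wrapper(self):
        if not hasattr(self, attr):
            setattr(self, attr, method(self))  # first call: compute and store
        return getattr(self, attr)

    return wrapper


class Resource:
    def __init__(self):
        self.loads = 0

    @run_once
    def model(self):
        self.loads += 1  # counts how often the expensive work really runs
        return {"weights": [1, 2, 3]}


resource = Resource()
first = resource.model()
second = resource.model()
print(first is second, resource.loads)  # prints: True 1
```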

tqdm

experimaestro.tqdm(**kwargs) → xpm_tqdm
experimaestro.tqdm(iterable: Iterator[T] | None = None, **kwargs) → Iterator[T]

Create an experimaestro-aware progress bar.

A drop-in replacement for tqdm.tqdm that automatically reports progress to job event files. Use this in task execute() methods.

Example:

from experimaestro import tqdm

for epoch in tqdm(range(100), desc="Epochs"):
    for batch in tqdm(dataloader, desc="Batches"):
        train(batch)

Parameters:
  • iterable – Iterable to wrap (optional)

  • kwargs – Additional arguments passed to tqdm

Returns:

A progress bar iterator

progress

experimaestro.progress(value: float, level=0, desc: str | None = None, console=False)

Report task progress.

Call this function from within a running task to report progress. Progress is written to job event files and displayed in monitors.

Example:

for i, batch in enumerate(dataloader):
    train(batch)
    progress(i / len(dataloader), desc="Training")

Parameters:
  • value – Progress value between 0.0 and 1.0

  • level – Nesting level for nested progress bars (default: 0)

  • desc – Optional description of the current operation

  • console – If True, also print to console

Field Definitions

field

class experimaestro.field(*, default: Any | None = None, default_factory: Callable | None = None, ignore_default: bool | Any | None = None, ignore_generated=False, overrides=False, groups: list[ParameterGroup] | None = None)

Specify additional properties for a configuration parameter.

Use field() to control default value behavior and parameter grouping.

Default value options and identifier behavior:

default

The parameter has a default value that is always included in the task identifier. Two configs with different values always get different identifiers, even if one uses the default.

default_factory

A callable (zero-argument) that produces the default value. Behaves like default — the value is always included in the identifier. On Meta fields, the callable is invoked at seal time (e.g. PathGenerator).

ignore_default (bool)

When True and combined with default or default_factory, the default value is excluded from the identifier when the actual value equals the default. This is the backwards-compatible behavior matching bare defaults (x: Param[int] = 23, which is deprecated).

Example:

class MyConfig(Config):
    # Default always included in identifier
    count: Param[int] = field(default=10)

    # Factory default always included in identifier
    fabric: Param[FabricConfig] = field(
        default_factory=FabricConfig.C
    )

    # Default ignored in identifier when value == default
    threshold: Param[float] = field(default=0.5, ignore_default=True)

    # Factory default ignored when value == default
    # (named differently so it does not override the field above)
    other_fabric: Param[FabricConfig] = field(
        default_factory=FabricConfig.C, ignore_default=True
    )

    # Generated path (Meta field, excluded from identifier)
    output: Meta[Path] = field(
        default_factory=PathGenerator("out.txt")
    )

    # Parameter in a group (for partial identifiers)
    lr: Param[float] = field(groups=[training_group])
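The practical difference between default and ignore_default shows up when a parameter is added to an existing configuration. The sketch below uses plain Python only; `sketch_identifier`, the `declared` mapping, and hashing JSON with SHA-256 are assumptions for illustration and do not reproduce the library's identifier computation.

```python
import hashlib
import json

_NO_DEFAULT = object()  # marks parameters declared without a default


def sketch_identifier(values, declared):
    """Hash parameter values, skipping ignorable defaults.

    `declared` maps a parameter name to (default, ignore_default).
    """
    kept = {}
    for name, value in values.items():
        default, ignore_default = declared.get(name, (_NO_DEFAULT, False))
        if ignore_default and default is not _NO_DEFAULT and value == default:
            continue  # value equals its ignorable default: leave it out
        kept[name] = value
    payload = json.dumps(kept, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


# Identifier before the `threshold` parameter existed
old = sketch_identifier({"count": 10}, {"count": (10, False)})
# `threshold` added with ignore_default=True and left at its default
ignored = sketch_identifier(
    {"count": 10, "threshold": 0.5},
    {"count": (10, False), "threshold": (0.5, True)},
)
# `threshold` added with a default that is always included
included = sketch_identifier(
    {"count": 10, "threshold": 0.5},
    {"count": (10, False), "threshold": (0.5, False)},
)
print(old == ignored, old == included)  # prints: True False
```

Under these assumptions, an ignorable default keeps identifiers of previously run tasks stable, while an always-included default changes them.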

param_group

experimaestro.param_group(name: str) → ParameterGroup

Create a parameter group for use with partial identifiers.

Parameter groups allow computing partial identifiers that exclude certain parameters, enabling shared directories across related tasks.

Example:

training_group = param_group("training")

class MyTask(Task):
    model_size: Param[int]
    learning_rate: Param[float] = field(groups=[training_group])

Parameters:

name – Unique name for this parameter group

Returns:

A ParameterGroup object

partial

experimaestro.partial(*, exclude_groups: list[ParameterGroup] | None = None, include_groups: list[ParameterGroup] | None = None, exclude_no_group: bool = False, exclude_all: bool = False) → Partial

Create a partial specification for partial identifier computation.

Partials allow tasks to share directories when they differ only in certain parameter groups (e.g., training hyperparameters).

Example:

training_group = param_group("training")

class Train(Task):
    model: Param[Model]
    epochs: Param[int] = field(groups=[training_group])

    checkpoint: Meta[Path] = field(
        default_factory=PathGenerator(
            "model.pt",
            partial=partial(exclude_groups=[training_group])
        )
    )

Parameters:
  • exclude_groups – Parameter groups to exclude from identifier

  • include_groups – Parameter groups to always include (overrides exclusion)

  • exclude_no_group – If True, exclude parameters with no group assigned

  • exclude_all – If True, exclude all parameters by default

Returns:

A Partial object
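The four options can be read as a filter that decides, parameter by parameter, whether a value takes part in the partial identifier. The function below is one plausible reading of the parameter descriptions, written in plain Python with group names as strings; `included_in_partial` is hypothetical, and the precedence the library actually applies may differ.

```python
def included_in_partial(
    groups,
    *,
    exclude_groups=(),
    include_groups=(),
    exclude_no_group=False,
    exclude_all=False,
):
    """Return True if a parameter with these groups stays in the identifier."""
    groups = set(groups)
    if groups & set(include_groups):
        return True  # assumed: inclusion overrides every exclusion rule
    if exclude_all:
        return False
    if not groups:
        return not exclude_no_group
    return not (groups & set(exclude_groups))


# Parameter name -> names of the groups it belongs to
parameters = {"model": [], "epochs": ["training"]}
kept = [
    name
    for name, groups in parameters.items()
    if included_in_partial(groups, exclude_groups=["training"])
]
print(kept)  # prints: ['model']
```

With this filter, two tasks that differ only in `epochs` would keep the same set of identifying parameters, which is the situation in which a shared directory makes sense.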

PathGenerator

class experimaestro.PathGenerator(path: str | Path | Callable[[ConfigWalkContext, Config], Path] = '', *, partial: Partial = None)

Generate paths within the task directory.

Use PathGenerator with field(default_factory=...) to create paths relative to the task’s working directory.

Example:

class MyTask(Task):
    output: Meta[Path] = field(default_factory=PathGenerator("results.json"))
    model: Meta[Path] = field(default_factory=PathGenerator("model.pt"))

For shared directories across related tasks, use with partial:

training_group = param_group("training")

class Train(Task):
    epochs: Param[int] = field(groups=[training_group])
    checkpoint: Meta[Path] = field(
        default_factory=PathGenerator(
            "model.pt",
            partial=partial(exclude_groups=[training_group])
        )
    )

Parameters:
  • path – Relative path within the task directory. Can be a string, Path, or callable that takes (context, config) and returns a Path.

  • partial – Optional partial for partial directory sharing. When provided, the path is generated in a shared partial directory.

isoutput()

Returns True if this generator is a task output (e.g. generates a path within the job folder)

Deprecation

deprecate

experimaestro.deprecate(config_or_target: Type[Config] | Callable | None = None, *, replace: bool = False)

Deprecate a configuration/task class or a parameter.

Deprecated configurations maintain backwards compatibility while allowing migration to new structures. The identifier is computed from the converted configuration, ensuring consistency.

Usage patterns:

  1. Simple deprecation (class inherits from new class):

    @deprecate
    class OldConfig(NewConfig):
        pass
    
  2. Deprecation with conversion:

    @deprecate(NewConfig)
    class OldConfig(Config):
        value: Param[int]
    
        def __convert__(self):
            return NewConfig.C(values=[self.value])
    
  3. Immediate replacement:

    @deprecate(NewConfig, replace=True)
    class OldConfig(Config):
        value: Param[int]
    
        def __convert__(self):
            return NewConfig.C(values=[self.value])
    
  4. Deprecate a parameter:

    class MyConfig(Config):
        new_param: Param[list[int]]
    
        @deprecate
        def old_param(self, value: int):
            self.new_param = [value]
    
Parameters:
  • config_or_target – Target class for conversion, or the deprecated class/method when used as a simple decorator

  • replace – If True, creating the deprecated class immediately returns the converted instance

Exceptions

GracefulTimeout

class experimaestro.GracefulTimeout(message: str = 'Task stopped gracefully before timeout')

Bases: Exception

Exception raised to signal a graceful timeout in resumable tasks.

Raise this exception when a task needs to checkpoint and exit before a time limit (e.g., SLURM walltime). The task will be marked for retry rather than as failed.

Example:

```python
class LongTraining(ResumableTask):
    def execute(self):
        for epoch in range(self.epochs):
            remaining = self.remaining_time()
            if remaining is not None and remaining < 300:
                save_checkpoint(self.checkpoint, epoch)
                raise GracefulTimeout("Not enough time for another epoch")
            train_one_epoch()
```

Serialization

save

experimaestro.save(obj: Any, save_directory: Path | None, definition_filename: str = 'experimaestro.json')

Save a configuration to a directory.

The serialization process stores the configuration in the definition file and copies any files or folders registered as DataPath parameters.

Example:

config = MyConfig.C(data_path=Path("/data/file.txt"))
save(config, Path("/output/saved_config"))

Parameters:
  • obj – The configuration to save

  • save_directory – The directory in which the object and its data will be saved

  • definition_filename – The filename for the definition file (default: “experimaestro.json”)

load

experimaestro.load(path: str | Path | SerializedPathLoader, as_instance: bool = False, partial_loading: bool | None = None, definition_filename: str | None = None) → Tuple[Any, List[LightweightTask]]

Load a configuration from a directory.

Restores a configuration previously saved with save().

Example:

config = load(Path("/output/saved_config"))

Parameters:
  • path – Directory containing the saved configuration, or a function that resolves relative paths to absolute ones

  • as_instance – If True, return an instance instead of a config

  • partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.

  • definition_filename – The definition filename. If None, tries “experimaestro.json” first, then falls back to “definition.json”.

Returns:

The loaded configuration or instance

serialize

experimaestro.serialize(obj: Any, save_directory: Path, *, init_tasks: list[LightweightTask] = [], definition_filename: str = 'experimaestro.json')

Serialize a configuration to a directory with initialization tasks.

Similar to save(), but also stores lightweight initialization tasks that should be run when the configuration is deserialized.

Parameters:
  • obj – The configuration to serialize

  • save_directory – The directory in which the object and its data will be saved

  • init_tasks – List of lightweight tasks to run on deserialization

  • definition_filename – The filename for the definition file (default: “experimaestro.json”)

deserialize

experimaestro.deserialize(path: str | Path | SerializedPathLoader, as_instance: bool = False, partial_loading: bool | None = None, definition_filename: str | None = None) → tuple[Any, List[LightweightTask]] | Any

Deserialize a configuration from a directory.

Restores a configuration previously saved with serialize(). When as_instance=True, runs any stored initialization tasks.

Parameters:
  • path – Directory containing the serialized configuration, or a function that resolves relative paths to absolute ones

  • as_instance – If True, return an instance and run init tasks

  • partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.

  • definition_filename – The definition filename. If None, tries “experimaestro.json” first, then falls back to “definition.json”.

Returns:

The configuration/instance (if as_instance), or tuple of (configuration, init_tasks)

from_task_dir

experimaestro.from_task_dir(path: str | Path | SerializedPathLoader, as_instance: bool = False, partial_loading: bool | None = None)

Load a task configuration from a task directory.

Loads the task parameters from a job directory (containing params.json). This is useful for reloading task configurations after execution.

Parameters:
  • path – Task directory containing params.json, or a function that resolves relative paths to absolute ones

  • as_instance – If True, return an instance instead of a config

  • partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.

Returns:

The loaded task configuration or instance

state_dict

experimaestro.state_dict(context: SerializationContext, obj: Any)

Convert an object to a state dictionary for serialization.

Returns a dictionary representation that can be serialized to JSON and later restored with from_state_dict().

Parameters:
  • context – The serialization context

  • obj – The object to serialize

Returns:

A dictionary with ‘objects’ and ‘data’ keys

from_state_dict

experimaestro.from_state_dict(state: Dict[str, Any], path: None | str | Path | SerializedPathLoader = None, *, as_instance: bool = False, partial_loading: bool | None = None)

Load an object from a state dictionary.

Restores a configuration from a dictionary previously created by state_dict().

Parameters:
  • state – The state dictionary to load from

  • path – Directory containing data files, or a function that resolves relative paths to absolute ones

  • as_instance – If True, return an instance instead of a config

  • partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.

Returns:

The loaded configuration or instance

SerializationContext

class experimaestro.SerializationContext(*, save_directory: Path | None = None)

Context when serializing experimaestro configurations

property depth: int

The current depth in the configuration tree (root = 0)

serialize(var_path: List[str], data_path: Path, config: ConfigMixin) → SerializedPath

Serialize data files into the save directory

Parameters:
  • var_path – The variable path (list of field names from root)

  • data_path – The path to the data file/folder to serialize

  • config – The config object owning this data path

Returns:

A SerializedPath referencing the serialized data

Raises:

ValueError – If the destination path was already used

load_xp_info

experimaestro.load_xp_info(path: str | Path) → ExperimentInfo

Load all serialized objects from an experiment run directory.

Reads objects.jsonl (streaming format) to reconstruct job configs and actions. Uses jobs.jsonl for job IDs and status.json for action IDs to classify entries.

Falls back to configs.json for experiments created before the objects.jsonl format was introduced.

This is a standalone function – no experiment context or WorkspaceStateProvider is required.

Parameters:

path – Path to the experiment run directory

Returns:

ExperimentInfo with .jobs and .actions dictionaries

Raises:

FileNotFoundError – If neither objects.jsonl nor configs.json exists

ExperimentInfo

class experimaestro.ExperimentInfo(jobs: Dict[str, Any], actions: Dict[str, Any])

Structured result from loading experiment objects.

Contains deserialized job configs and actions from an experiment run.

actions: Dict[str, Any]

Mapping of action_id to Action objects

jobs: Dict[str, Any]

Mapping of job_id to Config objects

Launchers

DirectLauncher

class experimaestro.launchers.direct.DirectLauncher(connector: Connector, *, priority: float = 0)

Bases: Launcher

Launcher that runs tasks directly as local processes.

This is the default launcher that executes tasks on the local machine without any job scheduler. Tasks are run as Python subprocesses.

Parameters:

connector – The connector to use (defaults to LocalConnector)

static get_cli()

Returns the CLI group for direct launcher commands.

launcher_info_code() → str

Returns an empty string, as the local launcher has no time limits.

scriptbuilder()

Returns a script builder

SlurmLauncher

class experimaestro.launchers.slurm.SlurmLauncher(*, connector: Connector | None = None, options: SlurmOptions | None = None, interval: float = 60, main=None, launcherenv: Dict[str, str] | None = None, binpath='/usr/bin')

Bases: Launcher

Slurm workload manager launcher

https://slurm.schedmd.com/documentation.html

config(**kwargs)

Returns a new Slurm launcher with the given configuration

property key

Returns a dictionary characterizing this launcher when calling sacct/etc

launcher_info_code() → str

Returns Python code to set up launcher info during task execution.

processbuilder() → SlurmProcessBuilder

Returns the process builder for this launcher

By default, returns the associated connector builder

scriptbuilder()

Returns the script builder

We assume Unix, but should be changed to PythonScriptBuilder when working

SlurmOptions

class experimaestro.launchers.slurm.SlurmOptions(nodes: int | None = 1, time: str | None = None, account: str | None = None, qos: str | None = None, partition: str | None = None, constraint: str | None = None, mem: str | None = None, exclude: str | None = None, mem_per_gpu: str | None = None, cpus_per_task: str | None = None, nodelist: str | None = None, ntasks_per_node: int | None = None, gpus: int | None = None, gpus_per_node: int | None = None)

account: str | None = None

The account for launching the job

args() → List[str]

Returns the corresponding options

constraint: str | None = None

Logic expression on node features (as defined by the administrator)

cpus_per_task: str | None = None

Number of cpus requested per task

exclude: str | None = None

List of hosts to exclude

static format_time(duration_s: int)

Format time for the SLURM option

Parameters:

duration_s – Time duration in seconds

Returns:

The configuration string
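SLURM's --time option accepts, among other forms, days-hours:minutes:seconds. The exact string returned by format_time is not stated here, so the following is only a sketch of one such conversion; the helper name `slurm_time_sketch` and the choice to always emit the day field are assumptions.

```python
def slurm_time_sketch(duration_s):
    """Convert a duration in seconds to a D-HH:MM:SS string."""
    if duration_s < 0:
        raise ValueError("duration must be non-negative")
    days, rest = divmod(duration_s, 86400)  # 86400 seconds per day
    hours, rest = divmod(rest, 3600)
    minutes, seconds = divmod(rest, 60)
    return f"{days}-{hours:02d}:{minutes:02d}:{seconds:02d}"


print(slurm_time_sketch(3600))   # prints: 0-01:00:00
print(slurm_time_sketch(90061))  # prints: 1-01:01:01
```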

gpus: int | None = None

Number of GPUs

gpus_per_node: int | None = None

Number of GPUs per node

mem: str | None = None

Requested memory on the node (in megabytes by default)

mem_per_gpu: str | None = None

Requested memory per allocated GPU (size with units: K, M, G, or T)

nodelist: str | None = None

Request a specific list of hosts

nodes: int | None = 1

Number of requested nodes

ntasks_per_node: int | None = None

Number of tasks to run on each node

partition: str | None = None

The requested partition

qos: str | None = None

The requested Quality of Service

time: str | None = None

Requested time

Connectors

LocalConnector

class experimaestro.connectors.local.LocalConnector(localpath: Path | None = None)

Bases: Connector

Connector for executing tasks on the local machine.

This connector handles local file system operations and process execution. It is the default connector used when no remote execution is needed.

Use instance() to get a singleton instance of the local connector.

Parameters:

localpath – Base path for experimaestro data. Defaults to ~/.local/share/experimaestro or the value of XPM_WORKDIR environment variable.

async_lock(path: Path, max_delay: int = -1) → Lock

Returns an async lock

Parameters:
  • path – Path of the lockfile

  • max_delay – Maximum wait duration

createtoken(name: str, total: int) → Token

Returns a token in the default path for the connector

lock(path: Path, max_delay: int = -1) → SyncLock

Returns a sync lock

Parameters:
  • path – Path of the lockfile

  • max_delay – Maximum wait duration

SshConnector

class experimaestro.connectors.ssh.SshConnector(hostname: str)

Bases: Connector

async_lock(path: Path, max_delay: int = -1)

Returns an async lock on a file

createtoken(name: str, total: int) Token

Returns a token in the default path for the connector

static fromPath(path: SshPath)

Creates an SSH connector from an SshPath

static get(hostname)

Get an SSH connector from a hostname

This method caches SSH connectors and is thus preferred to direct initialization
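
The caching pattern can be pictured with a small stand-in class. This is a sketch of the general idea only: CachedConnector and its _cache dictionary are hypothetical names, not the library's internals.

```python
class CachedConnector:
    """Stand-in connector illustrating a per-hostname cache (hypothetical)."""

    _cache: dict = {}

    def __init__(self, hostname: str):
        self.hostname = hostname

    @staticmethod
    def get(hostname: str) -> "CachedConnector":
        # Reuse the connector already created for this hostname, if any
        connector = CachedConnector._cache.get(hostname)
        if connector is None:
            connector = CachedConnector(hostname)
            CachedConnector._cache[hostname] = connector
        return connector


a = CachedConnector.get("cluster.example.org")
b = CachedConnector.get("cluster.example.org")
print(a is b)  # True: the second call returns the cached object
```

Direct construction, by contrast, creates a fresh object on every call, which is why the cached accessor is the preferred entry point.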

lock(path: Path, max_delay: int = -1)

Returns a sync lock on a file

Launcher Finder

find_launcher

experimaestro.launcherfinder.find_launcher(*specs: HostRequirement | str, tags: Set[str] = {}) Launcher

Find a launcher matching a given specification

parse

experimaestro.launcherfinder.parser.parse(expr: str)

Parse a requirement specification string into a HostRequirement object.

The specification string describes hardware requirements for running a task. Multiple alternatives can be specified using | (OR), and requirements within an alternative are combined using & (AND).

Syntax elements:

  • duration=<N><unit>: Job duration (units: h/hours, d/days, m/mins)

  • cpu(mem=<size>, cores=<N>): CPU requirements

  • cuda(mem=<size>) * <N>: NVIDIA CUDA GPU requirements (memory and count)

  • mps(mem=<size>) * <N>: Apple MPS GPU requirements (unified memory)

  • gpu(mem=<size>) * <N>: Generic GPU requirements (matches any accelerator)

  • Memory sizes: <N>G, <N>GiB, <N>M, <N>MiB

Accelerator types:

  • cuda: NVIDIA CUDA GPUs only (dedicated memory)

  • mps: Apple Silicon MPS only (unified memory with CPU)

  • gpu: Any accelerator type (cross-platform)

Parameters:

expr – The requirement specification string

Returns:

A HostRequirement object

Examples:

from experimaestro.launcherfinder.parser import parse

# Request 2 NVIDIA GPUs with 32GB each, 700GB RAM, for 40 hours
req = parse("duration=40h & cpu(mem=700GiB) & cuda(mem=32GiB) * 2")

# Cross-platform: CUDA on Linux/Windows OR MPS on macOS
req = parse(
    "duration=4h & cuda(mem=8GiB)"
    " | duration=4h & mps(mem=8GiB)"
)

# Generic GPU requirement (matches any accelerator)
req = parse("duration=2h & gpu(mem=4GiB)")
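
To make the | and & structure concrete, the sketch below splits a specification string into alternatives and their terms using plain string operations. It is not the library's parser (which returns a HostRequirement); split_spec is a hypothetical helper that only shows how an expression decomposes.

```python
def split_spec(expr: str) -> list[list[str]]:
    """Hypothetical helper: split on '|' (alternatives), then on '&' (terms)."""
    return [
        [term.strip() for term in alternative.split("&")]
        for alternative in expr.split("|")
    ]


spec = "duration=4h & cuda(mem=8GiB) | duration=4h & mps(mem=8GiB)"
for index, terms in enumerate(split_spec(spec)):
    # Each alternative is a list of terms that must all hold
    print(index, terms)
```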

HostRequirement

class experimaestro.launcherfinder.specs.HostRequirement

A requirement must be a disjunction of host requirements

abstract multiply_duration(coefficient: float) HostRequirement

Returns a new HostRequirement with a duration multiplied by the provided coefficient

requirements: List[HostSimpleRequirement]

List of requirements (by order of priority)

HostSimpleRequirement

class experimaestro.launcherfinder.specs.HostSimpleRequirement(*reqs: HostSimpleRequirement)

Simple host requirement

accelerators: List[AcceleratorSpecification]

Specification for accelerators (GPUs)

cpu: CPUSpecification

Specification for CPU

property cuda_gpus: List[CudaSpecification]

CUDA GPUs (backwards compatibility alias).

Returns only CUDA accelerators from the accelerators list.

duration: int

Requested duration (in seconds)

multiply_duration(coefficient: float) HostSimpleRequirement

Returns a new HostSimpleRequirement with a duration multiplied by the provided coefficient
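
The "returns a new object" behaviour can be sketched with a frozen dataclass. RequirementSketch is a hypothetical stand-in that models only the duration, and truncating the result to an integer number of seconds is an assumption.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class RequirementSketch:
    """Stand-in for a simple requirement; only the duration is modelled."""

    duration: int  # requested duration in seconds

    def multiply_duration(self, coefficient: float) -> "RequirementSketch":
        # Return a new object; the original is left unchanged.
        # Truncating to whole seconds is an assumption of this sketch.
        return replace(self, duration=int(self.duration * coefficient))


base = RequirementSketch(duration=3600)
longer = base.multiply_duration(1.5)
print(base.duration, longer.duration)  # 3600 5400
```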

HostSpecification

class experimaestro.launcherfinder.specs.HostSpecification(*, accelerators: List[AcceleratorSpecification] = NOTHING, cuda: List[CudaSpecification] = NOTHING, cpu: CPUSpecification = NOTHING, priority: int = 0, max_duration: int = 0, min_gpu: int = 0)

Specifies how the host is set.

Supports both CUDA GPUs and other accelerators (MPS, ROCm, etc.). Use accelerators for the generic list, or cuda for backwards compatibility.

Examples

# New style - generic accelerators
host = HostSpecification(accelerators=[CudaSpecification(memory=24*1024**3)])

# Backwards compatible - cuda shorthand
host = HostSpecification(cuda=[CudaSpecification(memory=24*1024**3)])

accelerators: List[AcceleratorSpecification]

All accelerators (GPUs) available on this host

cpu: CPUSpecification

CPU specification for this host

cuda: List[CudaSpecification]

CUDA GPUs (backwards compatibility, merged into accelerators)

max_duration: int

Max job duration (in seconds)

min_gpu: int

Minimum number of allocated GPUs

priority: int

Priority for this host (higher better)

AcceleratorSpecification

class experimaestro.launcherfinder.specs.AcceleratorSpecification(memory: int = 0, model: str = '', min_memory: int = 0)

Generic accelerator (GPU-like device) specification.

This can match any accelerator type (CUDA, MPS, ROCm, etc.) based on memory requirements alone. Use this when you don’t care about the specific accelerator type.

For type-specific requirements, use CudaSpecification or MPSSpecification.

property accelerator_type: AcceleratorType | None

Type of accelerator (None for generic)

match(spec: AcceleratorSpecification) bool

Returns True if this host accelerator can satisfy the spec requirement.

Matching rules:

  • If spec is generic (AcceleratorSpecification), any accelerator matches

  • If spec is specific (CudaSpecification, MPSSpecification), types must match
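
The matching rules can be sketched with stand-in classes. Accelerator, Cuda and MPS below are hypothetical simplifications, and the memory comparison (host memory must be at least the requested memory) is an assumption rather than the documented behaviour.

```python
from dataclasses import dataclass


@dataclass
class Accelerator:
    """Generic stand-in: a generic spec matches on memory alone."""

    memory: int = 0  # bytes

    def match(self, spec: "Accelerator") -> bool:
        # A generic spec accepts any accelerator type; a specific spec
        # (a subclass) requires the host accelerator to be of that type.
        if type(spec) is not Accelerator and not isinstance(self, type(spec)):
            return False
        # Assumption: the host must offer at least the requested memory
        return self.memory >= spec.memory


class Cuda(Accelerator):
    pass


class MPS(Accelerator):
    pass


GiB = 1024**3
host_gpu = Cuda(memory=24 * GiB)
print(host_gpu.match(Accelerator(memory=8 * GiB)))  # True: generic spec
print(host_gpu.match(Cuda(memory=8 * GiB)))  # True: same type
print(host_gpu.match(MPS(memory=8 * GiB)))  # False: type mismatch
```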

memory: int = 0

Memory in bytes

min_memory: int = 0

Minimum request memory (in bytes)

model: str = ''

Model name

property unified_memory: bool

If True, memory is shared with CPU (e.g., Apple Silicon)

CudaSpecification

class experimaestro.launcherfinder.specs.CudaSpecification(memory: int = 0, model: str = '', min_memory: int = 0)

Bases: AcceleratorSpecification

NVIDIA CUDA GPU specification (dedicated GPU memory).

Only matches CUDA GPUs - will not match MPS or other accelerator types.

property accelerator_type: AcceleratorType

Type of accelerator (None for generic)

memory: int = 0

Memory (in bytes)

min_memory: int = 0

Minimum request memory (in bytes)

model: str = ''

CUDA card model name

MPSSpecification

class experimaestro.launcherfinder.specs.MPSSpecification(memory: int = 0, model: str = '', min_memory: int = 0)

Bases: AcceleratorSpecification

Apple Metal Performance Shaders (MPS) specification.

MPS uses unified memory - GPU memory is shared with CPU RAM. When a task requests GPU memory on MPS, it consumes system RAM.

Only matches MPS - will not match CUDA or other accelerator types.

property accelerator_type: AcceleratorType

Type of accelerator (None for generic)

memory: int = 0

Memory in bytes (shared with CPU)

min_memory: int = 0

Minimum request memory (in bytes)

model: str = ''

Apple Silicon model (e.g., ‘M1’, ‘M2 Pro’)

property unified_memory: bool

If True, memory is shared with CPU (e.g., Apple Silicon)

AcceleratorType

class experimaestro.launcherfinder.specs.AcceleratorType(value)

Types of accelerators supported.

CPUSpecification

class experimaestro.launcherfinder.specs.CPUSpecification(memory: int = 0, cores: int = 0, mem_per_cpu: int = 0, cpu_per_gpu: int = 0)

cores: int = 0

Number of cores

cpu_per_gpu: int = 0

Number of CPU per GPU (0 if not defined)

mem_per_cpu: int = 0

Memory per CPU (0 if not defined)

memory: int = 0

Memory in bytes

Experiments CLI

ExperimentHelper

class experimaestro.experiments.cli.ExperimentHelper(callable: ExperimentCallable)

Helper for experiments

callable: ExperimentCallable

Run function

classmethod decorator(*args, **kwargs)

Decorator for the run(helper, configuration) method

xp: Experiment

The experiment object

ConfigurationBase

class experimaestro.experiments.ConfigurationBase(*, id: str = '???', file: str = 'experiment', module: str | None = None, pythonpath: List[str] | None = None, parent: str | None = None, imports: List[str] | None = None, pre_experiment: str | None = None, title: str = '', subtitle: str = '', paper: str = '', description: str = '', add_timestamp: bool = False, dirty_git: DirtyGitAction = DirtyGitAction.WARN)

Base configuration for any experiment

add_timestamp: bool

Adds a timestamp YYYY_MM_DD-HH_MM to the experiment ID
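
For reference, the YYYY_MM_DD-HH_MM pattern corresponds to the strftime format shown in this sketch. The helper timestamped_id is hypothetical, and the "-" separator between the ID and the timestamp is an assumption.

```python
from datetime import datetime


def timestamped_id(experiment_id: str, now: datetime) -> str:
    """Hypothetical helper: append a YYYY_MM_DD-HH_MM timestamp to an ID."""
    # The "-" separator between ID and timestamp is an assumption
    return f"{experiment_id}-{now.strftime('%Y_%m_%d-%H_%M')}"


print(timestamped_id("my-experiment", datetime(2024, 3, 5, 14, 7)))
# my-experiment-2024_03_05-14_07
```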

description: str

Description of the experiment

dirty_git: DirtyGitAction

Action when git repository has uncommitted changes: ignore, warn (default), error

file: str

Relative path of the file containing a run function

id: str

ID of the experiment

experimaestro uses this ID to identify the experiment when it runs.

imports: List[str] | None

List of YAML file paths to import (merged in order, current file wins)

module: str | None

Relative path of the file containing a run function

paper: str

Source paper for this experiment

parent: str | None

(Deprecated) Relative path of a YAML file that should be merged. Use imports instead.

pre_experiment: str | None

Python file path or module name to execute before importing the experiment.

Can be either:

  • A relative path to a Python file (e.g., “pre_setup.py”)

  • A Python module name (e.g., “mypackage.pre_experiment”)

This is useful for setting environment variables or mocking modules to speed up the experiment setup phase (e.g., mocking torch.compile or torch.nn). The actual job execution will use real modules.

pythonpath: List[str] | None

Python path relative to the parent directory of the YAML file

subtitle: str

Gives some more details about the experiment

title: str

Short description of the experiment

configuration

experimaestro.experiments.configuration(*args, **kwargs)

Method to define keyword-only dataclasses

Configurations are keyword-only

HuggingFace Integration

ExperimaestroHFHub

Submit Hooks

SubmitHook

class experimaestro.core.types.SubmitHook

Hook called before the job is submitted to the scheduler

This allows modifying, e.g., the run environment

abstract process(job: Job, launcher: Launcher)

Apply the hook for the job/launcher

abstract spec()

Returns an identifier tuple for hashing/equality
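
The two abstract methods can be illustrated with a self-contained stand-in. HookSketch, SetEnv and FakeLauncher are hypothetical: the sketch assumes that equality and hashing are derived from spec(), as the description of spec() suggests, and that a launcher exposes a mutable environ mapping.

```python
from abc import ABC, abstractmethod


class HookSketch(ABC):
    """Stand-in for a submit hook: equality and hashing follow spec()."""

    @abstractmethod
    def process(self, job, launcher): ...

    @abstractmethod
    def spec(self): ...

    def __eq__(self, other):
        return type(self) is type(other) and self.spec() == other.spec()

    def __hash__(self):
        return hash((type(self), self.spec()))


class SetEnv(HookSketch):
    """Hypothetical hook that sets one environment variable."""

    def __init__(self, key: str, value: str):
        self.key, self.value = key, value

    def process(self, job, launcher):
        # Assumption: the launcher exposes a mutable `environ` mapping
        launcher.environ[self.key] = self.value

    def spec(self):
        return (self.key, self.value)


class FakeLauncher:
    def __init__(self):
        self.environ = {}


launcher = FakeLauncher()
# Two hooks with the same spec() collapse into one set element
hooks = {SetEnv("OMP_NUM_THREADS", "4"), SetEnv("OMP_NUM_THREADS", "4")}
for hook in hooks:
    hook.process(job=None, launcher=launcher)
print(len(hooks), launcher.environ)
```

Deriving identity from spec() means that registering an equivalent hook twice has no additional effect in this sketch.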

Services

Service

class experimaestro.scheduler.services.Service(log_directory: Path | None = None)

An experiment service

Services can be associated with an experiment. They send notifications to service listeners.

To support restarting services from monitor mode, subclasses should override state_dict() to return the data needed to recreate the service, and implement from_state_dict() to recreate it.

add_listener(listener: ServiceListener)

Adds a listener

Parameters:

listener – The listener to add

cleanup_logging(stdout_handler, stderr_handler)

Clean up logging handlers - call at end of _serve()

description()

Human-readable description of the service

property error: str | None

Return error message if service failed to start

property experiment_id: str

Return the experiment ID this service belongs to

static from_state_dict(service_class: str, data: dict, path_translator: Callable[[str], Path] | None = None) Service

Recreate a service from a state dictionary.

Parameters:
  • service_class – Fully qualified class name (e.g., “module.ClassName”)

  • data – Dictionary from state_dict() (may be serialized)

  • path_translator – Optional function to translate remote paths to local. Used by remote clients to map paths to local cache.

Returns:

A new Service instance, or raises if the class cannot be loaded.

Raises:

ValueError – If __unserializable__ is True or class cannot be loaded

full_state_dict() dict

Serialize service to dictionary for JSON serialization.

Overrides BaseService.full_state_dict() to properly serialize Path objects.

remove_listener(listener: ServiceListener)

Removes a listener

Parameters:

listener – The listener to remove

property run_id: str

Return the run ID (timestamp format YYYYMMDD_HHMMSS)

static serialize_state_dict(data: dict) dict

Serialize a state_dict, converting Path objects to serializable format.

This is called automatically when storing services. Path values are converted to {“__path__”: “/path/string”} format.

Parameters:

data – Raw state_dict from service (should include __class__)

Returns:

Serializable dictionary with paths converted
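
The path conversion can be sketched with the standard library alone. serialize_paths and restore_paths are hypothetical helpers that handle top-level values only; how the library treats nested values is not covered here.

```python
import json
from pathlib import Path


def serialize_paths(data: dict) -> dict:
    """Sketch: replace Path values with {"__path__": "<string>"} markers."""
    return {
        key: {"__path__": str(value)} if isinstance(value, Path) else value
        for key, value in data.items()
    }


def restore_paths(data: dict, translate=Path) -> dict:
    """Sketch of the reverse step; `translate` maps a path string to a Path."""
    return {
        key: translate(value["__path__"])
        if isinstance(value, dict) and "__path__" in value
        else value
        for key, value in data.items()
    }


state = {"log_dir": Path("/tmp/logs"), "name": "tensorboard"}
encoded = serialize_paths(state)
print(json.dumps(encoded))  # the encoded form is JSON-serializable
print(restore_paths(encoded) == state)  # True: round trip restores the Path
```

The translate argument plays the role described for path_translator: a remote client could pass a function that maps the stored string to a local cache path instead of using Path directly.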

set_error(error: str | None) None

Set error message and update state to ERROR

set_experiment(xp: Experiment) None

Called when the service is added to an experiment.

Override this method to access the experiment context (e.g., workdir). The base implementation stores the experiment reference and creates log directories.

Parameters:

xp – The experiment this service is being added to.

set_starting() None

Set state to STARTING and clear any previous error.

This is a no-op for live services - they manage their own state internally through get_url(). Only MockService uses this to manually control state for UI feedback.

setup_logging() tuple[FileHandler | None, FileHandler | None]

Setup logging handlers for service output

Returns tuple of (stdout_handler, stderr_handler) for cleanup. Call this at the start of _serve() to redirect service logs.

property state

Current service state

state_dict() dict

Return parameters needed to recreate this service.

Subclasses should override this to return constructor arguments. Path values are automatically serialized and restored (with translation for remote monitoring).

Example:

def state_dict(self):
    return {
        "log_dir": self.log_dir,  # Path is auto-handled
        "name": self.name,
    }

Returns:

Dict with constructor kwargs.

property stderr: Path | None

Return path to stderr log file

property stdout: Path | None

Return path to stdout log file

WebService

class experimaestro.scheduler.services.WebService(log_directory: Path | None = None)

Bases: Service

Base class for web-based experiment services.

Web services provide HTTP endpoints that can be accessed through the experimaestro web interface. When an experiment is running with a port configured, web services are automatically proxied through the main experimaestro server.

To implement a web service:

  1. Subclass WebService

  2. Set a unique id class attribute

  3. Implement the _serve() method to start your web server

  4. Set self.url and call running.set() when ready

  5. Optionally check self.should_stop() to handle graceful shutdown

Example:

import threading
import time

from experimaestro.scheduler.services import WebService


class MyWebService(WebService):
    id = "myservice"

    def _serve(self, running: threading.Event):
        # Start your web server
        self.url = "http://localhost:8080"
        running.set()
        # Keep serving, checking for stop signal
        while not self.should_stop():
            time.sleep(1)

get_url()

Get the URL of this web service, starting it if needed.

If the service is not running, this method will start it and block until the URL is available. If the service is already starting or running, returns the existing URL.

Returns:

The URL where this service can be accessed

Raises:

RuntimeError – If called while service is stopping

serve()

Start the web service in a background thread.

This method creates a daemon thread that calls _serve().

should_stop() bool

Check if the service should stop.

Subclasses can call this in their _serve loop to check for graceful shutdown requests.

Returns:

True if stop() has been called

stop(timeout: float = 2.0)

Stop the web service.

This method signals the service to stop and waits for the thread to terminate. If the thread doesn’t stop gracefully within the timeout, it attempts to forcefully terminate it.

Parameters:

timeout – Seconds to wait for graceful shutdown before forcing
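
The handshake between get_url(), _serve(), should_stop() and stop() can be sketched without the library. MiniWebService is a hypothetical stand-in built on threading.Event; it binds no real server and does not model forced termination.

```python
import threading
import time


class MiniWebService:
    """Stand-in showing the serve / get_url / stop handshake (not the real class)."""

    def __init__(self):
        self.url = None
        self._running = threading.Event()
        self._stop = threading.Event()
        self._thread = None

    def _serve(self, running: threading.Event):
        self.url = "http://localhost:8080"  # placeholder URL, nothing is bound
        running.set()  # signal that the URL is available
        while not self.should_stop():
            time.sleep(0.01)

    def should_stop(self) -> bool:
        return self._stop.is_set()

    def get_url(self) -> str:
        if self._thread is None:
            # Start the worker as a daemon thread on first use
            self._thread = threading.Thread(
                target=self._serve, args=(self._running,), daemon=True
            )
            self._thread.start()
        self._running.wait()  # block until _serve() has published the URL
        return self.url

    def stop(self, timeout: float = 2.0):
        self._stop.set()  # ask the serving loop to exit
        if self._thread is not None:
            self._thread.join(timeout)


service = MiniWebService()
print(service.get_url())
service.stop()
print(service._thread.is_alive())
```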

ServiceState

class experimaestro.scheduler.services.ServiceState(value)

State of a service lifecycle.

Services transition through these states:

STOPPED -> STARTING -> RUNNING -> STOPPING -> STOPPED
                   |-> ERROR (if start fails)
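
The lifecycle can be written down as a transition table. This is a sketch: the State enum mirrors the state names in the diagram, the ALLOWED mapping is read off the diagram, and ERROR is given no successor only because the diagram shows none.

```python
from enum import Enum, auto


class State(Enum):
    STOPPED = auto()
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    ERROR = auto()


# Transitions read off the lifecycle diagram (ERROR is reached when start fails)
ALLOWED = {
    State.STOPPED: {State.STARTING},
    State.STARTING: {State.RUNNING, State.ERROR},
    State.RUNNING: {State.STOPPING},
    State.STOPPING: {State.STOPPED},
    State.ERROR: set(),  # assumption: no successor shown in the diagram
}


def can_transition(current: State, target: State) -> bool:
    return target in ALLOWED[current]


print(can_transition(State.STARTING, State.ERROR))  # True
print(can_transition(State.RUNNING, State.STOPPED))  # False: must go through STOPPING
```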

ServiceListener

class experimaestro.scheduler.services.ServiceListener

A service listener

Jobs

Job

class experimaestro.scheduler.jobs.Job(config: Config, *, workspace: Workspace = None, launcher: Launcher = None, run_mode: RunMode = RunMode.NORMAL, max_retries: int | None = None, transient: TransientMode = TransientMode.NONE, backup: bool | None = None)

A job is a resource that is produced by the execution of some code

async aio_done_handler()

Process remaining task outputs after job completion.

Called by: Scheduler.aio_final_state()

Ensures all remaining task output events are queued for processing by explicitly reading the task outputs file. The callbacks will complete asynchronously and decrement task_output_count when done.

async aio_run() Process

Actually run the code

Returns:

A Process instance representing the running job

property donepath: Path

When a job has been successful, this file is written

property environ

Returns the job environment

It is made of (by order of priority):

  1. The job environment

  2. The launcher environment

  3. The workspace environment
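
The priority order can be illustrated with collections.ChainMap, which resolves each key in the first mapping that defines it. The three dictionaries below hold hypothetical values, and using ChainMap is an illustration of the lookup order, not a statement about how the property is implemented.

```python
from collections import ChainMap

# Hypothetical example values for the three environment layers
job_env = {"CUDA_VISIBLE_DEVICES": "0"}
launcher_env = {"CUDA_VISIBLE_DEVICES": "0,1", "OMP_NUM_THREADS": "8"}
workspace_env = {"OMP_NUM_THREADS": "1", "DATA_DIR": "/data"}

# ChainMap looks keys up in the first mapping that defines them,
# so earlier mappings take priority over later ones
environ = ChainMap(job_env, launcher_env, workspace_env)

for key in sorted(environ):
    print(key, "=", environ[key])
```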

property failedpath

When a job has been unsuccessful, this file is written with an error code inside

property jobpath: Path

Deprecated, use path

property locator: str

Full task locator (for BaseJob interface)

property lockpath

This file is used as a lock for running the job

property pidpath

This file contains the PID of the job process

prepare(overwrite=False)

Prepare all files before starting a task

Parameters:

overwrite – if True, overwrite files even if the task has been run

process_state_dict() dict | None

Get process state as dictionary.

property python_path: Iterator[str]

Returns an iterator over python path

register_watched_outputs()

Register all watched outputs with the scheduler.

This should be called after the job is submitted and has a scheduler.

rotate_logs() None

Rotate log files before restarting a task.

Renames non-empty stdout and stderr files with a timestamp suffix (e.g., job.20231215143022.out) to preserve logs from previous runs.
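
The renaming scheme can be sketched as follows. rotate_log is a hypothetical helper operating on a single file; it reproduces the job.20231215143022.out naming from the example above and skips missing or empty files.

```python
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional


def rotate_log(path: Path, now: datetime) -> Optional[Path]:
    """Sketch: rename a non-empty log file to <stem>.<timestamp><suffix>."""
    if not path.exists() or path.stat().st_size == 0:
        return None  # nothing worth preserving
    stamp = now.strftime("%Y%m%d%H%M%S")
    target = path.with_name(f"{path.stem}.{stamp}{path.suffix}")
    path.rename(target)
    return target


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "job.out"
    out.write_text("previous run\n")
    rotated = rotate_log(out, datetime(2023, 12, 15, 14, 30, 22))
    rotated_name, original_exists = rotated.name, out.exists()
    print(rotated_name)  # job.20231215143022.out
```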

property scheduler_state: JobState

Scheduler lifecycle state (independent from execution state)

set_scheduler_state(new_state: JobState)

Set the scheduler lifecycle state.

Updates experiment statistics and notifies listeners. This does NOT update the execution state (_state) — that is updated only by load_from_disk() and apply_event().

Parameters:

new_state – The new scheduler state

property task_id: str

Task class identifier (for BaseJob interface)

watch_output(watched: WatchedOutput)

Add a watched output to this job.

Parameters:

watched – A description of the watched output

Base Launcher

Launcher

class experimaestro.launchers.Launcher(connector: Connector, *, priority: float = 0)

Bases: ABC

Base class for task launchers.

Launchers are responsible for executing tasks on a compute resource. They work with a Connector to access the target system and manage process execution.

Subclasses include:

Parameters:
  • connector – The connector to use for accessing the compute resource

  • priority – Priority for launcher selection in DynamicLauncher (higher = preferred)

static get(path: Path)

Get a default launcher for a given path

abstract launcher_info_code() str

Returns Python code to set up launcher info during task execution.

This code is inserted into the generated task script to set up launcher-specific information (like LauncherInformation for querying remaining time).

Returns:

Python code as a string, or empty string if no setup needed.

onSubmit(job: Job)

Called when submitting a job

Example of use: this allows the launcher to add token dependencies

priority: float

Priority for launcher selection (higher values = higher priority)

processbuilder() ProcessBuilder

Returns the process builder for this launcher

By default, returns the associated connector builder

abstract scriptbuilder() ScriptBuilder

Returns a script builder

Click Integration

forwardoption

class experimaestro.click.forwardoption(path=[])

Gives access to an argument of the configuration

This makes it possible to refer to a path of a class in a “python” syntax, e.g. @forwardoption.ranker.optimizer.epsilon(MyConfig) or @forwardoption.ranker.optimizer.epsilon(MyConfig, "option-name")

The default can be changed by setting the option
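
The attribute-path syntax can be understood through a small stand-in that accumulates names on attribute access. PathRecorder is hypothetical and only shows how a chain such as .ranker.optimizer.epsilon can be turned into a list of names; it does not create click options.

```python
class PathRecorder:
    """Stand-in showing how attribute access can accumulate a path."""

    def __init__(self, path=()):
        self.path = list(path)

    def __getattr__(self, name: str) -> "PathRecorder":
        # Only called for attributes that are not found normally
        if name.startswith("__"):
            raise AttributeError(name)
        return PathRecorder(self.path + [name])


recorder = PathRecorder().ranker.optimizer.epsilon
print(recorder.path)  # ['ranker', 'optimizer', 'epsilon']
```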