API Reference
This section provides the API documentation for the experimaestro module.
Core Classes
The core classes form the foundation of experimaestro’s configuration and task system.
Config
- XPM Config experimaestro.Config
Bases: object
Base type for all objects in the Python interface
- XPMConfig
alias of XPMConfig
- __post_init__()
Called after the object's __init__(), once its properties have been set
- register_task_output(method, *args, **kwargs)
Register a task output for dynamic callbacks.
This method is used to register outputs that can trigger callbacks when produced during task execution.
- classmethod value_class()
Decorator to register an external value class for this configuration.
This allows declaring a separate class that will be used when creating instances, which is useful to avoid initializing resources (e.g., PyTorch) when only configuring.
The value class must be a subclass of the configuration class and a subclass of parent configuration value classes (if any).
ConfigMixin
- class experimaestro.core.objects.ConfigMixin(**kwargs)
Class for configuration objects
- __validate__()
Validate the values
- add_dependencies(*dependencies)
Adds tokens to the task
- copy()
Returns a copy of this configuration (non-parameter attributes are ignored)
- copy_dependencies(other: ConfigMixin)
Adds all the dependencies from the other configuration
- instance(context: ConfigWalkContext | None = None, *, objects: ObjectStore | None = None, keep: bool = True) → T
Return an instance with the current values
- Parameters:
context – The context when computing the instance
objects – The previously built objects (so that we avoid re-creating instances of past configurations)
keep – register a configuration in the __config__ field of the instance
- submit(*, workspace=None, launcher=None, run_mode: experimaestro.scheduler.workspace.RunMode = None, init_tasks: List[LightweightTask] = [], max_retries: int | None = None, transient: TransientMode = None)
Submit this task
- Parameters:
workspace – the workspace, defaults to None
launcher – The launcher, defaults to None
run_mode – Run mode (if None, uses the workspace default)
max_retries – Maximum number of retries for resumable tasks that timeout (default: from workspace settings or 3)
transient – Transient mode for intermediary tasks (see TransientMode)
- Returns:
An object representing the submitted task
- tags()
Returns the tags associated with this object and its nested configurations
Task
- XPM Task experimaestro.Task
Bases: LightweightTask
Base class for tasks
- watch_output(method, callback)
Sets up a callback
- Parameters:
method – a method within a configuration
callback – the callback
ResumableTask
- XPM Task experimaestro.ResumableTask
Bases: Task
Base class for resumable/checkpointable tasks
Resumable tasks can be restarted if they are stopped by a time limit (e.g., SLURM job timeout). The task directory and dynamic outputs are preserved across restarts to allow checkpoint recovery.
- remaining_time() → float | None
Returns the remaining time in seconds before the job times out.
This is useful for checkpointing before hitting a time limit (e.g., SLURM walltime).
- Returns:
The remaining time in seconds, or None if:
- There is no time limit
- The launcher doesn’t support querying remaining time
- The task is not running
- Return type:
float | None
LightweightTask
InstanceConfig
- XPM Config experimaestro.InstanceConfig
Bases: Config
Base class for configurations where instance identity matters.
When a Config class derives from InstanceConfig instead of Config, instances are distinguished based on their object identity when used in containers. This enables distinguishing between shared and separate instances even when all parameters are identical.
Example
>>> class SubModel(InstanceConfig):
...     value: Param[int] = 100
>>> class MainModel(Config):
...     m1: Param[SubModel]
...     m2: Param[SubModel]
>>>
>>> sm1 = SubModel.C()
>>> sm2 = SubModel.C()  # Same params, different instance
>>>
>>> # Shared instance (same object used twice)
>>> shared = MainModel.C(m1=sm1, m2=sm1)
>>>
>>> # Separate instances (different objects)
>>> separate = MainModel.C(m1=sm1, m2=sm2)
>>>
>>> # Different identifiers: shared vs separate
>>> shared.__identifier__() != separate.__identifier__()
The instance order is determined by the traversal order during identifier computation, ensuring reproducibility.
Type Annotations
Type annotations are used to declare parameters in configurations and tasks.
Param
- experimaestro.Param
Type annotation for configuration parameters.
Parameters annotated with Param[T] are included in the configuration identifier computation and must be set before the configuration is sealed.
Meta
- experimaestro.Meta
Type annotation for meta-parameters (ignored in identifier computation).
Use Meta[T] for parameters that should not affect the task identity, such as output paths or runtime configuration.
Constant
- experimaestro.Constant
Type annotation for constant (read-only) parameters.
Constants must have a default value and cannot be modified after creation.
DataPath
- experimaestro.DataPath
Type annotation for data paths that should be serialized.
Use DataPath for paths that point to data files that should be preserved when serializing/deserializing a configuration.
DependentMarker
- experimaestro.DependentMarker
Type alias for dependency marker functions used in task_outputs() and dynamic output methods.
Experiment Management
experiment
- class experimaestro.experiment(env: Path | str | WorkspaceSettings, name: str, *, host: str | None = None, port: int | None = None, token: str | None = None, run_mode: RunMode | None = None, launcher=None, register_signals: bool = True, project_paths: list[Path] | None = None, wait_for_quit: bool = False, dirty_git: DirtyGitAction = DirtyGitAction.WARN, no_db: bool = False, no_environmental_impact: bool = False)
Bases: BaseExperiment
Context manager for running experiments.
Creates a workspace, manages task submission, and optionally starts a web server for monitoring.
Implements BaseExperiment interface for use with StateProvider and TUI.
Example:
    from experimaestro import experiment

    with experiment("./workdir", "my-experiment", port=12345) as xp:
        task = MyTask.C(param=42).submit()
        result = task.wait()
- add_service(service: ServiceClass) → ServiceClass
Adds a service (e.g. tensorboard viewer) to the experiment
- Parameters:
service – A service instance
- Returns:
The same service instance (or existing service if already added)
- static current() → experiment
Returns the current experiment, after checking that one is set.
Raises an AssertionError if there is no current experiment.
- wait()
Wait until the running processes have finished
Workspace
- class experimaestro.Workspace(settings: Settings, workspace_settings: WorkspaceSettings, launcher=None, run_mode: RunMode | None = None)
Bases: object
Workspace environment for experiments
This is a simple container for workspace settings, environment, and configuration. Multiple Workspace instances can exist for the same path - the singleton pattern is handled by WorkspaceStateProvider which manages the database per workspace path.
- property alt_jobspaths
Yield <folder>/jobs for every attached folder.
Used for read-through job lookups across all modes.
- cleanup_old_scheduler_runs(retention_days: int = 15, force: bool = False) → tuple[int, int]
Clean up old scheduler run directories
- Parameters:
retention_days – Delete run directories older than this many days
force – Force cleanup even if recently run
- Returns:
Tuple of (directories_deleted, errors_count)
- property configcachepath
Folder for cached configuration data
- property connector
Returns the default connector
- property experimentspath
Folder for experiments
- property folders: List[FolderSettings]
Auxiliary folders attached to this workspace.
Includes both the new folders setting and the deprecated alt_workspaces field (treated as mode=use). Beta.
- property jobspath
Folder for jobs
- property partialspath
Folder for partial job directories (shared checkpoints, etc.)
RunMode
Tagging
tag
- experimaestro.tag(value)
Tag a parameter value for tracking in experiments.
Tagged values appear in experiment logs and can be used for filtering and organizing results. Tags are included in the task’s __tags__ dictionary.
Example:
    task = MyTask.C(
        learning_rate=tag(0.001),  # Will appear in task tags
        batch_size=32,
    ).submit()
- Parameters:
value – The value to tag (str, int, float, or bool)
- Returns:
A tagged value wrapper that preserves the original value
Utilities
setmeta
- experimaestro.setmeta(config: Config, flag: bool)
Force a configuration to be treated as a meta-parameter.
When a configuration is marked as meta, it is excluded from the identifier computation of its parent configuration.
Example:
    class Ensemble(Config):
        model1: Param[Model]
        model2: Param[Model]

    # Mark model2 as meta - it won't affect the ensemble's identifier
    model2 = setmeta(Model.C(...), True)
    ensemble = Ensemble.C(model1=model1, model2=model2)
- Parameters:
config – The configuration to mark
flag – True to mark as meta, False to include in identifier
- Returns:
The same configuration (for chaining)
sealed_set
- experimaestro.sealed_set(*elements: Config) → set[Config]
Create a set of sealed Config objects.
Each element is sealed (its identifier computed and cached) before being added to the set. This makes Config objects hashable by their identifier, enabling use in Python sets.
Non-Config elements (primitives, enums, etc.) are passed through as-is.
Example:
    model1 = Model.C(lr=0.01)
    model2 = Model.C(lr=0.02)
    ensemble = Ensemble.C(models=sealed_set(model1, model2))
- Parameters:
elements – Config objects to seal and add to the set
- Returns:
A set containing the sealed elements
- Raises:
TypeError – If a Config element cannot be sealed
cache
- experimaestro.cache(name: str)
Decorator for caching method results to disk.
The cache is stored in the workspace’s config directory, keyed by the configuration’s identifier.
Example:
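    import pickle
    from pathlib import Path

    from experimaestro import Config, Param, cache

    class MyConfig(Config):
        data_path: Param[Path]

        @cache("processed.pkl")
        def process(self, cache_path: Path):
            # Reuse the cached result when it exists
            if cache_path.exists():
                with cache_path.open("rb") as fp:
                    return pickle.load(fp)
            # Otherwise compute it and store it for later calls
            result = expensive_computation(self.data_path)
            with cache_path.open("wb") as fp:
                pickle.dump(result, fp)
            return result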
- Parameters:
name – Filename for the cache file
- Returns:
A decorator that wraps the method with caching logic
initializer
- experimaestro.initializer(method)
Decorator for methods that should only execute once.
After the first call, subsequent calls return the cached result. This is useful for lazy initialization of expensive resources.
Example:
    class MyConfig(Config):
        @initializer
        def model(self):
            return load_expensive_model()
- Parameters:
method – The method to wrap
- Returns:
A wrapper that caches the result after first execution
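The run-once behavior described above can be pictured with a plain-Python decorator. This is only a sketch of the idea, not experimaestro's implementation: `run_once`, the `_run_once_` attribute prefix, and the `Demo` class are hypothetical names, and the sketch assumes results are cached per instance.

```python
import functools


def run_once(method):
    """Hypothetical stand-in for a run-once decorator (per-instance cache)."""
    attr = f"_run_once_{method.__name__}"

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Execute the wrapped method only on the first call for this instance
        if not hasattr(self, attr):
            setattr(self, attr, method(self, *args, **kwargs))
        # Later calls return the stored result
        return getattr(self, attr)

    return wrapper


class Demo:
    def __init__(self):
        self.calls = 0

    @run_once
    def model(self):
        self.calls += 1
        return object()  # placeholder for an expensive resource


demo = Demo()
first = demo.model()
second = demo.model()
```

Both calls return the same object and the method body runs a single time, which is the property that makes this pattern useful for lazy initialization.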
tqdm
- experimaestro.tqdm(**kwargs) → xpm_tqdm
- experimaestro.tqdm(iterable: Iterator[T] | None = None, **kwargs) → Iterator[T]
Create an experimaestro-aware progress bar.
A drop-in replacement for tqdm.tqdm that automatically reports progress to job event files. Use this in task execute() methods.
Example:
- Parameters:
iterable – Iterable to wrap (optional)
kwargs – Additional arguments passed to tqdm
- Returns:
A progress bar iterator
progress
- experimaestro.progress(value: float, level=0, desc: str | None = None, console=False)
Report task progress.
Call this function from within a running task to report progress. Progress is written to job event files and displayed in monitors.
Example:
- Parameters:
value – Progress value between 0.0 and 1.0
level – Nesting level for nested progress bars (default: 0)
desc – Optional description of the current operation
console – If True, also print to console
Field Definitions
field
- class experimaestro.field(*, default: Any | None = None, default_factory: Callable | None = None, ignore_default: bool | Any | None = None, ignore_generated=False, overrides=False, groups: list[ParameterGroup] | None = None)
Specify additional properties for a configuration parameter.
Use field() to control default value behavior and parameter grouping.
Default value options and identifier behavior:
default – The parameter has a default value that is always included in the task identifier. Two configs with different values always get different identifiers, even if one uses the default.
default_factory – A callable (zero-argument) that produces the default value. Behaves like default: the value is always included in the identifier. On Meta fields, the callable is invoked at seal time (e.g. PathGenerator).
ignore_default (bool) – When True and combined with default or default_factory, the default value is excluded from the identifier when the actual value equals the default. This is the backwards-compatible behavior matching bare defaults (x: Param[int] = 23, which is deprecated).
Example:
    class MyConfig(Config):
        # Default always included in identifier
        count: Param[int] = field(default=10)

        # Factory default always included in identifier
        fabric: Param[FabricConfig] = field(
            default_factory=FabricConfig.C
        )

        # Default ignored in identifier when value == default
        threshold: Param[float] = field(default=0.5, ignore_default=True)

        # Factory default ignored when value == default
        fabric: Param[FabricConfig] = field(
            default_factory=FabricConfig.C, ignore_default=True
        )

        # Generated path (Meta field, excluded from identifier)
        output: Meta[Path] = field(
            default_factory=PathGenerator("out.txt")
        )

        # Parameter in a group (for partial identifiers)
        lr: Param[float] = field(groups=[training_group])
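The effect of ignore_default on identifiers can be illustrated with a toy model. The sketch below is not how experimaestro computes identifiers: `toy_identifier`, its arguments, and the use of a SHA-256 hash over JSON are assumptions made purely for illustration.

```python
import hashlib
import json


def toy_identifier(values, defaults, ignore_default=frozenset()):
    """Hash parameter values (hypothetical model of an identifier).

    A parameter is skipped only when it is listed in ignore_default
    and its value equals the declared default.
    """
    kept = {
        name: value
        for name, value in values.items()
        if not (
            name in ignore_default
            and name in defaults
            and defaults[name] == value
        )
    }
    payload = json.dumps(kept, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


defaults = {"threshold": 0.5}

# Default value always included: the identifier changes once the parameter exists
always_included = toy_identifier({"count": 10, "threshold": 0.5}, defaults)

# Default value ignored: same identifier as if the parameter did not exist
ignored = toy_identifier(
    {"count": 10, "threshold": 0.5}, defaults, ignore_default={"threshold"}
)
without_parameter = toy_identifier({"count": 10}, defaults)
```

In this model, `ignored` equals `without_parameter` while `always_included` differs, which mirrors why ignore_default=True keeps identifiers stable when a parameter with a default is added to an existing configuration.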
param_group
- experimaestro.param_group(name: str) → ParameterGroup
Create a parameter group for use with partial identifiers.
Parameter groups allow computing partial identifiers that exclude certain parameters, enabling shared directories across related tasks.
Example:
- Parameters:
name – Unique name for this parameter group
- Returns:
A ParameterGroup object
partial
- experimaestro.partial(*, exclude_groups: list[ParameterGroup] | None = None, include_groups: list[ParameterGroup] | None = None, exclude_no_group: bool = False, exclude_all: bool = False) → Partial
Create a partial specification for partial identifier computation.
Partials allow tasks to share directories when they differ only in certain parameter groups (e.g., training hyperparameters).
Example:
    training_group = param_group("training")

    class Train(Task):
        model: Param[Model]
        epochs: Param[int] = field(groups=[training_group])
        checkpoint: Meta[Path] = field(
            default_factory=PathGenerator(
                "model.pt",
                partial=partial(exclude_groups=[training_group])
            )
        )
- Parameters:
exclude_groups – Parameter groups to exclude from identifier
include_groups – Parameter groups to always include (overrides exclusion)
exclude_no_group – If True, exclude parameters with no group assigned
exclude_all – If True, exclude all parameters by default
- Returns:
A Partial object
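To make the idea of a partial identifier concrete, here is a toy model in plain Python. It is a sketch under stated assumptions, not the library's algorithm: `toy_partial_identifier`, the `groups` mapping, and the hashing scheme are hypothetical, and only the exclude_groups behavior is modeled (include_groups, exclude_no_group and exclude_all are left out).

```python
import hashlib
import json


def toy_partial_identifier(values, groups, exclude_groups=()):
    """Hash only the parameters whose groups are not excluded.

    values: parameter name -> JSON-serializable value
    groups: parameter name -> list of group names (missing means no group)
    """
    excluded = set(exclude_groups)
    kept = {
        name: value
        for name, value in values.items()
        if excluded.isdisjoint(groups.get(name, ()))
    }
    payload = json.dumps(kept, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


groups = {"epochs": ["training"]}
short_run = {"model": "small", "epochs": 10}
long_run = {"model": "small", "epochs": 100}
other_model = {"model": "large", "epochs": 10}

# Runs that differ only in the excluded group share a partial identifier
shared_a = toy_partial_identifier(short_run, groups, exclude_groups=["training"])
shared_b = toy_partial_identifier(long_run, groups, exclude_groups=["training"])

# A change outside the excluded group yields a different partial identifier
distinct = toy_partial_identifier(other_model, groups, exclude_groups=["training"])
```

Two tasks with equal partial identifiers could then point at the same shared directory, for example to reuse a checkpoint across runs with different epoch counts.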
PathGenerator
- class experimaestro.PathGenerator(path: str | Path | Callable[[ConfigWalkContext, Config], Path] = '', *, partial: Partial = None)
Generate paths within the task directory.
Use PathGenerator with field(default_factory=...) to create paths relative to the task’s working directory.
Example:
    class MyTask(Task):
        output: Meta[Path] = field(default_factory=PathGenerator("results.json"))
        model: Meta[Path] = field(default_factory=PathGenerator("model.pt"))
For shared directories across related tasks, use with partial:
    training_group = param_group("training")

    class Train(Task):
        epochs: Param[int] = field(groups=[training_group])
        checkpoint: Meta[Path] = field(
            default_factory=PathGenerator(
                "model.pt",
                # partial() takes exclude_groups, not exclude
                partial=partial(exclude_groups=[training_group])
            )
        )
- Parameters:
path – Relative path within the task directory. Can be a string, Path, or callable that takes (context, config) and returns a Path.
partial – Optional partial for partial directory sharing. When provided, the path is generated in a shared partial directory.
- isoutput()
Returns True if this generator is a task output (e.g. generates a path within the job folder)
Deprecation
deprecate
- experimaestro.deprecate(config_or_target: Type[Config] | Callable | None = None, *, replace: bool = False)
Deprecate a configuration/task class or a parameter.
Deprecated configurations maintain backwards compatibility while allowing migration to new structures. The identifier is computed from the converted configuration, ensuring consistency.
Usage patterns:
Simple deprecation (class inherits from new class):
    @deprecate
    class OldConfig(NewConfig):
        pass
Deprecation with conversion:
    @deprecate(NewConfig)
    class OldConfig(Config):
        value: Param[int]

        def __convert__(self):
            return NewConfig.C(values=[self.value])
Immediate replacement:
    @deprecate(NewConfig, replace=True)
    class OldConfig(Config):
        value: Param[int]

        def __convert__(self):
            return NewConfig.C(values=[self.value])
Deprecate a parameter:
- Parameters:
config_or_target – Target class for conversion, or the deprecated class/method when used as a simple decorator
replace – If True, creating the deprecated class immediately returns the converted instance
Exceptions
GracefulTimeout
- class experimaestro.GracefulTimeout(message: str = 'Task stopped gracefully before timeout')
Bases: Exception
Exception raised to signal a graceful timeout in resumable tasks.
Raise this exception when a task needs to checkpoint and exit before a time limit (e.g., SLURM walltime). The task will be marked for retry rather than as failed.
Example:
```python
class LongTraining(ResumableTask):
    def execute(self):
        for epoch in range(self.epochs):
            remaining = self.remaining_time()
            if remaining is not None and remaining < 300:
                save_checkpoint(self.checkpoint, epoch)
                raise GracefulTimeout("Not enough time for another epoch")
            train_one_epoch()
```
Serialization
save
- experimaestro.save(obj: Any, save_directory: Path | None, definition_filename: str = 'experimaestro.json')
Save a configuration to a directory.
The serialization process stores the configuration in the definition file and copies any files or folders registered as DataPath parameters.
Example:
    config = MyConfig.C(data_path=Path("/data/file.txt"))
    save(config, Path("/output/saved_config"))
- Parameters:
obj – The configuration to save
save_directory – The directory in which the object and its data will be saved
definition_filename – The filename for the definition file (default: “experimaestro.json”)
load
- experimaestro.load(path: str | Path | SerializedPathLoader, as_instance: bool = False, partial_loading: bool | None = None, definition_filename: str | None = None) → Tuple[Any, List[LightweightTask]]
Load a configuration from a directory.
Restores a configuration previously saved with save().
Example:
    config = load(Path("/output/saved_config"))
- Parameters:
path – Directory containing the saved configuration, or a function that resolves relative paths to absolute ones
as_instance – If True, return an instance instead of a config
partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.
definition_filename – The definition filename. If None, tries “experimaestro.json” first, then falls back to “definition.json”.
- Returns:
The loaded configuration or instance
serialize
- experimaestro.serialize(obj: Any, save_directory: Path, *, init_tasks: list[LightweightTask] = [], definition_filename: str = 'experimaestro.json')
Serialize a configuration to a directory with initialization tasks.
Similar to save(), but also stores lightweight initialization tasks that should be run when the configuration is deserialized.
- Parameters:
obj – The configuration to serialize
save_directory – The directory in which the object and its data will be saved
init_tasks – List of lightweight tasks to run on deserialization
definition_filename – The filename for the definition file (default: “experimaestro.json”)
deserialize
- experimaestro.deserialize(path: str | Path | SerializedPathLoader, as_instance: bool = False, partial_loading: bool | None = None, definition_filename: str | None = None) → tuple[Any, List[LightweightTask]] | Any
Deserialize a configuration from a directory.
Restores a configuration previously saved with serialize(). When as_instance=True, runs any stored initialization tasks.
- Parameters:
path – Directory containing the serialized configuration, or a function that resolves relative paths to absolute ones
as_instance – If True, return an instance and run init tasks
partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.
definition_filename – The definition filename. If None, tries “experimaestro.json” first, then falls back to “definition.json”.
- Returns:
The configuration/instance (if as_instance), or tuple of (configuration, init_tasks)
from_task_dir
- experimaestro.from_task_dir(path: str | Path | SerializedPathLoader, as_instance: bool = False, partial_loading: bool | None = None)
Load a task configuration from a task directory.
Loads the task parameters from a job directory (containing params.json). This is useful for reloading task configurations after execution.
- Parameters:
path – Task directory containing params.json, or a function that resolves relative paths to absolute ones
as_instance – If True, return an instance instead of a config
partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.
- Returns:
The loaded task configuration or instance
state_dict
- experimaestro.state_dict(context: SerializationContext, obj: Any)
Convert an object to a state dictionary for serialization.
Returns a dictionary representation that can be serialized to JSON and later restored with from_state_dict().
- Parameters:
context – The serialization context
obj – The object to serialize
- Returns:
A dictionary with ‘objects’ and ‘data’ keys
from_state_dict
- experimaestro.from_state_dict(state: Dict[str, Any], path: None | str | Path | SerializedPathLoader = None, *, as_instance: bool = False, partial_loading: bool | None = None)
Load an object from a state dictionary.
Restores a configuration from a dictionary previously created by state_dict().
- Parameters:
state – The state dictionary to load from
path – Directory containing data files, or a function that resolves relative paths to absolute ones
as_instance – If True, return an instance instead of a config
partial_loading – If True, skip loading task references. If None (default), partial_loading is enabled when as_instance is True.
- Returns:
The loaded configuration or instance
SerializationContext
- class experimaestro.SerializationContext(*, save_directory: Path | None = None)
Context when serializing experimaestro configurations
- serialize(var_path: List[str], data_path: Path, config: ConfigMixin) → SerializedPath
Serialize data files into the save directory
- Parameters:
var_path – The variable path (list of field names from root)
data_path – The path to the data file/folder to serialize
config – The config object owning this data path
- Returns:
A SerializedPath referencing the serialized data
- Raises:
ValueError – If the destination path was already used
load_xp_info
- experimaestro.load_xp_info(path: str | Path) → ExperimentInfo
Load all serialized objects from an experiment run directory.
Reads objects.jsonl (streaming format) to reconstruct job configs and actions. Uses jobs.jsonl for job IDs and status.json for action IDs to classify entries.
Falls back to configs.json for experiments created before the objects.jsonl format was introduced.
This is a standalone function: no experiment context or WorkspaceStateProvider is required.
- Parameters:
path – Path to the experiment run directory
- Returns:
ExperimentInfo with .jobs and .actions dictionaries
- Raises:
FileNotFoundError – If neither objects.jsonl nor configs.json exists
ExperimentInfo
Launchers
DirectLauncher
- class experimaestro.launchers.direct.DirectLauncher(connector: Connector, *, priority: float = 0)
Bases: Launcher
Launcher that runs tasks directly as local processes.
This is the default launcher that executes tasks on the local machine without any job scheduler. Tasks are run as Python subprocesses.
- Parameters:
connector – The connector to use (defaults to LocalConnector)
- static get_cli()
Returns the CLI group for direct launcher commands.
- scriptbuilder()
Returns a script builder
SlurmLauncher
- class experimaestro.launchers.slurm.SlurmLauncher(*, connector: Connector | None = None, options: SlurmOptions | None = None, interval: float = 60, main=None, launcherenv: Dict[str, str] | None = None, binpath='/usr/bin')
Bases: Launcher
Slurm workload manager launcher
https://slurm.schedmd.com/documentation.html
- config(**kwargs)
Returns a new Slurm launcher with the given configuration
- property key
Returns a dictionary characterizing this launcher when calling sacct/etc
- processbuilder() → SlurmProcessBuilder
Returns the process builder for this launcher
By default, returns the associated connector builder
- scriptbuilder()
Returns the script builder
We assume Unix, but should be changed to PythonScriptBuilder when working
SlurmOptions
- class experimaestro.launchers.slurm.SlurmOptions(nodes: int | None = 1, time: str | None = None, account: str | None = None, qos: str | None = None, partition: str | None = None, constraint: str | None = None, mem: str | None = None, exclude: str | None = None, mem_per_gpu: str | None = None, cpus_per_task: str | None = None, nodelist: str | None = None, ntasks_per_node: int | None = None, gpus: int | None = None, gpus_per_node: int | None = None)
- static format_time(duration_s: int)
Format time for the SLURM option
- Parameters:
duration_s – Time duration in seconds
- Returns:
The configuration string
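For orientation, Slurm's time limit option accepts, among other forms, the `days-hours:minutes:seconds` format. The sketch below converts a duration in seconds into that form. It is an assumption about what a suitable string looks like, not the actual output of SlurmOptions.format_time; `format_slurm_time` is a hypothetical helper.

```python
def format_slurm_time(duration_s: int) -> str:
    """Format seconds as D-HH:MM:SS (hypothetical helper, not the library's code)."""
    if duration_s < 0:
        raise ValueError("duration must be non-negative")
    # Peel off days, then hours, then minutes; the rest is seconds
    days, remainder = divmod(duration_s, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{days}-{hours:02d}:{minutes:02d}:{seconds:02d}"


one_hour = format_slurm_time(3600)
mixed = format_slurm_time(90061)  # 1 day, 1 hour, 1 minute, 1 second
```

Such a string could then be passed as the `time` field of SlurmOptions, which is typed as an optional string in the signature above.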
Connectors
LocalConnector
- class experimaestro.connectors.local.LocalConnector(localpath: Path | None = None)
Bases: Connector
Connector for executing tasks on the local machine.
This connector handles local file system operations and process execution. It is the default connector used when no remote execution is needed.
Use instance() to get a singleton instance of the local connector.
- Parameters:
localpath – Base path for experimaestro data. Defaults to ~/.local/share/experimaestro or the value of the XPM_WORKDIR environment variable.
SshConnector
Launcher Finder
find_launcher
parse
- experimaestro.launcherfinder.parser.parse(expr: str)
Parse a requirement specification string into a HostRequirement object.
The specification string describes hardware requirements for running a task. Multiple alternatives can be specified using | (OR), and requirements within an alternative are combined using & (AND).
Syntax elements:
- duration=<N><unit>: Job duration (units: h/hours, d/days, m/mins)
- cpu(mem=<size>, cores=<N>): CPU requirements
- cuda(mem=<size>) * <N>: NVIDIA CUDA GPU requirements (memory and count)
- mps(mem=<size>) * <N>: Apple MPS GPU requirements (unified memory)
- gpu(mem=<size>) * <N>: Generic GPU requirements (matches any accelerator)
Memory sizes: <N>G, <N>GiB, <N>M, <N>MiB
Accelerator types:
- cuda: NVIDIA CUDA GPUs only (dedicated memory)
- mps: Apple Silicon MPS only (unified memory with CPU)
- gpu: Any accelerator type (cross-platform)
- Parameters:
expr – The requirement specification string
- Returns:
A HostRequirement object
Examples:
    from experimaestro.launcherfinder.parser import parse

    # Request 2 NVIDIA GPUs with 32GB each, 700GB RAM, for 40 hours
    req = parse("duration=40h & cpu(mem=700GiB) & cuda(mem=32GiB) * 2")

    # Cross-platform: CUDA on Linux/Windows OR MPS on macOS
    req = parse(
        "duration=4h & cuda(mem=8GiB)"
        " | duration=4h & mps(mem=8GiB)"
    )

    # Generic GPU requirement (matches any accelerator)
    req = parse("duration=2h & gpu(mem=4GiB)")
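The OR/AND structure of a specification string can be visualized with naive string splitting. This is only an illustration of how alternatives and terms nest, not the parser used by the library: `split_requirement` is a hypothetical helper and it performs no validation of the individual terms.

```python
def split_requirement(expr: str) -> list[list[str]]:
    """Split a specification into alternatives (|), each a list of terms (&)."""
    return [
        [term.strip() for term in alternative.split("&")]
        for alternative in expr.split("|")
    ]


spec = "duration=4h & cuda(mem=8GiB) | duration=4h & mps(mem=8GiB)"
alternatives = split_requirement(spec)

# Each inner list is one alternative; its items must all be satisfied
for index, terms in enumerate(alternatives):
    print(index, terms)
```

The outer list corresponds to the disjunction of alternatives, which matches the description of HostRequirement as a disjunction of simple host requirements.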
HostRequirement
- class experimaestro.launcherfinder.specs.HostRequirement
A requirement must be a disjunction of host requirements
- abstract multiply_duration(coefficient: float) → HostRequirement
Returns a new HostRequirement with a duration multiplied by the provided coefficient
- requirements: List[HostSimpleRequirement]
List of requirements (by order of priority)
HostSimpleRequirement
- class experimaestro.launcherfinder.specs.HostSimpleRequirement(*reqs: HostSimpleRequirement)
Simple host requirement
- accelerators: List[AcceleratorSpecification]
Specification for accelerators (GPUs)
- cpu: CPUSpecification
Specification for CPU
- property cuda_gpus: List[CudaSpecification]
CUDA GPUs (backwards compatibility alias).
Returns only CUDA accelerators from the accelerators list.
- multiply_duration(coefficient: float) → HostSimpleRequirement
Returns a new HostRequirement with a duration multiplied by the provided coefficient
HostSpecification
- class experimaestro.launcherfinder.specs.HostSpecification(*, accelerators: List[AcceleratorSpecification] = NOTHING, cuda: List[CudaSpecification] = NOTHING, cpu: CPUSpecification = NOTHING, priority: int = 0, max_duration: int = 0, min_gpu: int = 0)
Specifies how the host is set.
Supports both CUDA GPUs and other accelerators (MPS, ROCm, etc.). Use accelerators for the generic list, or cuda for backwards compatibility.
Examples
    # New style - generic accelerators
    host = HostSpecification(accelerators=[CudaSpecification(memory=24*1024**3)])

    # Backwards compatible - cuda shorthand
    host = HostSpecification(cuda=[CudaSpecification(memory=24*1024**3)])
- accelerators: List[AcceleratorSpecification]
All accelerators (GPUs) available on this host
- cpu: CPUSpecification
CPU specification for this host
- cuda: List[CudaSpecification]
CUDA GPUs (backwards compatibility, merged into accelerators)
AcceleratorSpecification
- class experimaestro.launcherfinder.specs.AcceleratorSpecification(memory: int = 0, model: str = '', min_memory: int = 0)
Generic accelerator (GPU-like device) specification.
This can match any accelerator type (CUDA, MPS, ROCm, etc.) based on memory requirements alone. Use this when you don’t care about the specific accelerator type.
For type-specific requirements, use CudaSpecification or MPSSpecification.
- property accelerator_type: AcceleratorType | None
Type of accelerator (None for generic)
- match(spec: AcceleratorSpecification) → bool
Returns True if this host accelerator can satisfy the spec requirement.
Matching rules:
- If spec is generic (AcceleratorSpecification), any accelerator matches
- If spec is specific (CudaSpecification, MPSSpecification), types must match
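The type-matching rules above can be sketched with stand-in classes. Everything here is hypothetical (`ToyAcceleratorType`, `ToyAccelerator`, `types_compatible`), and the sketch covers only the type rule: it does not model memory requirements or any other check the real match() may perform.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ToyAcceleratorType(Enum):
    CUDA = "cuda"
    MPS = "mps"


@dataclass(frozen=True)
class ToyAccelerator:
    # None plays the role of a generic (type-agnostic) specification
    accelerator_type: Optional[ToyAcceleratorType] = None


def types_compatible(host: ToyAccelerator, spec: ToyAccelerator) -> bool:
    """Apply the two type rules: generic specs match anything, specific specs need equal types."""
    if spec.accelerator_type is None:
        return True
    return host.accelerator_type == spec.accelerator_type


cuda_host = ToyAccelerator(ToyAcceleratorType.CUDA)
generic_spec = ToyAccelerator()
mps_spec = ToyAccelerator(ToyAcceleratorType.MPS)

generic_ok = types_compatible(cuda_host, generic_spec)
mps_on_cuda = types_compatible(cuda_host, mps_spec)
```

Here a CUDA host satisfies the generic requirement but not the MPS-specific one, which is the asymmetry the two rules describe.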
CudaSpecification
- class experimaestro.launcherfinder.specs.CudaSpecification(memory: int = 0, model: str = '', min_memory: int = 0)
Bases: AcceleratorSpecification
NVIDIA CUDA GPU specification (dedicated GPU memory).
Only matches CUDA GPUs - will not match MPS or other accelerator types.
- property accelerator_type: AcceleratorType
Type of accelerator (None for generic)
MPSSpecification
- class experimaestro.launcherfinder.specs.MPSSpecification(memory: int = 0, model: str = '', min_memory: int = 0)
Bases: AcceleratorSpecification
Apple Metal Performance Shaders (MPS) specification.
MPS uses unified memory - GPU memory is shared with CPU RAM. When a task requests GPU memory on MPS, it consumes system RAM.
Only matches MPS - will not match CUDA or other accelerator types.
- property accelerator_type: AcceleratorType
Type of accelerator (None for generic)
AcceleratorType
- class experimaestro.launcherfinder.specs.AcceleratorType(value)
Types of accelerators supported.
CPUSpecification
Experiments CLI
ExperimentHelper
ConfigurationBase
- class experimaestro.experiments.ConfigurationBase(*, id: str = '???', file: str = 'experiment', module: str | None = None, pythonpath: List[str] | None = None, parent: str | None = None, imports: List[str] | None = None, pre_experiment: str | None = None, title: str = '', subtitle: str = '', paper: str = '', description: str = '', add_timestamp: bool = False, dirty_git: DirtyGitAction = DirtyGitAction.WARN)
Base configuration for any experiment
- dirty_git: DirtyGitAction
Action when the git repository has uncommitted changes: ignore, warn (default), error
- parent: str | None
(Deprecated) Relative path of a YAML file that should be merged. Use imports instead.
- pre_experiment: str | None
Python file path or module name to execute before importing the experiment.
Can be either:
  - A relative path to a Python file (e.g., “pre_setup.py”)
  - A Python module name (e.g., “mypackage.pre_experiment”)
This is useful for setting environment variables or mocking modules to speed up the experiment setup phase (e.g., mocking torch.compile or torch.nn). The actual job execution will use real modules.
configuration
- experimaestro.experiments.configuration(*args, **kwargs)
Method to define keyword only dataclasses
Configurations are keyword-only
HuggingFace Integration
ExperimaestroHFHub
Submit Hooks
SubmitHook
Services
Service
- class experimaestro.scheduler.services.Service(log_directory: Path | None = None)
An experiment service
Services can be associated with an experiment. They send notifications to service listeners.
To support restarting services from monitor mode, subclasses should override state_dict() to return the data needed to recreate the service, and implement from_state_dict() to recreate it.
- add_listener(listener: ServiceListener)
Adds a listener
- Parameters:
listener – The listener to add
- cleanup_logging(stdout_handler, stderr_handler)
Clean up logging handlers - call at end of _serve()
- description()
Human-readable description of the service
- static from_state_dict(service_class: str, data: dict, path_translator: Callable[[str], Path] | None = None) Service
Recreate a service from a state dictionary.
- Parameters:
service_class – Fully qualified class name (e.g., “module.ClassName”)
data – Dictionary from state_dict() (may be serialized)
path_translator – Optional function to translate remote paths to local. Used by remote clients to map paths to local cache.
- Returns:
A new Service instance, or raises if the class cannot be loaded.
- Raises:
ValueError – If __unserializable__ is True or class cannot be loaded
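One way to recreate an object from a fully qualified class name and a dictionary of constructor arguments is sketched below. This is a generic illustration built on `importlib`, not the experimaestro implementation; `load_class` and `recreate` are hypothetical helpers, and nested class names (such as "module.Outer.Inner") are not handled.

```python
import importlib


def load_class(qualified_name: str) -> type:
    """Resolve "module.ClassName" to a class, raising ValueError on failure."""
    module_name, _, class_name = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"Not a fully qualified class name: {qualified_name!r}")
    try:
        module = importlib.import_module(module_name)
        cls = getattr(module, class_name)
    except (ImportError, AttributeError) as exc:
        raise ValueError(f"Cannot load class {qualified_name!r}") from exc
    if not isinstance(cls, type):
        raise ValueError(f"{qualified_name!r} is not a class")
    return cls


def recreate(qualified_name: str, data: dict):
    """Instantiate the class using the stored constructor keyword arguments."""
    return load_class(qualified_name)(**data)


# Usage with a standard-library class standing in for a service
delta = recreate("datetime.timedelta", {"seconds": 90})

try:
    load_class("no_such_module_xyz.Missing")
    load_failed = False
except ValueError:
    load_failed = True
```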
- full_state_dict() dict
Serialize service to dictionary for JSON serialization.
Overrides BaseService.full_state_dict() to properly serialize Path objects.
- remove_listener(listener: ServiceListener)
Removes a listener
- Parameters:
listener – The listener to remove
- static serialize_state_dict(data: dict) dict
Serialize a state_dict, converting Path objects to serializable format.
This is called automatically when storing services. Path values are converted to {“__path__”: “/path/string”} format.
- Parameters:
data – Raw state_dict from service (should include __class__)
- Returns:
Serializable dictionary with paths converted
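The path conversion described above can be sketched as follows. The helpers `serialize_paths` and `restore_paths` are hypothetical and only handle top-level values; whether the library also converts nested values is not stated here. `PurePosixPath` is used so that the example gives the same result on every platform.

```python
import json
from pathlib import PurePath, PurePosixPath
from typing import Any, Callable, Dict, Optional


def serialize_paths(data: Dict[str, Any]) -> Dict[str, Any]:
    """Replace top-level path values by {"__path__": "<string>"} markers."""
    return {
        key: {"__path__": str(value)} if isinstance(value, PurePath) else value
        for key, value in data.items()
    }


def restore_paths(
    data: Dict[str, Any],
    translator: Optional[Callable[[str], PurePath]] = None,
) -> Dict[str, Any]:
    """Turn the markers back into paths, optionally translating them."""
    to_path = translator or PurePosixPath
    return {
        key: to_path(value["__path__"])
        if isinstance(value, dict) and set(value) == {"__path__"}
        else value
        for key, value in data.items()
    }


raw = {"log_dir": PurePosixPath("/tmp/logs"), "name": "svc"}
serialized = serialize_paths(raw)
as_json = json.dumps(serialized)  # now safe to store as JSON
restored = restore_paths(json.loads(as_json))
# A translator can map remote paths into a local cache directory
cached = restore_paths(serialized, lambda p: PurePosixPath("/cache" + p))
```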
- set_experiment(xp: Experiment) None
Called when the service is added to an experiment.
Override this method to access the experiment context (e.g., workdir). The base implementation stores the experiment reference and creates log directories.
- Parameters:
xp – The experiment this service is being added to.
- set_starting() None
Set state to STARTING and clear any previous error.
This is a no-op for live services - they manage their own state internally through get_url(). Only MockService uses this to manually control state for UI feedback.
- setup_logging() tuple[FileHandler | None, FileHandler | None]
Setup logging handlers for service output
Returns tuple of (stdout_handler, stderr_handler) for cleanup. Call this at the start of _serve() to redirect service logs.
- property state
Current service state
- state_dict() dict
Return parameters needed to recreate this service.
Subclasses should override this to return constructor arguments. Path values are automatically serialized and restored (with translation for remote monitoring).
Example:
    def state_dict(self):
        return {
            "log_dir": self.log_dir,  # Path is auto-handled
            "name": self.name,
        }
- Returns:
Dict with constructor kwargs.
WebService
- class experimaestro.scheduler.services.WebService(log_directory: Path | None = None)
Bases: Service
Base class for web-based experiment services.
Web services provide HTTP endpoints that can be accessed through the experimaestro web interface. When an experiment is running with a port configured, web services are automatically proxied through the main experimaestro server.
To implement a web service:
1. Subclass WebService
2. Set a unique id class attribute
3. Implement the _serve() method to start your web server
4. Set self.url and call running.set() when ready
5. Optionally check self.should_stop() to handle graceful shutdown
Example:
    import threading
    import time

    from experimaestro.scheduler.services import WebService

    class MyWebService(WebService):
        id = "myservice"

        def _serve(self, running: threading.Event):
            # Start your web server
            self.url = "http://localhost:8080"
            running.set()
            # Keep serving, checking for stop signal
            while not self.should_stop():
                time.sleep(1)
- get_url()
Get the URL of this web service, starting it if needed.
If the service is not running, this method will start it and block until the URL is available. If the service is already starting or running, returns the existing URL.
- Returns:
The URL where this service can be accessed
- Raises:
RuntimeError – If called while service is stopping
- serve()
Start the web service in a background thread.
This method creates a daemon thread that calls _serve().
- should_stop() bool
Check if the service should stop.
Subclasses can call this in their _serve loop to check for graceful shutdown requests.
- Returns:
True if stop() has been called
- stop(timeout: float = 2.0)
Stop the web service.
This method signals the service to stop and waits for the thread to terminate. If the thread doesn’t stop gracefully within the timeout, it attempts to forcefully terminate it.
- Parameters:
timeout – Seconds to wait for graceful shutdown before forcing
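The start, serve, and stop cycle described above can be reproduced with plain threading primitives. The following is a simplified standalone sketch, not the experimaestro class: `MiniWebService` is hypothetical, no real HTTP server is started, and the forced termination step after the timeout is omitted.

```python
import threading
import time
from typing import Optional


class MiniWebService:
    """Hypothetical stand-in that mimics the serve/stop lifecycle."""

    def __init__(self) -> None:
        self.url: Optional[str] = None
        self._running = threading.Event()  # set once the URL is available
        self._stop_event = threading.Event()  # set when stop() is requested
        self._thread: Optional[threading.Thread] = None

    def should_stop(self) -> bool:
        return self._stop_event.is_set()

    def _serve(self, running: threading.Event) -> None:
        # A real service would start its web server here
        self.url = "http://localhost:8080"
        running.set()
        while not self.should_stop():
            time.sleep(0.01)

    def serve(self) -> None:
        # Run _serve() in a daemon thread so it never blocks interpreter exit
        self._thread = threading.Thread(
            target=self._serve, args=(self._running,), daemon=True
        )
        self._thread.start()

    def get_url(self) -> Optional[str]:
        # Start on first use, then block until the URL has been published
        if self._thread is None:
            self.serve()
        self._running.wait()
        return self.url

    def stop(self, timeout: float = 2.0) -> None:
        # Signal the loop to exit and wait up to `timeout` seconds
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)


service = MiniWebService()
url = service.get_url()
service.stop()
thread_alive = service._thread.is_alive()
```

Using one event for readiness and another for the stop request keeps the two signals independent, so a caller waiting for the URL is never woken by a shutdown.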
ServiceState
- class experimaestro.scheduler.services.ServiceState(value)
State of a service lifecycle.
Services transition through these states:
    STOPPED -> STARTING -> RUNNING -> STOPPING -> STOPPED
                  |-> ERROR (if start fails)
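The lifecycle can be written as a transition table. This is an illustrative sketch: the `State` enum and `ALLOWED` table are hypothetical, the member values are arbitrary, and the ERROR to STARTING edge is an assumption based on set_starting() being described as clearing a previous error.

```python
from enum import Enum, auto


class State(Enum):
    """Hypothetical mirror of the states named in the diagram."""

    STOPPED = auto()
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    ERROR = auto()


ALLOWED = {
    State.STOPPED: {State.STARTING},
    State.STARTING: {State.RUNNING, State.ERROR},  # ERROR if start fails
    State.RUNNING: {State.STOPPING},
    State.STOPPING: {State.STOPPED},
    State.ERROR: {State.STARTING},  # assumption: a failed service can be retried
}


def can_transition(current: State, new: State) -> bool:
    """Return True if the diagram allows moving from `current` to `new`."""
    return new in ALLOWED[current]


normal_start = can_transition(State.STOPPED, State.STARTING)
skip_starting = can_transition(State.STOPPED, State.RUNNING)
```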
ServiceListener
- class experimaestro.scheduler.services.ServiceListener
A service listener
Jobs
Job
- class experimaestro.scheduler.jobs.Job(config: Config, *, workspace: Workspace = None, launcher: Launcher = None, run_mode: RunMode = RunMode.NORMAL, max_retries: int | None = None, transient: TransientMode = TransientMode.NONE, backup: bool | None = None)
A job is a resource that is produced by the execution of some code
- async aio_done_handler()
Process remaining task outputs after job completion.
Called by: Scheduler.aio_final_state()
Ensures all remaining task output events are queued for processing by explicitly reading the task outputs file. The callbacks will complete asynchronously and decrement task_output_count when done.
- async aio_run() Process
Actually run the code
- Returns:
A Process instance representing the running job
- property environ
Returns the job environment
It is made of (by order of priority):
1. The job environment
2. The launcher environment
3. The workspace environment
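A layered lookup with this priority order can be modelled with `collections.ChainMap`. This sketch only illustrates the precedence rule; the variable names and values are made up, and how the library actually merges the three environments is not specified here.

```python
from collections import ChainMap

# Hypothetical environments, from lowest to highest priority
workspace_env = {"DATA_DIR": "/data", "LANG": "C"}
launcher_env = {"LANG": "en_US.UTF-8", "CUDA_VISIBLE_DEVICES": "0"}
job_env = {"CUDA_VISIBLE_DEVICES": "1"}

# ChainMap searches its mappings left to right, so the job wins
environ = ChainMap(job_env, launcher_env, workspace_env)

merged = dict(environ)
```

Each key resolves to the first mapping that defines it: `CUDA_VISIBLE_DEVICES` comes from the job, `LANG` from the launcher, and `DATA_DIR` from the workspace.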
- property failedpath
When a job has been unsuccessful, this file is written with an error code inside
- property lockpath
This file is used as a lock for running the job
- property pidpath
This file contains the PID of the job process
- prepare(overwrite=False)
Prepare all files before starting a task
- Parameters:
overwrite – if True, overwrite files even if the task has been run
- register_watched_outputs()
Register all watched outputs with the scheduler.
This should be called after the job is submitted and has a scheduler.
- rotate_logs() None
Rotate log files before restarting a task.
Renames non-empty stdout and stderr files with a timestamp suffix (e.g., job.20231215143022.out) to preserve logs from previous runs.
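The renaming scheme can be sketched with `pathlib`. The helper `rotate_log` below is hypothetical and only reproduces the naming pattern from the example above (a timestamp inserted before the extension); the exact timestamp source used by the library is an assumption.

```python
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional


def rotate_log(path: Path, now: Optional[datetime] = None) -> Optional[Path]:
    """Rename a non-empty log file to <stem>.<timestamp><suffix>.

    Returns the new path, or None when there was nothing to rotate.
    """
    if not path.exists() or path.stat().st_size == 0:
        return None
    stamp = (now or datetime.now()).strftime("%Y%m%d%H%M%S")
    rotated = path.with_name(f"{path.stem}.{stamp}{path.suffix}")
    path.rename(rotated)
    return rotated


with tempfile.TemporaryDirectory() as workdir:
    stdout_file = Path(workdir) / "job.out"
    stdout_file.write_text("previous run output\n")
    stderr_file = Path(workdir) / "job.err"
    stderr_file.write_text("")  # empty files are left alone

    rotated = rotate_log(stdout_file, datetime(2023, 12, 15, 14, 30, 22))
    rotated_name = rotated.name
    skipped = rotate_log(stderr_file)
    original_gone = not stdout_file.exists()
```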
- set_scheduler_state(new_state: JobState)
Set the scheduler lifecycle state.
Updates experiment statistics and notifies listeners. This does NOT update the execution state (_state) — that is updated only by load_from_disk() and apply_event().
- Parameters:
new_state – The new scheduler state
- watch_output(watched: WatchedOutput)
Add a watched output to this job.
- Parameters:
watched – A description of the watched output
Base Launcher
Launcher
- class experimaestro.launchers.Launcher(connector: Connector, *, priority: float = 0)
Bases: ABC
Base class for task launchers.
Launchers are responsible for executing tasks on a compute resource. They work with a Connector to access the target system and manage process execution.
Subclasses include:
  - DirectLauncher: Local execution
  - SlurmLauncher: SLURM cluster
- Parameters:
connector – The connector to use for accessing the compute resource
priority – Priority for launcher selection in DynamicLauncher (higher = preferred)
- abstract launcher_info_code() str
Returns Python code to set up launcher info during task execution.
This code is inserted into the generated task script to set up launcher-specific information (like LauncherInformation for querying remaining time).
- Returns:
Python code as a string, or empty string if no setup needed.
- onSubmit(job: Job)
Called when submitting a job
Example of use: this allows the launcher to add token dependencies
- processbuilder() ProcessBuilder
Returns the process builder for this launcher
By default, returns the associated connector builder
- abstract scriptbuilder() ScriptBuilder
Returns a script builder
Click Integration
forwardoption
- class experimaestro.click.forwardoption(path=[])
Gives access to an argument of the configuration
This makes it possible to refer to a path within a class using Python attribute syntax, e.g. @forwardoption.ranker.optimizer.epsilon(MyConfig) or @forwardoption.ranker.optimizer.epsilon(MyConfig, “option-name”)
The default can be changed by setting the option
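The attribute-chaining syntax can be obtained by overriding `__getattr__`. The sketch below is a generic illustration of that technique, not the experimaestro implementation: `PathRecorder` and its `resolve` method are hypothetical, and no click option is created.

```python
from types import SimpleNamespace
from typing import Any, List, Optional


class PathRecorder:
    """Hypothetical helper that records chained attribute names."""

    def __init__(self, path: Optional[List[str]] = None) -> None:
        self.path = list(path or [])

    def __getattr__(self, name: str) -> "PathRecorder":
        # Only called when normal lookup fails, i.e. for path elements
        if name.startswith("__"):
            raise AttributeError(name)
        return PathRecorder(self.path + [name])

    def resolve(self, root: Any) -> Any:
        """Follow the recorded path on a nested object."""
        for name in self.path:
            root = getattr(root, name)
        return root


recorder = PathRecorder().ranker.optimizer.epsilon

# Hypothetical nested configuration to resolve the path against
config = SimpleNamespace(
    ranker=SimpleNamespace(optimizer=SimpleNamespace(epsilon=1e-8))
)
value = recorder.resolve(config)
```

Each attribute access returns a new recorder rather than mutating the current one, so a partially built path can be reused safely.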