Internal API Reference
This section provides API documentation for internal classes used in the scheduler and experiment systems. These are primarily useful for developers extending or debugging experimaestro.
Scheduler
Scheduler
- class experimaestro.scheduler.base.Scheduler(name: str = 'Global')
Bases: StateProvider, Thread
A job scheduler (singleton) that provides live state.
The scheduler is based on asyncio for easy concurrency handling. This is a singleton: only one scheduler instance exists per process.
Inherits from StateProvider to allow TUI/Web interfaces to access live job and experiment state during experiment execution.
- static instance() → Scheduler
Get or create the global scheduler instance
- register_experiment(xp: Experiment)
Register an experiment with the scheduler
- unregister_experiment(xp: Experiment)
Mark experiment as finished but keep it registered for TUI visibility
Previously this removed the experiment from the scheduler, but this caused issues with the TUI where jobs would disappear when the experiment finished. Now we keep the experiment registered so its jobs remain visible in the TUI. The experiment’s status (DONE/FAILED/DETACHED) reflects that it’s finished.
Jobs
Job
- class experimaestro.scheduler.jobs.Job(config: Config, *, workspace: Workspace = None, launcher: Launcher = None, run_mode: RunMode = RunMode.NORMAL, max_retries: int | None = None, transient: TransientMode = TransientMode.NONE)
Bases: BaseJob, Resource
A job is a resource that is produced by the execution of some code.
- async finalize_status(callback: Callable[[BaseJob], None] | None = None, cleanup_events: bool = False) → bool
Finalize job status: load from disk, apply callback, cleanup, and write.
This method:
1. Acquires the job lock
2. Loads state from disk (to get carbon info, etc.)
3. Calls the callback to modify job state (e.g., set state, increment retry_count)
4. Optionally cleans up event files (for non-done jobs being restarted)
5. Writes status if different from disk
- Parameters:
callback – Optional function to modify job state after loading from disk. Called with the job as argument.
cleanup_events – If True, cleanup event files (for job restart scenarios).
- Returns:
True if the status was written, False if unchanged.
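The five steps of finalize_status() can be illustrated with the minimal, self-contained sketch below. It is not experimaestro's code: the `FakeJob` class, the in-memory `disk` dict standing in for status.json, and the use of an `asyncio.Lock` as the job lock are all assumptions made for illustration.

```python
import asyncio
from typing import Callable, Optional


class FakeJob:
    """Hypothetical stand-in for a job; `disk` plays the role of status.json."""

    def __init__(self, disk: dict) -> None:
        self.disk = disk
        self.state: dict = {}
        self.lock = asyncio.Lock()  # assumption: the job lock is an asyncio.Lock
        self.events_cleaned = False

    def load_from_disk(self) -> None:
        self.state = dict(self.disk)

    def write_status(self) -> None:
        self.disk.clear()
        self.disk.update(self.state)

    async def finalize_status(
        self,
        callback: Optional[Callable[["FakeJob"], None]] = None,
        cleanup_events: bool = False,
    ) -> bool:
        async with self.lock:               # 1. acquire the job lock
            self.load_from_disk()           # 2. reload the on-disk state
            if callback is not None:
                callback(self)              # 3. let the caller modify the state
            if cleanup_events:
                self.events_cleaned = True  # 4. placeholder for event-file cleanup
            if self.state != self.disk:     # 5. write only when something changed
                self.write_status()
                return True
            return False


def bump_retry(job: FakeJob) -> None:
    job.state["retry_count"] = job.state.get("retry_count", 0) + 1


async def main() -> tuple:
    job = FakeJob({"state": "error", "retry_count": 0})
    first = await job.finalize_status(bump_retry)  # retry_count changes: written
    second = await job.finalize_status()           # nothing changes: not written
    return first, second


print(asyncio.run(main()))  # (True, False)
```

Comparing against the freshly loaded disk state is what makes the second call a no-op.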
- load_from_disk()
Load job state from disk, prioritizing marker files over status.json
This method is resilient - it returns False on any error instead of raising.
Priority for state:
1. Check .done/.failed marker files (most reliable, written atomically by job)
2. Fall back to status.json for state info
3. Check PID file for running state
4. Default to current state
Additional fields are loaded from status.json:
- timestamps (submittime, starttime, endtime)
- exit_code, retry_count
- progress, carbon_metrics
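The state priority can be sketched as follows. This models only the state resolution, not the additional fields. It assumes hypothetical file names `{name}.done`, `{name}.failed`, `{name}.pid` and `status.json` all in one job directory (the real locations come from donefile, failedfile, pidfile and status_path), and uses simplified string states. Unlike load_from_disk(), which returns False on error, this helper falls back to the current state.

```python
import json
import tempfile
from pathlib import Path


def resolve_state(job_dir: Path, name: str, current: str) -> str:
    """Hypothetical sketch of the state priority used by load_from_disk()."""
    try:
        if (job_dir / f"{name}.done").exists():    # 1. marker files win
            return "done"
        if (job_dir / f"{name}.failed").exists():
            return "error"
        status = job_dir / "status.json"           # 2. then status.json
        if status.exists():
            state = json.loads(status.read_text()).get("state")
            if state:
                return state
        if (job_dir / f"{name}.pid").exists():     # 3. then the PID file
            return "running"
    except (OSError, ValueError):                  # resilient: never raise
        pass
    return current                                 # 4. default to current state


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    (d / "status.json").write_text(json.dumps({"state": "running"}))
    print(resolve_state(d, "mytask", "unscheduled"))  # running
    (d / "mytask.done").touch()
    print(resolve_state(d, "mytask", "unscheduled"))  # done
```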
- rotate_logs() → None
Rotate log files before restarting a task.
Renames non-empty stdout and stderr files with a timestamp suffix (e.g., job.20231215143022.out) to preserve logs from previous runs.
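A minimal sketch of this rotation, assuming stdout and stderr are named `{name}.out` and `{name}.err` in the job directory. Only the `.out` naming is confirmed by the example above; `.err` and the helper's signature are assumptions.

```python
import tempfile
import time
from pathlib import Path


def rotate_logs(job_dir: Path, name: str) -> list:
    """Hypothetical sketch: rename non-empty NAME.out / NAME.err with a timestamp."""
    stamp = time.strftime("%Y%m%d%H%M%S")  # e.g. 20231215143022 (local time)
    rotated = []
    for suffix in ("out", "err"):          # assumption: stderr uses ".err"
        log = job_dir / f"{name}.{suffix}"
        if log.exists() and log.stat().st_size > 0:  # empty files are left alone
            target = job_dir / f"{name}.{stamp}.{suffix}"
            log.rename(target)
            rotated.append(target)
    return rotated


with tempfile.TemporaryDirectory() as tmp:
    job_dir = Path(tmp)
    (job_dir / "job.out").write_text("previous run\n")
    (job_dir / "job.err").write_text("")
    print([p.name for p in rotate_logs(job_dir, "job")])  # a single job.<timestamp>.out
```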
- set_state(new_state: JobState, *, loading: bool = False)
Set job state and update timestamps
- Parameters:
new_state – The new job state
loading – If True, timestamps are not modified (loading from disk)
Timestamp rules (when loading=False):
- WAITING: clears starttime and endtime
- RUNNING: sets starttime
- DONE/ERROR: sets endtime
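The timestamp rules can be expressed as a small state machine. In this sketch, `State` and `SketchJob` are hypothetical simplifications: the real method takes JobState instances rather than an enum.

```python
from datetime import datetime
from enum import Enum
from typing import Optional


class State(Enum):  # hypothetical, simplified stand-in for JobState
    WAITING = "waiting"
    RUNNING = "running"
    DONE = "done"
    ERROR = "error"


class SketchJob:
    def __init__(self) -> None:
        self.state = State.WAITING
        self.starttime: Optional[datetime] = None
        self.endtime: Optional[datetime] = None

    def set_state(self, new_state: State, *, loading: bool = False) -> None:
        self.state = new_state
        if loading:  # values were read from disk: keep their timestamps
            return
        if new_state is State.WAITING:
            self.starttime = self.endtime = None
        elif new_state is State.RUNNING:
            self.starttime = datetime.now()
        elif new_state in (State.DONE, State.ERROR):
            self.endtime = datetime.now()


job = SketchJob()
job.set_state(State.RUNNING)
job.set_state(State.DONE)
print(job.starttime <= job.endtime)  # True
```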
JobDependency
- class experimaestro.scheduler.jobs.JobDependency(job, *, soft: bool = False)
Bases: Dependency
- async aio_lock(timeout: float = 0)
Acquire lock on job dependency by waiting for job to complete
- Parameters:
timeout – Must be 0 (wait indefinitely) for job dependencies
- Raises:
ValueError – If timeout is not 0
RuntimeError – If the job has not been submitted or if it failed
- async ensure_started()
Ensure the dependency job is started.
If the dependency is a transient job that was skipped (state is UNSCHEDULED), this will start it so it actually runs.
TransientMode
- class experimaestro.scheduler.jobs.TransientMode(value)
Mode for transient tasks
Transient tasks are intermediary tasks that may not need to persist after an experiment completes.
- NONE = 0
Task is scheduled normally and kept after completion (default)
- TRANSIENT = 1
Task is only run if required by another non-transient task
- REMOVE = 2
Task is transient and its job folder is removed on experiment completion. Implies TRANSIENT behavior.
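The relationship between the three modes can be sketched as follows. The enum mirrors the documented values. The `is_transient` property and the helpers `should_run` and `remove_on_completion` are hypothetical and only restate the rules above.

```python
from enum import Enum


class TransientMode(Enum):  # mirrors the documented values
    NONE = 0
    TRANSIENT = 1
    REMOVE = 2

    @property
    def is_transient(self) -> bool:
        # REMOVE implies TRANSIENT behavior
        return self is not TransientMode.NONE


def should_run(mode: TransientMode, required_by_non_transient: bool) -> bool:
    """Hypothetical helper: transient tasks run only when something needs them."""
    return not mode.is_transient or required_by_non_transient


def remove_on_completion(mode: TransientMode) -> bool:
    """Hypothetical helper: only REMOVE deletes the job folder afterwards."""
    return mode is TransientMode.REMOVE


print(should_run(TransientMode.TRANSIENT, False))  # False
```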
Job Interfaces
BaseJob
- class experimaestro.scheduler.interfaces.BaseJob
Base interface for job information and metadata operations
This class defines the interface for job data and provides methods for reading/writing job metadata files. Both live Job instances and database-loaded MockJob instances should provide these attributes.
- identifier
Unique identifier for the job (hash)
- task_id
Task class identifier (string)
- path
Path to job directory
- state
Current job state (JobState object or compatible)
- starttime
When job started running (datetime or None)
- Type:
datetime.datetime | None
- endtime
When job finished (datetime or None)
- Type:
datetime.datetime | None
- progress
List of progress updates
- exit_code
Process exit code (optional)
- Type:
int | None
- retry_count
Number of retries
- async aio_process() → Process | None
Returns the process if there is one
- apply_event(event: EventBase) → None
Apply a job event to update this job’s state.
Handles CarbonMetricsEvent, JobStateChangedEvent, and JobProgressEvent.
- apply_events(events: Iterable[EventBase]) → None
Apply multiple events in bulk without per-event notifications.
- property donefile: Path
Path to the .done file
- property failedfile: Path
Path to the .failed file
- async finalize_status(callback: Callable[[BaseJob], None] | None = None, cleanup_events: bool = False) → bool
Finalize job status: load from disk, apply callback, cleanup, and write.
This method:
1. Acquires the job lock
2. Loads state from disk (to get carbon info, etc.)
3. Calls the callback to modify job state (e.g., set state, increment retry_count)
4. Optionally cleans up event files (for non-done jobs being restarted)
5. Writes status if different from disk
- Parameters:
callback – Optional function to modify job state after loading from disk. Called with the job as argument.
cleanup_events – If True, cleanup event files (for job restart scenarios).
- Returns:
True if the status was written, False if unchanged.
- property full_id: str
Full job identifier: {task_id}:{identifier}
This is used as a unique key for jobs across the entire workspace, combining the task identifier and job hash.
- load_from_disk()
Load job state from disk, prioritizing marker files over status.json
This method is resilient - it returns False on any error instead of raising.
Priority for state:
1. Check .done/.failed marker files (most reliable, written atomically by job)
2. Fall back to status.json for state info
3. Check PID file for running state
4. Default to current state
Additional fields are loaded from status.json:
- timestamps (submittime, starttime, endtime)
- exit_code, retry_count
- progress, carbon_metrics
- property locator: str
Full task locator (identifier): {task_id}/{identifier}
- property lockpath: Path
Path to the .lock file for job locking
- static make_full_id(task_id: str, job_id: str) → str
Create full job identifier from components.
This is a static helper for computing full_id when you don’t have a job instance yet (e.g., for cache lookups).
- Parameters:
task_id – Task identifier
job_id – Job identifier (hash)
- Returns:
Full job identifier in format “{task_id}:{job_id}”
- Return type:
str
- static parse_full_id(full_id: str) → tuple[str, str]
Parse full job identifier into components.
- Parameters:
full_id – Full job identifier in format “{task_id}:{job_id}”
- Returns:
Tuple of (task_id, job_id)
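A round trip through the “{task_id}:{job_id}” format can be sketched with standalone functions. Splitting on the last colon and raising ValueError for malformed input are assumptions of this sketch, not documented behavior.

```python
def make_full_id(task_id: str, job_id: str) -> str:
    return f"{task_id}:{job_id}"


def parse_full_id(full_id: str) -> tuple:
    # Assumption: split on the last ':' so only the job hash ends up on the right
    task_id, sep, job_id = full_id.rpartition(":")
    if not sep:
        raise ValueError(f"not a full job identifier: {full_id!r}")
    return task_id, job_id


fid = make_full_id("mypkg.MyTask", "abc123")
print(fid)                 # mypkg.MyTask:abc123
print(parse_full_id(fid))  # ('mypkg.MyTask', 'abc123')
```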
- property pidfile: Path
Path to the .pid file
- property scheduler_state: JobState
Scheduler lifecycle state.
For offline jobs (BaseJob, MockJob), this defaults to the execution state. For live jobs (Job), this is an independent state tracking scheduler lifecycle (UNSCHEDULED → WAITING → READY → SCHEDULED → RUNNING → DONE/ERROR).
- property scriptname: str
The script name derived from task_id
- set_state(new_state: JobState, *, loading: bool = False)
Set job state and update timestamps
- Parameters:
new_state – The new job state
loading – If True, timestamps are not modified (loading from disk)
Timestamp rules (when loading=False):
- WAITING: clears starttime and endtime
- RUNNING: sets starttime
- DONE/ERROR: sets endtime
- property state: JobState
Access to execution state (ground truth from events/disk)
- state_changed_event() → JobStateChangedEvent
Create a JobStateChangedEvent from the current job state.
This centralizes event creation logic by using state_dict() to avoid duplication across the codebase (e.g., in StateListener, finalize_status).
- Returns:
JobStateChangedEvent with current job state and metadata
- state_dict() → Dict[str, Any]
Get job state as dictionary (single source of truth)
This is the canonical representation of job state used for both serialization to status files and network communication.
- Returns:
Dictionary with all job state fields
- property status_path: Path
Path to the job status file
- write_status() → None
Write job state to status.json file.
This is a sync method for writing the canonical state_dict() to disk. Call this while holding the job lock to ensure atomic status updates.
- property xpm_dir: Path
Path to the .experimaestro directory within job path
JobState
- class experimaestro.scheduler.interfaces.JobState
Base class for job states
Job states are represented as instances of JobState subclasses. Singleton instances are available as class attributes (e.g., JobState.DONE) for backward compatibility.
Subclasses override notstarted(), running(), finished() to define behavior.
- finished()
Returns True if the job has finished (success or error)
- notstarted()
Returns True if the job hasn’t started yet
- running()
Returns True if the job is currently running
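The pattern described above (subclasses overriding the three predicates, with singleton instances attached as class attributes) can be sketched like this. The subclass names `WaitingState`, `RunningState` and `DoneState` are hypothetical.

```python
class JobState:
    """Base class: subclasses override the predicates that apply to them."""

    def notstarted(self) -> bool:
        return False

    def running(self) -> bool:
        return False

    def finished(self) -> bool:
        return False


class WaitingState(JobState):  # hypothetical subclass names
    def notstarted(self) -> bool:
        return True


class RunningState(JobState):
    def running(self) -> bool:
        return True


class DoneState(JobState):
    def finished(self) -> bool:
        return True


# Singleton instances exposed as class attributes (e.g. JobState.DONE)
JobState.WAITING = WaitingState()
JobState.RUNNING = RunningState()
JobState.DONE = DoneState()

print(JobState.DONE.finished(), JobState.DONE.running())  # True False
```

Because each state is a single shared instance, identity checks such as `state is JobState.DONE` work alongside the predicate methods.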
JobFailureStatus
- class experimaestro.scheduler.interfaces.JobFailureStatus(value)
Reasons for job failure
- CANCELLED = 4
Job was cancelled (e.g., via scancel or SIGTERM)
- DEPENDENCY = 0
Job dependency failed
- FAILED = 1
Job failed
- MEMORY = 2
Job ran out of memory
- REJECTED_OTHER = 6
Job rejected for other reasons (e.g., invalid partition, resource constraints)
- REJECTED_TIMELIMIT = 5
Job rejected due to time limit (e.g., requested time exceeds partition max)
- TIMEOUT = 3
Timeout (can retry for resumable tasks)
Experiment
experiment (BaseExperiment)
- class experimaestro.scheduler.experiment.experiment(env: Path | str | WorkspaceSettings, name: str, *, host: str | None = None, port: int | None = None, token: str | None = None, run_mode: RunMode | None = None, launcher=None, register_signals: bool = True, project_paths: list[Path] | None = None, wait_for_quit: bool = False, dirty_git: DirtyGitAction = DirtyGitAction.WARN, no_db: bool = False, no_environmental_impact: bool = False)
Bases: BaseExperiment
Context manager for running experiments.
Creates a workspace, manages task submission, and optionally starts a web server for monitoring.
Implements BaseExperiment interface for use with StateProvider and TUI.
Example:
    from experimaestro import experiment

    # MyTask stands for a task class defined elsewhere
    with experiment("./workdir", "my-experiment", port=12345) as xp:
        task = MyTask.C(param=42).submit()
        result = task.wait()
- add_job(job: Job)
Register a job and its tags in the jobs.jsonl file and the database
Note: For NEW jobs, the unfinishedJobs counter is updated by job.set_scheduler_state() when the state transitions from UNSCHEDULED. For jobs already running, we increment here since no state transition will occur.
- add_service(service: ServiceClass) → ServiceClass
Adds a service (e.g. tensorboard viewer) to the experiment
- Parameters:
service – A service instance
- Returns:
The same service instance (or existing service if already added)
- load(reference: str, name: str = 'default', run_id: str | None = None)
Loads configuration objects from an experimental directory.
- Parameters:
reference – The name of the experiment
name – The name of the saving directory (defaults to 'default')
run_id – The run ID to load from (default: latest run)
- save(obj: Any, name: str = 'default')
Serializes configurations.
Saves configuration objects within the experimental directory
- Parameters:
obj – The object to save
name – The name of the saving directory (defaults to 'default')
- stop()
Stop the experiment as soon as possible
- wait()
Wait until the running processes have finished
- watch_output(watched: WatchedOutput)
Watch an output
- Parameters:
watched – The watched output specification
BaseExperiment
- class experimaestro.scheduler.interfaces.BaseExperiment
Base interface for experiment information
This class defines the interface for experiment data. Both live experiment instances and MockExperiment instances should provide these attributes.
- Core attributes:
workdir: Path to run directory (experiments/{exp-id}/{run-id}/)
run_id: Run identifier
- State tracking (replaces StatusData):
jobs: Dict mapping job_id to BaseJob
services: Dict mapping service_id to BaseService
tags: Dict mapping job_id to tag dict
dependencies: Dict mapping job_id to list of dependency job_ids
events_count: Number of events processed
hostname: Hostname where experiment runs
started_at: Start datetime
ended_at: End datetime (None if running)
- apply_event(event: EventBase, merge_mode: bool = False) → None
Apply an event to update experiment state.
This is a stub method - concrete implementations (MockExperiment) should override this to handle specific event types.
- Parameters:
event – Event to apply
merge_mode – If True, don’t overwrite fields that are already set (except for timestamps which should use the latest value)
- apply_events(events: Iterable[EventBase]) → None
Apply multiple events in bulk without per-event notifications.
- property carbon_impact: CarbonImpactData | None
Carbon impact metrics for this experiment (sum and latest aggregations)
- async finalize_status(callback: Callable[[BaseExperiment], None] | None = None, cleanup_events: bool = False) → bool
Finalize experiment status: load from disk, apply callback, cleanup, and write.
This method:
1. Acquires the experiment lock
2. Loads state from disk (to get latest event-based updates)
3. Calls the callback to modify experiment state (e.g., set status, mark completed)
4. Optionally cleans up event files (consolidates orphaned events)
5. Writes status if different from disk
- Parameters:
callback – Optional function to modify experiment state after loading from disk. Called with the experiment as argument.
cleanup_events – If True, cleanup event files (consolidates orphaned events).
- Returns:
True if the status was written, False if unchanged.
- get_job_state(job_id: str) → JobState | None
Get experiment/scheduler state for a job.
Returns the scheduler lifecycle state for the given job, which may differ from the job’s execution state (job.state).
- Parameters:
job_id – The job identifier
- Returns:
JobState if tracked, None otherwise
- state_dict() → Dict[str, Any]
Get experiment state as dictionary (single source of truth)
This is the canonical representation of experiment state used for both serialization to status files and network communication.
Note: Jobs are not included here - they are stored in jobs.jsonl.
- property status: ExperimentStatus
Experiment status - override in subclasses
StateListener
- class experimaestro.scheduler.experiment.StateListener(event_writer: ExperimentEventWriter, xp: experiment, experiment_id: str, run_id: str)
Listener that writes events to filesystem
Job state events are written to per-job event files by the scheduler. This listener writes experiment-level events (job state, services) to the experiment event file.
- on_job_state_changed(job)
Job state changes are tracked via ExperimentJobStateEvent, not here.
JobStateChangedEvent is only written to job event files by the job process itself. Experiment event files only contain ExperimentJobStateEvent for scheduler lifecycle tracking.
- on_job_submitted(job)
Called when job is submitted
- on_service_added(service)
Write service added event to filesystem
- service_state_changed(service)
Called when service state changes (runtime only, not persisted)
Workspace
Workspace
- class experimaestro.scheduler.workspace.Workspace(settings: Settings, workspace_settings: WorkspaceSettings, launcher=None, run_mode: RunMode | None = None)
Workspace environment for experiments
This is a simple container for workspace settings, environment, and configuration. Multiple Workspace instances can exist for the same path - the singleton pattern is handled by WorkspaceStateProvider which manages the database per workspace path.
- cleanup_old_scheduler_runs(retention_days: int = 15, force: bool = False) → tuple[int, int]
Clean up old scheduler run directories
- Parameters:
retention_days – Delete run directories older than this many days
force – Force cleanup even if recently run
- Returns:
Tuple of (directories_deleted, errors_count)
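A retention-based cleanup can be sketched as follows. Judging age by directory modification time and passing the runs directory explicitly are assumptions. The `force` flag is not modeled, because the source does not say how "recently run" is tracked.

```python
import os
import shutil
import tempfile
import time
from pathlib import Path


def cleanup_old_runs(runs_dir: Path, retention_days: int = 15) -> tuple:
    """Hypothetical sketch: delete run directories older than retention_days."""
    cutoff = time.time() - retention_days * 86400
    deleted = errors = 0
    for run in list(runs_dir.iterdir()):  # snapshot before deleting entries
        if not run.is_dir() or run.stat().st_mtime >= cutoff:
            continue
        try:
            shutil.rmtree(run)
            deleted += 1
        except OSError:
            errors += 1
    return deleted, errors


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    old, new = root / "run-old", root / "run-new"
    old.mkdir()
    new.mkdir()
    past = time.time() - 30 * 86400
    os.utime(old, (past, past))    # pretend this run is 30 days old
    print(cleanup_old_runs(root))  # (1, 0)
```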
- property configcachepath
Folder for jobs
- property connector
Returns the default connector
- property experimentspath
Folder for experiments
- property jobspath
Folder for jobs
- property partialspath
Folder for partial job directories (shared checkpoints, etc.)
RunMode
Dynamic Outputs
TaskOutputsWorker
- class experimaestro.scheduler.dynamic_outputs.TaskOutputsWorker(xp: Experiment)
Worker thread that processes all task output callbacks for one experiment
Created by: experiment.__enter__()
Main responsibilities:
1. Manage TaskOutputs monitors for each job with watched outputs
2. Process RawEventItems and CallbackItems from the queue
3. Track task_output_count for experiment exit synchronization
Thread model:
- File watchers run on the watchdog thread and queue items to this worker
- This worker processes items sequentially from the queue
- Count updates are synchronized with the scheduler’s event loop
- add(callback: Callable, event)
Add a callback event to the processing queue
Called by: TaskOutputWatcher.execute_event() or add_callback()
- Parameters:
callback – The callback to call with the event
event – The processed Config object
- async aio_process_job_outputs(job: Job)
Process any remaining task outputs for a completed job.
Called by: Job.aio_done_handler() (scheduler event loop)
This ensures all task outputs written by the job are queued for processing before the experiment considers exiting. File system watchers may have latency, so we explicitly read the file here.
Note: This only queues the events. The actual callbacks will complete asynchronously and decrement task_output_count when done.
- Parameters:
job – The job that has finished
- process_job_outputs(job) → None
Explicitly process any remaining task outputs for a completed job.
This is called when a job finishes to ensure all task outputs written by the job are processed before the experiment considers exiting. This is necessary because file system watchers may have latency.
- Parameters:
job – The job that has finished
- run()
Main worker loop - processes items from queue until shutdown
Called by: threading.Thread.start()
Processes:
- RawEventItem: calls execute_event() to convert and dispatch
- CallbackItem: calls the user callback and updates count
- None: shutdown signal
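The queue-driven loop with a None shutdown sentinel can be sketched as below. `OutputsWorker` and `CallbackItem` are hypothetical simplifications. RawEventItem handling is not modeled, and the pending count is guarded by a plain `threading.Lock` instead of being synchronized with the scheduler's event loop.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class CallbackItem:  # hypothetical queue item
    callback: Callable[[Any], None]
    event: Any


class OutputsWorker(threading.Thread):
    """Sketch of a sequential queue worker; None means 'shut down'."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.queue: "queue.Queue" = queue.Queue()
        self.pending = 0  # stands in for task_output_count
        self._lock = threading.Lock()

    def add(self, callback: Callable[[Any], None], event: Any) -> None:
        with self._lock:
            self.pending += 1  # counted before the callback runs
        self.queue.put(CallbackItem(callback, event))

    def run(self) -> None:
        while True:
            item = self.queue.get()
            if item is None:  # shutdown signal
                break
            try:
                item.callback(item.event)
            finally:
                with self._lock:
                    self.pending -= 1

    def shutdown(self) -> None:
        self.queue.put(None)
        self.join()


results = []
worker = OutputsWorker()
worker.start()
for i in range(3):
    worker.add(results.append, i)
worker.shutdown()
print(results, worker.pending)  # [0, 1, 2] 0
```

Incrementing the count before queuing ensures that an observer never sees zero while a callback is still waiting to run.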
- shutdown()
Stop the worker and all monitors
- update_count(delta: int)
Update task_output_count safely from any thread.
Handles the case where we might be on the event loop thread (which would deadlock if we used run_coroutine_threadsafe().result()).
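The deadlock-avoidance idea can be sketched with standard asyncio calls. The `Counter` class is hypothetical. From a foreign thread, this sketch schedules the update with `loop.call_soon_threadsafe()` without waiting for it, which may differ from what experimaestro does.

```python
import asyncio
import threading


class Counter:
    """Hypothetical sketch: a count owned by an asyncio event loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop
        self.count = 0

    def _apply(self, delta: int) -> None:
        self.count += delta

    def update_count(self, delta: int) -> None:
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:  # no loop running in this thread
            running = None
        if running is self.loop:
            # On the loop thread: blocking on a future here would deadlock,
            # so apply the change directly.
            self._apply(delta)
        else:
            # From another thread: hand the update to the loop, do not wait.
            self.loop.call_soon_threadsafe(self._apply, delta)


async def main() -> int:
    counter = Counter(asyncio.get_running_loop())
    counter.update_count(+1)  # called on the loop thread
    t = threading.Thread(target=counter.update_count, args=(+2,))
    t.start()
    t.join()
    await asyncio.sleep(0)    # let the scheduled callback run
    return counter.count


print(asyncio.run(main()))  # 3
```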
- watch_output(watched: WatchedOutput)
Register a watched output
- Parameters:
watched – The watched output specification
WatchedOutput
Events
EventBase
- class experimaestro.scheduler.state_status.EventBase(timestamp: float = <factory>)
Base class for all events
Events are lightweight - they carry only IDs, not object references. Use StateProvider to fetch actual objects (BaseJob, BaseExperiment, etc.) when needed.
Subclasses are automatically registered in EVENT_TYPES by their class name. JSON serialization/deserialization is handled transparently via event_type field.
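Registration by class name and JSON round-tripping via an event_type field can be sketched with `__init_subclass__` and dataclasses. This is an illustrative sketch: the `to_json`/`from_json` helper names are hypothetical, and the `JobProgressEvent` shown here is a simplified subset of the documented fields.

```python
import json
import time
from dataclasses import asdict, dataclass, field

EVENT_TYPES: dict = {}


@dataclass
class EventBase:
    timestamp: float = field(default_factory=time.time)

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        EVENT_TYPES[cls.__name__] = cls  # register each subclass by class name

    def to_json(self) -> str:  # hypothetical helper name
        return json.dumps({"event_type": type(self).__name__, **asdict(self)})

    @staticmethod
    def from_json(line: str) -> "EventBase":  # hypothetical helper name
        data = json.loads(line)
        cls = EVENT_TYPES[data.pop("event_type")]
        return cls(**data)


@dataclass
class JobProgressEvent(EventBase):  # simplified subset of the real fields
    job_id: str = ""
    progress: float = 0.0


line = JobProgressEvent(job_id="abc", progress=0.5).to_json()
event = EventBase.from_json(line)
print(type(event).__name__, event.progress)  # JobProgressEvent 0.5
```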
JobSubmittedEvent
- class experimaestro.scheduler.state_status.JobSubmittedEvent(timestamp: float = <factory>, experiment_id: str = '', job_id: str = '', task_id: str = '', run_id: str = '', tags: list[~experimaestro.scheduler.state_status.JobTag] = <factory>, depends_on: list[str] = <factory>, submitted_time: str | None = None)
Bases: JobEventBase, ExperimentEventBase
Event: Job was submitted to the scheduler
Fired when a job is added to an experiment run. This is both a job event and an experiment event.
JobStateChangedEvent
- class experimaestro.scheduler.state_status.JobStateChangedEvent(timestamp: float = <factory>, job_id: str = '', task_id: str = '', state: str = '', failure_reason: str | None = None, started_time: str | None = None, ended_time: str | None = None, exit_code: int | None = None, retry_count: int = 0, progress: list[~experimaestro.scheduler.state_status.ProgressLevel] = <factory>)
Bases: JobEventBase
Event: Job execution state changed (from job process events / disk)
Fired when a job’s execution state changes (scheduled, running, done, error, etc.). Timestamps are stored as ISO format strings for JSON serialization. Only written to job event files (.events/jobs/), never to experiment event files.
JobProgressEvent
- class experimaestro.scheduler.state_status.JobProgressEvent(timestamp: float = <factory>, job_id: str = '', task_id: str = '', level: int = 0, progress: float = 0.0, desc: str | None = None)
Bases: JobEventBase
Event: Job progress update
Written by the running job process to report progress.
RunCompletedEvent
- class experimaestro.scheduler.state_status.RunCompletedEvent(timestamp: float = <factory>, experiment_id: str = '', run_id: str = '', status: str = 'completed', ended_at: str = '')
Bases: ExperimentEventBase
Event: Experiment run completed
EventWriter
- class experimaestro.scheduler.state_status.EventWriter(initial_count: int = 0, permanent_dir: Path | None = None)
Base class for writing events to JSONL files
Events are written to {events_dir}/events-{count}.jsonl, with line buffering so that each event is flushed immediately after it is written.
Supports automatic file rotation when files exceed MAX_EVENT_FILE_SIZE. When rotating, the status file is updated with the new events_count.
Supports proactive hardlinking: when a permanent_dir is set and hardlinks are supported, a hardlink is created to permanent storage immediately when the event file is opened. This ensures events are written to both locations simultaneously and no data is lost if the process crashes.
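Size-based rotation with line buffering can be sketched as below. `JsonlEventWriter` is hypothetical: `max_size` stands in for MAX_EVENT_FILE_SIZE, and neither the status-file update nor proactive hardlinking is modeled.

```python
import json
import tempfile
from pathlib import Path


class JsonlEventWriter:
    """Sketch: append events to events-{count}.jsonl, rotating on size."""

    def __init__(self, events_dir: Path, max_size: int, initial_count: int = 0):
        self.events_dir = events_dir
        self.max_size = max_size  # stands in for MAX_EVENT_FILE_SIZE
        self.count = initial_count
        self._file = None

    def _path(self) -> Path:
        return self.events_dir / f"events-{self.count}.jsonl"

    def write(self, event: dict) -> None:
        if self._file is None:
            # buffering=1 selects line buffering (text mode only)
            self._file = open(self._path(), "a", buffering=1, encoding="utf-8")
        self._file.write(json.dumps(event) + "\n")
        if self._file.tell() >= self.max_size:
            self.close()
            self.count += 1  # the real writer also records this in the status file

    def close(self) -> None:
        if self._file is not None:
            self._file.close()
            self._file = None


with tempfile.TemporaryDirectory() as tmp:
    writer = JsonlEventWriter(Path(tmp), max_size=64)
    for i in range(5):
        writer.write({"event_type": "JobProgressEvent", "progress": i / 4})
    writer.close()
    print(sorted(p.name for p in Path(tmp).iterdir()))
    # ['events-0.jsonl', 'events-1.jsonl', 'events-2.jsonl']
```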
- archive_events() None
Archive events to permanent storage (called on completion)
For each temp file:
- If permanent file exists (hardlink already created): just delete temp
- If permanent file doesn’t exist: move temp to permanent
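The archive rule can be sketched as follows, assuming temporary and permanent files share the same file name in two directories. The function signature is hypothetical. In the example, `os.link` creates the proactive hardlink that the real writer would have made when opening the file.

```python
import os
import shutil
import tempfile
from pathlib import Path


def archive_events(temp_dir: Path, permanent_dir: Path) -> None:
    """Sketch: unlink temp files that are already hard-linked, move the others."""
    permanent_dir.mkdir(parents=True, exist_ok=True)
    for temp_file in list(temp_dir.glob("*.jsonl")):
        target = permanent_dir / temp_file.name
        if target.exists():
            temp_file.unlink()                   # hardlink already holds the data
        else:
            shutil.move(str(temp_file), target)  # no hardlink: move the file


with tempfile.TemporaryDirectory() as tmp:
    src, dst = Path(tmp, "tmp"), Path(tmp, "perm")
    src.mkdir()
    dst.mkdir()
    (src / "events-0.jsonl").write_text('{"a": 1}\n')
    os.link(src / "events-0.jsonl", dst / "events-0.jsonl")  # proactive hardlink
    (src / "events-1.jsonl").write_text('{"b": 2}\n')
    archive_events(src, dst)
    print(sorted(p.name for p in dst.iterdir()))  # ['events-0.jsonl', 'events-1.jsonl']
```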
EventReader
- class experimaestro.scheduler.state_status.EventReader(*directories: WatchedDirectory, max_open_files: int = 128)
Generic reader for events from JSONL files
Watches multiple directories with configurable entity ID extraction.
Supports:
- One-shot reading: read_events_since_count()
- Incremental reading: read_new_events() (tracks file positions)
- File watching: start_watching(), stop_watching() (uses watchdog)
- Entity tracking: on_created callback determines which entities to follow
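Incremental reading by tracked file positions can be sketched as below. `IncrementalReader` is hypothetical: it returns plain dicts for a single file, whereas read_new_events() returns (entity_id, event) tuples. A trailing line without a newline is treated as still being written and is retried on the next call.

```python
import json
import tempfile
from pathlib import Path


class IncrementalReader:
    """Sketch: remember a byte offset per file and parse only appended lines."""

    def __init__(self) -> None:
        self.positions: dict = {}

    def read_new(self, path: Path) -> list:
        events = []
        with open(path, "rb") as f:
            f.seek(self.positions.get(path, 0))
            while True:
                line = f.readline()
                if not line.endswith(b"\n"):
                    break  # partial (or no) line: retry on the next call
                events.append(json.loads(line))
                self.positions[path] = f.tell()
        return events


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "job-0.jsonl"
    reader = IncrementalReader()
    log.write_text('{"n": 1}\n{"n": 2}\n')
    print(reader.read_new(log))  # [{'n': 1}, {'n': 2}]
    with open(log, "a") as f:
        f.write('{"n": 3}\n')
    print(reader.read_new(log))  # [{'n': 3}]
```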
- ensure_file_polled(path: Path) → None
Ensure a file is being polled by the file watcher.
Adds the file to the watcher’s polling list if not already tracked. This is needed on shared/network filesystems where watchdog doesn’t detect remote file creation (e.g., NFS, GPFS, Lustre).
- Parameters:
path – Path to the event file to ensure is polled
- follow(entity_id: str, dir_config: WatchedDirectory) → list[EventBase]
Start following an entity (e.g., when scheduler submits a new job).
Returns all existing events for bulk consolidation by the caller, then starts tailing the latest event file for new events.
If the entity is already being followed, returns immediately.
Note: This bypasses the on_created callback - use this for explicit registration (e.g., scheduler submitting a job) rather than discovery.
- Parameters:
entity_id – Entity identifier to follow
dir_config – Directory configuration for this entity
- Returns:
List of existing events for the caller to consolidate
- get_all_event_files() → list[Path]
Get all event files across all directories, sorted by modification time
- read_events_since_count(entity_id: str, start_count: int, base_dir: Path | None = None) → list[EventBase]
Read events for an entity starting from a specific file count
Tries flat format first ({entity_id}-{count}.jsonl), then falls back to old subdir format ({entity_id}/events-{count}.jsonl).
- Parameters:
entity_id – Entity identifier (job_id or experiment_id)
start_count – File count to start reading from
base_dir – Optional base directory to search in (defaults to first directory)
- Returns:
List of events from files starting at start_count
- read_new_events() → list[tuple[str, EventBase]]
Read new events since last call (incremental reading)
- Returns:
List of (entity_id, event) tuples
- scan_existing_files() → None
Scan for existing event files and set initial positions to end of file
Call this before start_watching() to skip existing events and only receive new ones.
- start_watching() → None
Start watching for file changes using FileWatcher.
Discovers existing entities, calls on_created for each, and replays all historical events for entities that should be followed.
For each entity, only the latest event file is tracked (tailed). Earlier files are processed during catch-up but not watched.
State Provider
StateProvider
- class experimaestro.scheduler.state_provider.StateProvider
Abstract base class for state providers
Defines the interface that all state providers must implement. This enables local (WorkspaceStateProvider), remote (SSHStateProviderClient), and live (Scheduler) providers to be used interchangeably.
Concrete implementations:
Scheduler: Live in-memory state from running experiments
OfflineStateProvider: Base for cached/persistent state (WorkspaceStateProvider, SSHStateProviderClient)
State listener management is provided by the base class with default implementations.
- add_listener(listener: Callable[[EventBase], None]) → None
Register a listener for state change events
- Parameters:
listener – Callback function that receives StateEvent objects
- delete_experiment(experiment_id: str, delete_jobs: bool = False, perform: bool = True) → Tuple[bool, str]
Delete an experiment and optionally its job data
- Parameters:
experiment_id – Experiment identifier to delete
delete_jobs – If True, also delete job directories (default: False)
perform – If True, actually perform deletion; if False, just check
- Returns:
Tuple of (success, message)
- delete_job_safely(job: BaseJob, perform: bool = True) → Tuple[bool, str]
Safely delete a job and its data
Only deletes jobs that are finished (not running). Uses clean_job for the actual deletion.
- Parameters:
job – The job to delete
perform – If True, actually perform deletion; if False, just check
- Returns:
Tuple of (success, message)
- execute_warning_action(warning_key: str, action_key: str, experiment_id: str = '', run_id: str = '') → None
Execute a warning action and emit error event if it fails
- Parameters:
warning_key – The warning identifier
action_key – The action to execute
experiment_id – Experiment ID for error events
run_id – Run ID for error events
- Raises:
KeyError – If warning_key or action_key not found
- abstract get_all_jobs(state: str | None = None, tags: Dict[str, str] | None = None, since: datetime | None = None) → List[BaseJob]
Get all jobs across all experiments
- abstract get_dependencies_map(experiment_id: str, run_id: str | None = None) → Dict[str, List[str]]
Get dependencies map for jobs in an experiment/run
Dependencies are stored per (job_id, experiment_id, run_id) in JobDependenciesModel. This method returns a map from job_id to list of job_ids it depends on.
- Parameters:
experiment_id – Experiment identifier
run_id – Run identifier (None = current run)
- Returns:
Dictionary mapping job identifiers to list of job IDs they depend on
- get_display_path(job: BaseJob) → str
Get the path to display/copy for a job
For local providers, returns the local path. For remote providers, returns the remote path.
- Parameters:
job – Job to get display path for
- Returns:
Path string suitable for display and copying
- abstract get_experiment(experiment_id: str) → BaseExperiment | None
Get a specific experiment by ID
- abstract get_experiment_job_info(experiment_id: str, run_id: str | None = None) → Dict[str, ExperimentJobInformation]
Get experiment-level job info (submittime, transient) for jobs
- Returns:
Dictionary mapping job_id to ExperimentJobInformation
- abstract get_experiment_runs(experiment_id: str) → List[BaseExperiment]
Get all runs for an experiment
- Returns:
List of BaseExperiment instances (MockExperiment for past runs, or live experiment for the current run in Scheduler)
- abstract get_experiments(since: datetime | None = None) → List[BaseExperiment]
Get list of all experiments
- abstract get_job(task_id: str, job_id: str) → BaseJob | None
Get a job directly by task_id and job_id
Jobs are stored independently in workspace/jobs/task_id/job_id/, so they can be retrieved without knowing which experiment they belong to.
- Parameters:
task_id – The task identifier
job_id – The job identifier (hash)
- Returns:
The job if found, None otherwise
- abstract get_jobs(experiment_id: str | None = None, run_id: str | None = None, task_id: str | None = None, state: str | None = None, tags: Dict[str, str] | None = None, since: datetime | None = None) → List[BaseJob]
Query jobs with optional filters
- get_process_info(job: BaseJob) → ProcessInfo | None
Get process information for a job
Returns a ProcessInfo dataclass or None if not available.
- static get_resolved_state(job: BaseJob, experiment: BaseExperiment | None) → tuple[JobState, JobState | None]
Resolve display state from experiment and execution states.
Returns (resolved_state, scheduler_state_or_none):
- When there’s no conflict: (resolved_state, None)
- When there’s a conflict: (exec_state, exp_state); the caller shows both icons
Uses JobState.resolve() for dispatch to subclass-specific logic.
- abstract get_services(experiment_id: str | None = None, run_id: str | None = None) → List[BaseService]
Get services for an experiment
- get_stray_jobs() → List[BaseJob]
Get stray jobs (running jobs not associated with any active experiment)
Stray jobs are a subset of orphan jobs - they are orphan jobs that are currently running or scheduled. These represent jobs where the experimental plan changed but the job process is still running.
- Returns:
List of running/scheduled jobs not in any active experiment
- abstract get_tags_map(experiment_id: str, run_id: str | None = None) → Dict[str, Dict[str, str]]
Get tags map for jobs in an experiment/run
Tags are stored per (job_id, experiment_id, run_id) in JobTagModel. This method returns a map from job_id to {tag_key: tag_value}.
- Parameters:
experiment_id – Experiment identifier
run_id – Run identifier (None = current run)
- Returns:
Dictionary mapping job identifiers to their tags dict
- get_unresolved_warnings() → List[WarningEvent]
Get all unresolved warnings
- Returns:
List of WarningEvent objects with metadata for all pending warnings
- property is_remote: bool
Whether this is a remote provider (e.g., SSH)
Remote providers use periodic refresh instead of push notifications and support sync_path for on-demand file synchronization.
- load_configs(experiment_id: str, run_id: str | None = None) → dict[str, Any]
Load all job configs from a past experiment run.
Deprecated: use load_xp_info() instead.
- load_xp_info(experiment_id: str, run_id: str | None = None) → ExperimentInfo
Load all serialized objects from a past experiment run.
Returns an ExperimentInfo with .jobs and .actions dictionaries.
- Parameters:
experiment_id – Experiment identifier
run_id – Run identifier (None = current/latest run)
- Returns:
ExperimentInfo with jobs and actions
- Raises:
FileNotFoundError – If objects.jsonl/configs.json doesn’t exist
NotImplementedError – If the provider doesn’t support this
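Code migrating away from load_configs() can call load_xp_info() and handle the two documented exceptions. This is a sketch only: load_past_jobs is a hypothetical helper, and it relies solely on the documented load_xp_info(experiment_id, run_id) call and the .jobs attribute of the result, so any provider-like object works.

```python
from typing import Any, Dict, Optional


def load_past_jobs(provider: Any, experiment_id: str, run_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Hypothetical helper: return the serialized jobs of a past run, or None if unavailable."""
    try:
        info = provider.load_xp_info(experiment_id, run_id)
    except FileNotFoundError:
        # No objects.jsonl/configs.json for this run
        return None
    except NotImplementedError:
        # This provider cannot load past runs
        return None
    return dict(info.jobs)
```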
- register_warning_actions(warning_key: str, actions: Dict[str, Callable[[], None]], warning_event: WarningEvent | None = None) None
Register action callbacks for a warning
- Parameters:
warning_key – Unique identifier for the warning
actions – Dict mapping action_key to callback function
warning_event – Optional WarningEvent with full metadata for display
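The bookkeeping behind register_warning_actions() can be pictured as a map from warning_key to {action_key: callback}. The ToyWarningRegistry below is a hypothetical stand-in, not experimaestro's implementation; in particular, treating a warning as resolved once one of its actions has run is an assumption made for the sketch.

```python
from typing import Callable, Dict, List


class ToyWarningRegistry:
    """Hypothetical stand-in: warning_key -> {action_key: callback}."""

    def __init__(self) -> None:
        self._actions: Dict[str, Dict[str, Callable[[], None]]] = {}

    def register_warning_actions(self, warning_key: str, actions: Dict[str, Callable[[], None]]) -> None:
        # Copy the dict so later changes by the caller do not leak in.
        self._actions[warning_key] = dict(actions)

    def unresolved(self) -> List[str]:
        return sorted(self._actions)

    def execute(self, warning_key: str, action_key: str) -> None:
        # Run the chosen action, then (by assumption) mark the warning resolved.
        self._actions[warning_key][action_key]()
        del self._actions[warning_key]
```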
- remove_listener(listener: Callable[[EventBase], None]) None
Unregister a listener
- Parameters:
listener – Previously registered callback function
- service_state_changed(service) None
Called when a service's state changes; emits an event to listeners
StateProvider registers itself as a listener on services it returns, so this method is called when those services’ states change.
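The forwarding pattern described above (the provider listens to the services it hands out and re-emits their changes to its own listeners) can be sketched with two toy classes. ToyService, ToyProvider, add_listener and the tuple event format are all hypothetical; only remove_listener and service_state_changed correspond to names documented here.

```python
from typing import Callable, List


class ToyService:
    """Hypothetical service that notifies its listeners on state changes."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.state = "stopped"
        self._listeners: List[Callable[["ToyService"], None]] = []

    def add_listener(self, listener: Callable[["ToyService"], None]) -> None:
        self._listeners.append(listener)

    def set_state(self, state: str) -> None:
        self.state = state
        for listener in list(self._listeners):
            listener(self)


class ToyProvider:
    """Hypothetical provider that listens to the services it returns."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[tuple], None]] = []

    def add_listener(self, listener: Callable[[tuple], None]) -> None:
        self._listeners.append(listener)

    def remove_listener(self, listener: Callable[[tuple], None]) -> None:
        self._listeners.remove(listener)

    def get_service(self, name: str) -> ToyService:
        service = ToyService(name)
        # Register ourselves so service changes reach service_state_changed
        service.add_listener(self.service_state_changed)
        return service

    def service_state_changed(self, service: ToyService) -> None:
        # Re-emit the change to the provider's own listeners as an event
        event = ("service_state_changed", service.name, service.state)
        for listener in list(self._listeners):
            listener(event)
```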
OfflineStateProvider
- class experimaestro.scheduler.state_provider.OfflineStateProvider
Bases:
StateProvider
State provider for offline/cached state access
Provides state listener management and job/experiment/service caching, shared by WorkspaceStateProvider and SSHStateProviderClient.
This is an intermediate class between StateProvider (the ABC) and concrete implementations that need state listener support and caching.
Caching strategy:
- Jobs and experiments are cached by their identifiers
- Events update cached objects in-place to maintain consistency
- get_jobs/get_experiments return cached objects when available
- apply_event(event: EventBase) None
Apply an event to cached jobs and experiments
This method is called when events are received (from event files or via notifications). It updates the cached objects in-place to maintain state consistency.
Handles:
- JobStateChangedEvent: Updates job state in cache
- JobProgressEvent: Updates job progress in cache
- CarbonMetricsEvent: Updates job carbon metrics in cache
- JobSubmittedEvent: Adds new job to cache
- ExperimentUpdatedEvent: Invalidates experiment cache
Subclasses may override this for additional logic.
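Updating cached objects in place means that any code already holding a job object sees the new state without re-fetching it. The sketch below covers three of the listed event types using hypothetical, simplified event classes (JobSubmitted, JobStateChanged, JobProgress) and a toy cache; carbon metrics and experiment invalidation are omitted, and none of these names are experimaestro's.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class CachedJob:
    # Hypothetical cached job record for this sketch.
    identifier: str
    state: str = "unscheduled"
    progress: float = 0.0


@dataclass
class JobSubmitted:
    job_id: str


@dataclass
class JobStateChanged:
    job_id: str
    state: str


@dataclass
class JobProgress:
    job_id: str
    progress: float


class ToyCache:
    """Hypothetical cache showing in-place updates driven by events."""

    def __init__(self) -> None:
        self.jobs: Dict[str, CachedJob] = {}

    def apply_event(self, event: object) -> None:
        if isinstance(event, JobSubmitted):
            # New job: add it, keeping any object that is already cached
            self.jobs.setdefault(event.job_id, CachedJob(event.job_id))
        elif isinstance(event, JobStateChanged):
            job = self.jobs.get(event.job_id)
            if job is not None:
                job.state = event.state  # mutate the cached object in place
        elif isinstance(event, JobProgress):
            job = self.jobs.get(event.job_id)
            if job is not None:
                job.progress = event.progress
```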
- get_services(experiment_id: str | None = None, run_id: str | None = None) List[BaseService]
Get services for an experiment
Uses caching to preserve service instances (and their URLs) across calls. Subclasses can override _get_live_services() for live service support and must implement _fetch_services_from_storage() for persistent storage.
If experiment_id is None, returns services from all experiments.
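Preserving service instances across calls boils down to an identity cache keyed by service identifier: the first call creates the object, later calls return the same one, so state set on it (such as a URL) survives. This is a hypothetical sketch; ServiceCache and ToyService are invented, and the stored_ids argument stands in for whatever _fetch_services_from_storage() would return.

```python
from typing import Dict, Iterable, List, Optional


class ToyService:
    def __init__(self, service_id: str) -> None:
        self.id = service_id
        self.url: Optional[str] = None  # e.g. set once the service is started


class ServiceCache:
    """Hypothetical cache that reuses service instances across calls."""

    def __init__(self) -> None:
        self._services: Dict[str, ToyService] = {}

    def get_services(self, stored_ids: Iterable[str]) -> List[ToyService]:
        result = []
        for service_id in stored_ids:
            service = self._services.get(service_id)
            if service is None:
                # First time this service is seen: create and remember it
                service = ToyService(service_id)
                self._services[service_id] = service
            result.append(service)
        return result
```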
CarbonMetricsData
- class experimaestro.scheduler.state_provider.CarbonMetricsData(co2_kg: float = 0.0, energy_kwh: float = 0.0, cpu_power_w: float = 0.0, gpu_power_w: float = 0.0, ram_power_w: float = 0.0, duration_s: float = 0.0, region: str = '', is_final: bool = False, written: bool = False)
Carbon metrics data for a job.
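The constructor signature suggests a plain record with defaulted fields whose units are encoded in the field names (kg, kWh, W, s). The sketch below mirrors that signature as a local dataclass (an assumption about its implementation) and adds a hypothetical average_power_w helper that derives mean power from energy and duration, using 1 kWh = 3.6e6 J.

```python
from dataclasses import dataclass


@dataclass
class CarbonMetricsData:
    # Local mirror of the documented signature; units follow the field-name suffixes.
    co2_kg: float = 0.0
    energy_kwh: float = 0.0
    cpu_power_w: float = 0.0
    gpu_power_w: float = 0.0
    ram_power_w: float = 0.0
    duration_s: float = 0.0
    region: str = ""
    is_final: bool = False
    written: bool = False


def average_power_w(metrics: CarbonMetricsData) -> float:
    """Hypothetical helper: mean power over the run, in watts."""
    if metrics.duration_s <= 0:
        return 0.0
    # 1 kWh = 3.6e6 J, and W = J / s
    return metrics.energy_kwh * 3.6e6 / metrics.duration_s
```

For example, 1 kWh consumed over one hour (3600 s) corresponds to an average of 1000 W.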
Listeners
Listener
- class experimaestro.scheduler.base.Listener
- job_state(job)
Deprecated: use on_job_state_changed instead
- job_submitted(job)
Deprecated: use on_job_submitted instead
- on_job_state_changed(job)
Called when job state changes (new preferred method)
- on_job_submitted(job)
Called when job is submitted (new preferred method)
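A listener only needs to override the preferred on_job_submitted and on_job_state_changed hooks. In this sketch, RecordingListener is a hypothetical example class; it falls back to a minimal stand-in base class when experimaestro is not installed so the snippet runs on its own. How the listener is registered with the scheduler is not covered in this section and is left out.

```python
from typing import Any, List, Tuple

try:
    from experimaestro.scheduler.base import Listener
except ImportError:  # lets the sketch run without experimaestro installed
    class Listener:  # minimal stand-in base class
        pass


class RecordingListener(Listener):
    """Hypothetical listener that records job notifications."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Any]] = []

    def on_job_submitted(self, job: Any) -> None:
        self.events.append(("submitted", job))

    def on_job_state_changed(self, job: Any) -> None:
        self.events.append(("state_changed", job))
```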
Exceptions
DirtyGitError
- class experimaestro.DirtyGitError
Bases:
HandledException
Raised when the git repository has uncommitted changes and dirty_git=error
FailedExperiment
- class experimaestro.scheduler.experiment.FailedExperiment
Bases:
HandledException
Raised when an experiment failed
GracefulExperimentExit
- class experimaestro.scheduler.experiment.GracefulExperimentExit
Bases:
Exception
Raised to exit an experiment context without waiting for running jobs.
This is useful in tests or when you want to detach from an experiment while keeping jobs running (e.g., to test stray job detection).
Example:
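    from experimaestro import experiment
    from experimaestro.scheduler.experiment import GracefulExperimentExit

    # workdir and MyTask are placeholders for your workspace path and task class
    with experiment(workdir, "my-experiment") as xp:
        task = MyTask.C(value=1).submit()
        # Wait for the task to start...
        raise GracefulExperimentExit()  # Exit without waiting for the task to finish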
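The mechanism can be modelled with a toy context manager whose __exit__ waits for submitted jobs on a normal exit but returns immediately, swallowing the exception, when a dedicated exit exception is raised. This is a hypothetical model, not experimaestro's code: ToyExperiment and GracefulExit are invented, and the assumption that the real context manager suppresses GracefulExperimentExit is inferred from the example above.

```python
from typing import List


class GracefulExit(Exception):
    """Stand-in for GracefulExperimentExit in this toy model."""


class ToyExperiment:
    """Hypothetical context manager modelling 'exit without waiting for jobs'."""

    def __init__(self) -> None:
        self.pending: List[str] = []
        self.waited = False

    def submit(self, name: str) -> None:
        self.pending.append(name)

    def __enter__(self) -> "ToyExperiment":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None and issubclass(exc_type, GracefulExit):
            # Detach: leave pending jobs running and suppress the exception
            return True
        if exc_type is None:
            # Normal exit: "wait" for every submitted job
            self.waited = True
            self.pending.clear()
        return False  # any other exception propagates
```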