Internal API Reference

This section provides API documentation for internal classes used in the scheduler and experiment systems. These are primarily useful for developers extending or debugging experimaestro.

Scheduler

Scheduler

class experimaestro.scheduler.base.Scheduler(name: str = 'Global')

Bases: StateProvider, Thread

A job scheduler (singleton) that provides live state

The scheduler is based on asyncio for easy concurrency handling. This is a singleton - only one scheduler instance exists per process.

Inherits from StateProvider to allow TUI/Web interfaces to access live job and experiment state during experiment execution.

static instance() Scheduler

Get or create the global scheduler instance
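
The get-or-create behaviour of instance() can be pictured with a minimal sketch. This is not experimaestro's implementation: SketchScheduler and its lock are hypothetical names that only illustrate the one-instance-per-process pattern described above.

```python
import threading


class SketchScheduler:
    """Hypothetical stand-in illustrating a per-process singleton."""

    _instance = None            # the single shared instance, created lazily
    _lock = threading.Lock()    # guards creation when several threads race

    def __init__(self, name: str = "Global"):
        self.name = name

    @staticmethod
    def instance() -> "SketchScheduler":
        # Create the instance on first use, then always return the same object
        with SketchScheduler._lock:
            if SketchScheduler._instance is None:
                SketchScheduler._instance = SketchScheduler()
            return SketchScheduler._instance
```

Every caller in the process then shares the same object, which is what lets TUI/Web interfaces observe the state that the running experiment updates.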

register_experiment(xp: Experiment)

Register an experiment with the scheduler

unregister_experiment(xp: Experiment)

Mark experiment as finished but keep it registered for TUI visibility

Previously this removed the experiment from the scheduler, but this caused issues with the TUI where jobs would disappear when the experiment finished. Now we keep the experiment registered so its jobs remain visible in the TUI. The experiment’s status (DONE/FAILED/DETACHED) reflects that it’s finished.

Jobs

Job

class experimaestro.scheduler.jobs.Job(config: Config, *, workspace: Workspace = None, launcher: Launcher = None, run_mode: RunMode = RunMode.NORMAL, max_retries: int | None = None, transient: TransientMode = TransientMode.NONE)

Bases: BaseJob, Resource

A job is a resource that is produced by the execution of some code

async finalize_status(callback: Callable[[BaseJob], None] | None = None, cleanup_events: bool = False) bool

Finalize job status: load from disk, apply callback, cleanup, and write.

This method:

  1. Acquires the job lock
  2. Loads state from disk (to get carbon info, etc.)
  3. Calls the callback to modify job state (e.g., set state, increment retry_count)
  4. Optionally cleans up event files (for non-done jobs being restarted)
  5. Writes status if different from disk

Parameters:
  • callback – Optional function to modify job state after loading from disk. Called with the job as argument.

  • cleanup_events – If True, cleanup event files (for job restart scenarios).

Returns:

True if the status was written, False if unchanged.
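
The steps listed for finalize_status can be sketched as a lock / load / modify / compare / write sequence. The code below is an illustration under stated assumptions, not the library's code: SketchJob, its data dictionary, and the use of asyncio.Lock as the job lock are hypothetical, and the optional event cleanup step is left out.

```python
import asyncio
import json
from pathlib import Path
from typing import Callable, Optional


class SketchJob:
    """Hypothetical job holding a small state dictionary."""

    def __init__(self, status_path: Path):
        self.status_path = status_path
        self.lock = asyncio.Lock()  # stands in for the job lock
        self.data = {"state": "waiting", "retry_count": 0}

    def _read_disk(self) -> Optional[dict]:
        # Missing or unreadable status file: behave as if nothing was stored
        try:
            return json.loads(self.status_path.read_text())
        except (OSError, ValueError):
            return None

    async def finalize_status(
        self, callback: Optional[Callable[["SketchJob"], None]] = None
    ) -> bool:
        async with self.lock:                  # acquire the lock
            on_disk = self._read_disk()        # load state from disk
            if on_disk is not None:
                self.data.update(on_disk)
            if callback is not None:           # let the caller modify the state
                callback(self)
            if self.data == on_disk:           # nothing changed: skip the write
                return False
            self.status_path.write_text(json.dumps(self.data))
            return True
```

Loading before calling the callback means the callback always works on the freshest state, and the final comparison is what produces the True/False return value.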

load_from_disk()

Load job state from disk, prioritizing marker files over status.json

This method is resilient - it returns False on any error instead of raising.

Priority for state:

  1. Check .done/.failed marker files (most reliable, written atomically by job)
  2. Fall back to status.json for state info
  3. Check PID file for running state
  4. Default to current state

Additional fields are loaded from status.json:

  • timestamps (submittime, starttime, endtime)
  • exit_code, retry_count
  • progress, carbon_metrics
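
The priority order for the state can be illustrated with a small standalone function. This is a sketch, not the actual load_from_disk: the file names ({scriptname}.done, {scriptname}.failed, {scriptname}.pid, status.json directly in the job directory), the "state" key, and the string state names are assumptions made for the example, and the function returns the resolved state instead of a success flag.

```python
import json
from pathlib import Path


def resolve_state(job_dir: Path, scriptname: str, current: str) -> str:
    """Resolve a job state following the priority order described above."""
    try:
        # 1. Marker files win over everything else
        if (job_dir / f"{scriptname}.done").exists():
            return "done"
        if (job_dir / f"{scriptname}.failed").exists():
            return "error"
        # 2. Fall back to the state recorded in status.json
        status = job_dir / "status.json"
        if status.exists():
            state = json.loads(status.read_text()).get("state")
            if state:
                return state
        # 3. A PID file suggests the job is running
        if (job_dir / f"{scriptname}.pid").exists():
            return "running"
    except (OSError, ValueError, AttributeError):
        # Resilient: a read or parse error leaves the state untouched
        pass
    # 4. Default to the current state
    return current
```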

rotate_logs() None

Rotate log files before restarting a task.

Renames non-empty stdout and stderr files with a timestamp suffix (e.g., job.20231215143022.out) to preserve logs from previous runs.
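
A minimal sketch of this kind of rotation follows. It is illustrative only: the function signature, the .err extension for stderr, and the explicit now parameter (added to make the result predictable) are assumptions, while the timestamp layout follows the job.20231215143022.out example above.

```python
from datetime import datetime
from pathlib import Path
from typing import List, Optional


def rotate_logs(
    job_dir: Path, scriptname: str, now: Optional[datetime] = None
) -> List[Path]:
    """Rename non-empty <scriptname>.out / .err with a timestamp suffix."""
    stamp = (now or datetime.now()).strftime("%Y%m%d%H%M%S")
    rotated = []
    for ext in ("out", "err"):
        log = job_dir / f"{scriptname}.{ext}"
        # Empty or missing logs carry no information, so leave them alone
        if log.is_file() and log.stat().st_size > 0:
            target = job_dir / f"{scriptname}.{stamp}.{ext}"
            log.rename(target)
            rotated.append(target)
    return rotated
```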

set_state(new_state: JobState, *, loading: bool = False)

Set job state and update timestamps

Parameters:
  • new_state – The new job state

  • loading – If True, timestamps are not modified (loading from disk)

Timestamp rules (when loading=False):

  • WAITING: clears starttime and endtime
  • RUNNING: sets starttime
  • DONE/ERROR: sets endtime
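
The timestamp rules can be read as a small state-setting routine. The sketch below uses plain strings for states and a hypothetical SketchJobTimes class; it only mirrors the rules stated above and is not the library's set_state.

```python
import time
from typing import Optional


class SketchJobTimes:
    """Hypothetical holder for a state string and its timestamps."""

    def __init__(self) -> None:
        self.state = "unscheduled"
        self.starttime: Optional[float] = None
        self.endtime: Optional[float] = None

    def set_state(self, new_state: str, *, loading: bool = False) -> None:
        self.state = new_state
        if loading:
            # State restored from disk: keep the timestamps that were loaded
            return
        if new_state == "waiting":
            self.starttime = None
            self.endtime = None
        elif new_state == "running":
            self.starttime = time.time()
        elif new_state in ("done", "error"):
            self.endtime = time.time()
```

The loading flag matters when state is restored from disk: without it, reloading a finished job would overwrite its recorded end time with the time of the reload.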

JobDependency

class experimaestro.scheduler.jobs.JobDependency(job, *, soft: bool = False)

Bases: Dependency

async aio_lock(timeout: float = 0)

Acquire lock on job dependency by waiting for job to complete

Parameters:

timeout – Must be 0 (wait indefinitely) for job dependencies

Raises:

async ensure_started()

Ensure the dependency job is started.

If the dependency is a transient job that was skipped (state is UNSCHEDULED), this will start it so it actually runs.

TransientMode

class experimaestro.scheduler.jobs.TransientMode(value)

Mode for transient tasks

Transient tasks are intermediary tasks that may not need to persist after an experiment completes.

NONE = 0

Task is scheduled normally and kept after completion (default)

REMOVE = 2

Task is transient and its job folder is removed on experiment completion. Implies TRANSIENT behavior.

TRANSIENT = 1

Task is only run if required by another non-transient task

property is_transient: bool

Returns True if this mode implies transient behavior

property should_remove: bool

Returns True if the job folder should be removed after completion
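
How the two properties relate to the three members can be shown with a self-contained enum. SketchTransientMode is a hypothetical re-creation built from the values and descriptions listed above; the property bodies are an assumption about how such flags could be derived.

```python
from enum import Enum


class SketchTransientMode(Enum):
    NONE = 0
    TRANSIENT = 1
    REMOVE = 2

    @property
    def is_transient(self) -> bool:
        # REMOVE implies TRANSIENT behavior, so both non-NONE modes qualify
        return self is not SketchTransientMode.NONE

    @property
    def should_remove(self) -> bool:
        # Only REMOVE asks for the job folder to be deleted
        return self is SketchTransientMode.REMOVE
```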

Job Interfaces

BaseJob

class experimaestro.scheduler.interfaces.BaseJob

Base interface for job information and metadata operations

This class defines the interface for job data and provides methods for reading/writing job metadata files. Both live Job instances and database-loaded MockJob instances should provide these attributes.

identifier

Unique identifier for the job (hash)

Type:

str

task_id

Task class identifier (string)

Type:

str

path

Path to job directory

Type:

pathlib.Path

state

Current job state (JobState object or compatible)

starttime

When job started running (datetime or None)

Type:

datetime.datetime | None

endtime

When job finished (datetime or None)

Type:

datetime.datetime | None

progress

List of progress updates

exit_code

Process exit code (optional)

Type:

int | None

retry_count

Number of retries

Type:

int

async aio_process() Process | None

Returns the process if there is one

apply_event(event: EventBase) None

Apply a job event to update this job’s state.

Handles CarbonMetricsEvent, JobStateChangedEvent, and JobProgressEvent.

apply_events(events: Iterable[EventBase]) None

Apply multiple events in bulk without per-event notifications.

property donefile: Path

Path to the .done file

property failedfile: Path

Path to the .failed file

async finalize_status(callback: Callable[[BaseJob], None] | None = None, cleanup_events: bool = False) bool

Finalize job status: load from disk, apply callback, cleanup, and write.

This method:

  1. Acquires the job lock
  2. Loads state from disk (to get carbon info, etc.)
  3. Calls the callback to modify job state (e.g., set state, increment retry_count)
  4. Optionally cleans up event files (for non-done jobs being restarted)
  5. Writes status if different from disk

Parameters:
  • callback – Optional function to modify job state after loading from disk. Called with the job as argument.

  • cleanup_events – If True, cleanup event files (for job restart scenarios).

Returns:

True if the status was written, False if unchanged.

property full_id: str

Full job identifier in the format {task_id}:{identifier}

This is used as a unique key for jobs across the entire workspace, combining the task identifier and job hash.

static get_donefile(job_path: Path, scriptname: str) Path

Get done marker file path

static get_failedfile(job_path: Path, scriptname: str) Path

Get failed marker file path

static get_pidfile(job_path: Path, scriptname: str) Path

Get PID file path

static get_scriptname(task_id: str) str

Extract script name from task_id (last component after ‘.’)

static get_status_path(job_path: Path) Path

Get status file path for a job path

static get_xpm_dir(job_path: Path) Path

Get .experimaestro directory path for a job path

load_from_disk()

Load job state from disk, prioritizing marker files over status.json

This method is resilient - it returns False on any error instead of raising.

Priority for state:

  1. Check .done/.failed marker files (most reliable, written atomically by job)
  2. Fall back to status.json for state info
  3. Check PID file for running state
  4. Default to current state

Additional fields are loaded from status.json:

  • timestamps (submittime, starttime, endtime)
  • exit_code, retry_count
  • progress, carbon_metrics

property locator: str

Full task locator (identifier): {task_id}/{identifier}

property lockpath: Path

Path to the .lock file for job locking

static make_full_id(task_id: str, job_id: str) str

Create full job identifier from components.

This is a static helper for computing full_id when you don’t have a job instance yet (e.g., for cache lookups).

Parameters:
  • task_id – Task identifier

  • job_id – Job identifier (hash)

Returns:

Full job identifier in format “{task_id}:{job_id}”

static parse_full_id(full_id: str) tuple[str, str]

Parse full job identifier into components.

Parameters:

full_id – Full job identifier in format “{task_id}:{job_id}”

Returns:

Tuple of (task_id, job_id)
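
The identifier helpers are simple string operations. The functions below are standalone sketches that follow the “{task_id}:{job_id}” format and the "last component after ‘.’" rule described in this section; splitting on the last colon is an assumption (it presumes job hashes never contain a colon), and the task name in the usage note is made up.

```python
from typing import Tuple


def make_full_id(task_id: str, job_id: str) -> str:
    """Combine a task identifier and a job hash into one key."""
    return f"{task_id}:{job_id}"


def parse_full_id(full_id: str) -> Tuple[str, str]:
    """Split a full identifier back into (task_id, job_id)."""
    # Split on the last colon; assumes the job hash contains no ':'
    task_id, job_id = full_id.rsplit(":", 1)
    return task_id, job_id


def get_scriptname(task_id: str) -> str:
    """Return the last dot-separated component of the task identifier."""
    return task_id.rsplit(".", 1)[-1]
```

With a hypothetical task identifier "my.module.MyTask" and hash "abc123", the full identifier is "my.module.MyTask:abc123" and the script name is "MyTask".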

property pidfile: Path

Path to the .pid file

process_state_dict() dict | None

Get process state as dictionary. Override in subclasses.

property scheduler_state: JobState

Scheduler lifecycle state.

For offline jobs (BaseJob, MockJob), this defaults to the execution state. For live jobs (Job), this is an independent state tracking scheduler lifecycle (UNSCHEDULED → WAITING → READY → SCHEDULED → RUNNING → DONE → ERROR).

property scriptname: str

The script name derived from task_id

set_state(new_state: JobState, *, loading: bool = False)

Set job state and update timestamps

Parameters:
  • new_state – The new job state

  • loading – If True, timestamps are not modified (loading from disk)

Timestamp rules (when loading=False):

  • WAITING: clears starttime and endtime
  • RUNNING: sets starttime
  • DONE/ERROR: sets endtime

property state: JobState

Access to execution state (ground truth from events/disk)

state_changed_event() JobStateChangedEvent

Create a JobStateChangedEvent from the current job state.

This centralizes event creation logic by using state_dict() to avoid duplication across the codebase (e.g., in StateListener, finalize_status).

Returns:

JobStateChangedEvent with current job state and metadata

state_dict() Dict[str, Any]

Get job state as dictionary (single source of truth)

This is the canonical representation of job state used for both serialization to status files and network communication.

Returns:

Dictionary with all job state fields

property status_path: Path

Path to the job status file

write_status() None

Write job state to status.json file.

This is a sync method for writing the canonical state_dict() to disk. Call this while holding the job lock to ensure atomic status updates.

property xpm_dir: Path

Path to the .experimaestro directory within job path

JobState

class experimaestro.scheduler.interfaces.JobState

Base class for job states

Job states are represented as instances of JobState subclasses. Singleton instances are available as class attributes (e.g., JobState.DONE) for backward compatibility.

Subclasses override notstarted(), running(), finished() to define behavior.

finished()

Returns True if the job has finished (success or error)

notstarted()

Returns True if the job hasn’t started yet

running()

Returns True if the job is currently running
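
The "subclass instances exposed as class attributes" design can be sketched as follows. All names here (SketchJobState and the private subclasses) are hypothetical; the real class hierarchy may differ, and only the three predicates come from the text above.

```python
class SketchJobState:
    """Hypothetical base class: every predicate defaults to False."""

    def notstarted(self) -> bool:
        return False

    def running(self) -> bool:
        return False

    def finished(self) -> bool:
        return False


class _Waiting(SketchJobState):
    def notstarted(self) -> bool:
        return True


class _Running(SketchJobState):
    def running(self) -> bool:
        return True


class _Done(SketchJobState):
    def finished(self) -> bool:
        return True


class _Error(SketchJobState):
    def finished(self) -> bool:
        return True  # an error is still a finished job


# Singleton instances exposed as class attributes, e.g. SketchJobState.DONE
SketchJobState.WAITING = _Waiting()
SketchJobState.RUNNING = _Running()
SketchJobState.DONE = _Done()
SketchJobState.ERROR = _Error()
```

Callers can then write state.finished() without caring which concrete state they hold, while older code that compares against JobState.DONE style attributes keeps working.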

JobFailureStatus

class experimaestro.scheduler.interfaces.JobFailureStatus(value)

Reasons for job failure

CANCELLED = 4

Job was cancelled (e.g., via scancel or SIGTERM)

DEPENDENCY = 0

Job dependency failed

FAILED = 1

Job failed

MEMORY = 2

Job failed for a memory-related reason

REJECTED_OTHER = 6

Job rejected for other reasons (e.g., invalid partition, resource constraints)

REJECTED_TIMELIMIT = 5

Job rejected due to time limit (e.g., requested time exceeds partition max)

TIMEOUT = 3

Timeout (can retry for resumable tasks)

Experiment

experiment (BaseExperiment)

class experimaestro.scheduler.experiment.experiment(env: Path | str | WorkspaceSettings, name: str, *, host: str | None = None, port: int | None = None, token: str | None = None, run_mode: RunMode | None = None, launcher=None, register_signals: bool = True, project_paths: list[Path] | None = None, wait_for_quit: bool = False, dirty_git: DirtyGitAction = DirtyGitAction.WARN, no_db: bool = False, no_environmental_impact: bool = False)

Bases: BaseExperiment

Context manager for running experiments.

Creates a workspace, manages task submission, and optionally starts a web server for monitoring.

Implements BaseExperiment interface for use with StateProvider and TUI.

Example:

from experimaestro import experiment

with experiment("./workdir", "my-experiment", port=12345) as xp:
    task = MyTask.C(param=42).submit()
    result = task.wait()

add_job(job: Job)

Register a job and its tags to jobs.jsonl file and database

Note: For NEW jobs, the unfinishedJobs counter is updated by job.set_scheduler_state() when the state transitions from UNSCHEDULED. For jobs already running, we increment here since no state transition will occur.

add_service(service: ServiceClass) ServiceClass

Adds a service (e.g. tensorboard viewer) to the experiment

Parameters:

service – A service instance

Returns:

The same service instance (or existing service if already added)

load(reference: str, name: str = 'default', run_id: str | None = None)

Loads configuration objects from an experimental directory.

Parameters:
  • reference – The name of the experiment

  • name – The name of the saving directory (defaults to 'default')

  • run_id – The run ID to load from (default: latest run)

prepare(job: Job)

Generate the file

save(obj: Any, name: str = 'default')

Serializes configurations.

Saves configuration objects within the experimental directory

Parameters:
  • obj – The object to save

  • name – The name of the saving directory (defaults to 'default')

stop()

Stop the experiment as soon as possible

wait()

Wait until the running processes have finished

watch_output(watched: WatchedOutput)

Watch an output

Parameters:

watched – The watched output specification

BaseExperiment

class experimaestro.scheduler.interfaces.BaseExperiment

Base interface for experiment information

This class defines the interface for experiment data. Both live experiment instances and MockExperiment instances should provide these attributes.

Core attributes:

  • workdir: Path to run directory (experiments/{exp-id}/{run-id}/)
  • run_id: Run identifier

State tracking (replaces StatusData):

  • jobs: Dict mapping job_id to BaseJob
  • services: Dict mapping service_id to BaseService
  • tags: Dict mapping job_id to tag dict
  • dependencies: Dict mapping job_id to list of dependency job_ids
  • events_count: Number of events processed
  • hostname: Hostname where experiment runs
  • started_at: Start datetime
  • ended_at: End datetime (None if running)

property actions: Dict[str, BaseAction]

Actions in this experiment

apply_event(event: EventBase, merge_mode: bool = False) None

Apply an event to update experiment state.

This is a stub method - concrete implementations (MockExperiment) should override this to handle specific event types.

Parameters:
  • event – Event to apply

  • merge_mode – If True, don’t overwrite fields that are already set (except for timestamps which should use the latest value)

apply_events(events: Iterable[EventBase]) None

Apply multiple events in bulk without per-event notifications.

property carbon_impact: CarbonImpactData | None

Carbon impact metrics for this experiment (sum and latest aggregations)

property dependencies: Dict[str, List[str]]

Job dependencies

property ended_at: datetime | None

End datetime (None if running)

property events_count: int

Number of events processed

property experiment_id: str

Experiment identifier derived from workdir structure

property failed_jobs: int

Number of failed jobs

async finalize_status(callback: Callable[[BaseExperiment], None] | None = None, cleanup_events: bool = False) bool

Finalize experiment status: load from disk, apply callback, cleanup, and write.

This method:

  1. Acquires the experiment lock
  2. Loads state from disk (to get latest event-based updates)
  3. Calls the callback to modify experiment state (e.g., set status, mark completed)
  4. Optionally cleans up event files (consolidates orphaned events)
  5. Writes status if different from disk

Parameters:
  • callback – Optional function to modify experiment state after loading from disk. Called with the experiment as argument.

  • cleanup_events – If True, cleanup event files (consolidates orphaned events).

Returns:

True if the status was written, False if unchanged.

property finished_jobs: int

Number of finished jobs

get_job_state(job_id: str) JobState | None

Get experiment/scheduler state for a job.

Returns the scheduler lifecycle state for the given job, which may differ from the job’s execution state (job.state).

Parameters:

job_id – The job identifier

Returns:

JobState if tracked, None otherwise

get_services() List[BaseService]

Get services for this experiment as a list

static get_status_path(run_dir: Path) Path

Get status file path for a run directory

property hostname: str | None

Hostname where experiment runs

property jobs: Dict[str, BaseJob]

Jobs in this experiment

property run_dir: Path

Path to run directory (same as workdir)

property run_tags: list[str]

Tags assigned to this run (as sorted list for JSON serialization)

property services: Dict[str, BaseService]

Services in this experiment

property started_at: datetime | None

Start datetime

state_dict() Dict[str, Any]

Get experiment state as dictionary (single source of truth)

This is the canonical representation of experiment state used for both serialization to status files and network communication.

Note: Jobs are not included here - they are stored in jobs.jsonl.

property status: ExperimentStatus

Experiment status - override in subclasses

property tags: Dict[str, Dict[str, str]]

Tags for jobs

property total_jobs: int

Total number of jobs

write_status() None

Write status.json to disk (calls state_dict internally)

Uses file locking to ensure atomic writes across processes.

StateListener

class experimaestro.scheduler.experiment.StateListener(event_writer: ExperimentEventWriter, xp: experiment, experiment_id: str, run_id: str)

Listener that writes events to filesystem

Job state events are written to per-job event files by the scheduler. This listener writes experiment-level events (job state, services) to the experiment event file.

on_job_state_changed(job)

Job state changes are tracked via ExperimentJobStateEvent, not here.

JobStateChangedEvent is only written to job event files by the job process itself. Experiment event files only contain ExperimentJobStateEvent for scheduler lifecycle tracking.

on_job_submitted(job)

Called when job is submitted

on_service_added(service)

Write service added event to filesystem

service_state_changed(service)

Called when service state changes (runtime only, not persisted)

Workspace

Workspace

class experimaestro.scheduler.workspace.Workspace(settings: Settings, workspace_settings: WorkspaceSettings, launcher=None, run_mode: RunMode | None = None)

Workspace environment for experiments

This is a simple container for workspace settings, environment, and configuration. Multiple Workspace instances can exist for the same path - the singleton pattern is handled by WorkspaceStateProvider which manages the database per workspace path.

cleanup_old_scheduler_runs(retention_days: int = 15, force: bool = False) tuple[int, int]

Clean up old scheduler run directories

Parameters:
  • retention_days – Delete run directories older than this many days

  • force – Force cleanup even if recently run

Returns:

Tuple of (directories_deleted, errors_count)
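
A retention-based cleanup of this shape can be sketched with the standard library. This is an assumption-laden illustration, not the workspace method: it judges age by directory modification time, takes the base directory as an explicit argument, and omits the force parameter and any locking.

```python
import shutil
import time
from pathlib import Path
from typing import Optional, Tuple


def cleanup_old_runs(
    base: Path, retention_days: int = 15, now: Optional[float] = None
) -> Tuple[int, int]:
    """Delete run directories older than retention_days.

    Returns (directories_deleted, errors_count).
    """
    reference = now if now is not None else time.time()
    cutoff = reference - retention_days * 86400  # seconds per day
    deleted = errors = 0
    # Materialize the listing first so deletions do not disturb iteration
    for run_dir in list(base.iterdir()):
        if not run_dir.is_dir():
            continue
        try:
            if run_dir.stat().st_mtime < cutoff:
                shutil.rmtree(run_dir)
                deleted += 1
        except OSError:
            # Count the failure and keep going with the other directories
            errors += 1
    return deleted, errors
```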

property configcachepath

Folder for the configuration cache

property connector

Returns the default connector

property experimentspath

Folder for experiments

property jobspath

Folder for jobs

property partialspath

Folder for partial job directories (shared checkpoints, etc.)

property scheduler_lock_path: Path

Lock file for scheduler directory operations

property scheduler_run_path: Path

Folder for this run’s scheduler data

property scheduler_services_path: Path

Folder for service logs

property schedulerpath: Path

Folder for scheduler metadata (base .scheduler directory)

classmethod set_launcher(launcher) None

Set the launcher for the current workspace

Parameters:

launcher – The launcher to use for task execution

RunMode

class experimaestro.scheduler.workspace.RunMode(value)

An enumeration.

DRY_RUN = 'dry-run'

Do not run

GENERATE_ONLY = 'generate'

Do not run, but generate the params.json file

NORMAL = 'normal'

Normal run

Dynamic Outputs

TaskOutputsWorker

class experimaestro.scheduler.dynamic_outputs.TaskOutputsWorker(xp: Experiment)

Worker thread that processes all task output callbacks for one experiment

Created by: experiment.__enter__()

Main responsibilities:

  1. Manage TaskOutputs monitors for each job with watched outputs
  2. Process RawEventItems and CallbackItems from the queue
  3. Track task_output_count for experiment exit synchronization

Thread model:

  • File watchers run on watchdog thread, queue items to this worker
  • This worker processes items sequentially from the queue
  • Count updates are synchronized with the scheduler’s event loop

add(callback: Callable, event)

Add a callback event to the processing queue

Called by: TaskOutputWatcher.execute_event() or add_callback()

Parameters:
  • callback – The callback to call with the event

  • event – The processed Config object

async aio_process_job_outputs(job: Job)

Process any remaining task outputs for a completed job.

Called by: Job.aio_done_handler() (scheduler event loop)

This ensures all task outputs written by the job are queued for processing before the experiment considers exiting. File system watchers may have latency, so we explicitly read the file here.

Note: This only queues the events. The actual callbacks will complete asynchronously and decrement task_output_count when done.

Parameters:

job – The job that has finished

process_job_outputs(job) None

Explicitly process any remaining task outputs for a completed job.

This is called when a job finishes to ensure all task outputs written by the job are processed before the experiment considers exiting. This is necessary because file system watchers may have latency.

Parameters:

job – The job that has finished

run()

Main worker loop - processes items from queue until shutdown

Called by: threading.Thread.start()

Processes:

  • RawEventItem: calls execute_event() to convert and dispatch
  • CallbackItem: calls the user callback and updates count
  • None: shutdown signal

shutdown()

Stop the worker and all monitors

update_count(delta: int)

Update task_output_count safely from any thread.

Handles the case where we might be on the event loop thread (which would deadlock if we used run_coroutine_threadsafe().result()).

watch_output(watched: WatchedOutput)

Register a watched output

Parameters:

watched – The watched output specification

WatchedOutput

class experimaestro.core.objects.WatchedOutput(config: ConfigInformation, method_name: str, method: Callable, callback: Callable, job: Job | None = None)

callback: Callable

The callback to call (with the output of the previous method)

config: ConfigInformation

The configuration containing the watched output

job: Job | None

The enclosing job (set when registered with scheduler)

method: Callable

The watched output method (called with the JSON event)

method_name: str

The watched output (name)

Events

EventBase

class experimaestro.scheduler.state_status.EventBase(timestamp: float = <factory>)

Base class for all events

Events are lightweight - they carry only IDs, not object references. Use StateProvider to fetch actual objects (BaseJob, BaseExperiment, etc.) when needed.

Subclasses are automatically registered in EVENT_TYPES by their class name. JSON serialization/deserialization is handled transparently via event_type field.

property event_type: str

Event type derived from class name

classmethod from_dict(d: dict) EventBase

Deserialize event from dictionary

classmethod get_class(name: str) type[EventBase] | None

Get an EventBase subclass by class name

to_json() str

Serialize event to JSON string
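
Registration by class name and round-tripping through an event_type field can be sketched with dataclasses and __init_subclass__. The classes below (SketchEvent, SketchProgressEvent) and the registry layout are hypothetical; they illustrate the mechanism described above rather than reproduce the library's code.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from typing import Dict, Optional, Type

# Registry filled automatically as subclasses are defined
EVENT_TYPES: Dict[str, Type["SketchEvent"]] = {}


@dataclass
class SketchEvent:
    timestamp: float = field(default_factory=time.time)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        EVENT_TYPES[cls.__name__] = cls  # register under the class name

    @property
    def event_type(self) -> str:
        return type(self).__name__

    def to_json(self) -> str:
        # The event_type field tells the reader which class to rebuild
        return json.dumps({"event_type": self.event_type, **asdict(self)})

    @classmethod
    def get_class(cls, name: str) -> Optional[Type["SketchEvent"]]:
        return EVENT_TYPES.get(name)

    @classmethod
    def from_dict(cls, d: dict) -> "SketchEvent":
        fields = dict(d)  # copy so the caller's dictionary is not modified
        event_cls = EVENT_TYPES[fields.pop("event_type")]
        return event_cls(**fields)


@dataclass
class SketchProgressEvent(SketchEvent):
    job_id: str = ""
    progress: float = 0.0
```

Because the events carry only identifiers and plain values, the JSON form is small and can be rebuilt without access to the original job or experiment objects.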

JobSubmittedEvent

class experimaestro.scheduler.state_status.JobSubmittedEvent(timestamp: float = <factory>, experiment_id: str = '', job_id: str = '', task_id: str = '', run_id: str = '', tags: list[JobTag] = <factory>, depends_on: list[str] = <factory>, submitted_time: str | None = None)

Bases: JobEventBase, ExperimentEventBase

Event: Job was submitted to the scheduler

Fired when a job is added to an experiment run. This is both a job event and an experiment event.

JobStateChangedEvent

class experimaestro.scheduler.state_status.JobStateChangedEvent(timestamp: float = <factory>, job_id: str = '', task_id: str = '', state: str = '', failure_reason: str | None = None, started_time: str | None = None, ended_time: str | None = None, exit_code: int | None = None, retry_count: int = 0, progress: list[ProgressLevel] = <factory>)

Bases: JobEventBase

Event: Job execution state changed (from job process events / disk)

Fired when a job’s execution state changes (scheduled, running, done, error, etc.). Timestamps are stored as ISO format strings for JSON serialization. Only written to job event files (.events/jobs/), never to experiment event files.

JobProgressEvent

class experimaestro.scheduler.state_status.JobProgressEvent(timestamp: float = <factory>, job_id: str = '', task_id: str = '', level: int = 0, progress: float = 0.0, desc: str | None = None)

Bases: JobEventBase

Event: Job progress update

Written by the running job process to report progress.

RunCompletedEvent

class experimaestro.scheduler.state_status.RunCompletedEvent(timestamp: float = <factory>, experiment_id: str = '', run_id: str = '', status: str = 'completed', ended_at: str = '')

Bases: ExperimentEventBase

Event: Experiment run completed

EventWriter

class experimaestro.scheduler.state_status.EventWriter(initial_count: int = 0, permanent_dir: Path | None = None)

Base class for writing events to JSONL files

Events are written to {events_dir}/events-{count}.jsonl. Line buffering is used so that each event is flushed immediately after it is written.

Supports automatic file rotation when files exceed MAX_EVENT_FILE_SIZE. When rotating, the status file is updated with the new events_count.

Supports proactive hardlinking: when a permanent_dir is set and hardlinks are supported, a hardlink is created to permanent storage immediately when the event file is opened. This ensures events are written to both locations simultaneously and no data is lost if the process crashes.

archive_events() None

Archive events to permanent storage (called on completion)

For each temp file:

  • If permanent file exists (hardlink already created): just delete temp
  • If permanent file doesn’t exist: move temp to permanent
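
The two archiving cases can be sketched as a standalone function. The signature and the events-*.jsonl glob are assumptions made for the illustration; the real method works on the writer's own directories.

```python
import shutil
from pathlib import Path


def archive_events(temp_dir: Path, permanent_dir: Path) -> None:
    """Move event files to permanent storage, skipping already-linked ones."""
    permanent_dir.mkdir(parents=True, exist_ok=True)
    for temp_file in sorted(temp_dir.glob("events-*.jsonl")):
        target = permanent_dir / temp_file.name
        if target.exists():
            # A hardlink was created earlier: the data is already stored
            # under the permanent name, so only the temp name is dropped
            temp_file.unlink()
        else:
            shutil.move(str(temp_file), str(target))
```

When a hardlink already exists, removing the temporary name loses nothing, since both names refer to the same file contents.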

cleanup() None

Delete all event files in this directory (temporary files only)

close() None

Close the current event file (thread-safe)

abstract property events_dir: Path

Get the directory where events are written

flush() None

Flush the current event file to disk (thread-safe)

property permanent_dir: Path | None

Get the permanent storage directory

rotate(new_count: int) None

Rotate to a new event file (called after status file update)

property status_path: Path | None

Get the path to the status file, or None if not applicable

Subclasses should override this to return the path to their status file.

write_event(event: EventBase) None

Write an event to the current event file (thread-safe)

If permanent storage is configured and hardlinks are supported, creates a hardlink immediately when the file is first opened.

EventReader

class experimaestro.scheduler.state_status.EventReader(*directories: WatchedDirectory, max_open_files: int = 128)

Generic reader for events from JSONL files

Watches multiple directories with configurable entity ID extraction.

Supports:

  • One-shot reading: read_events_since_count()
  • Incremental reading: read_new_events(), which tracks file positions
  • File watching: start_watching(), stop_watching(), which use watchdog
  • Entity tracking: on_created callback determines which entities to follow

ensure_file_polled(path: Path) None

Ensure a file is being polled by the file watcher.

Adds the file to the watcher’s polling list if not already tracked. This is needed on shared/network filesystems where watchdog doesn’t detect remote file creation (e.g., NFS, GPFS, Lustre).

Parameters:

path – Path to the event file to ensure is polled

follow(entity_id: str, dir_config: WatchedDirectory) list[EventBase]

Start following an entity (e.g., when scheduler submits a new job).

Returns all existing events for bulk consolidation by the caller, then starts tailing the latest event file for new events.

If the entity is already being followed, returns immediately.

Note: This bypasses the on_created callback - use this for explicit registration (e.g., scheduler submitting a job) rather than discovery.

Parameters:
  • entity_id – Entity identifier to follow

  • dir_config – Directory configuration for this entity

Returns:

List of existing events for the caller to consolidate

get_all_event_files() list[Path]

Get all event files across all directories, sorted by modification time

read_events_since_count(entity_id: str, start_count: int, base_dir: Path | None = None) list[EventBase]

Read events for an entity starting from a specific file count

Tries flat format first ({entity_id}-{count}.jsonl), then falls back to old subdir format ({entity_id}/events-{count}.jsonl).

Parameters:
  • entity_id – Entity identifier (job_id or experiment_id)

  • start_count – File count to start reading from

  • base_dir – Optional base directory to search in (defaults to first directory)

Returns:

List of events from files starting at start_count

read_new_events() list[tuple[str, EventBase]]

Read new events since last call (incremental reading)

Returns:

List of (entity_id, event) tuples
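
Incremental reading relies on remembering how far each file has already been consumed. The sketch below shows one way to do that for a single JSONL file; SketchTailReader is hypothetical, returns plain dictionaries rather than (entity_id, event) tuples, and leaves out directory watching entirely.

```python
import json
from pathlib import Path
from typing import Dict, List


class SketchTailReader:
    """Hypothetical incremental JSONL reader that remembers byte offsets."""

    def __init__(self) -> None:
        self._positions: Dict[Path, int] = {}

    def read_new(self, path: Path) -> List[dict]:
        events = []
        with path.open("rb") as fp:
            # Resume where the previous call stopped
            fp.seek(self._positions.get(path, 0))
            while True:
                start = fp.tell()
                line = fp.readline()
                if not line.endswith(b"\n"):
                    # End of file or a partially written line: rewind so the
                    # incomplete line is read again once it is complete
                    fp.seek(start)
                    break
                if line.strip():
                    events.append(json.loads(line))
            self._positions[path] = fp.tell()
        return events
```

Stopping at the last complete line is a design choice for this sketch: a writer may be in the middle of a line when the reader runs, and parsing half a JSON object would fail.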

scan_existing_files() None

Scan for existing event files and set initial positions to end of file

Call this before start_watching() to skip existing events and only receive new ones.

start_watching() None

Start watching for file changes using FileWatcher.

Discovers existing entities, calls on_created for each, and replays all historical events for entities that should be followed.

For each entity, only the latest event file is tracked (tailed). Earlier files are processed during catch-up but not watched.

stop_watching() None

Stop watching for file changes (closes tailed file pools)

State Provider

StateProvider

class experimaestro.scheduler.state_provider.StateProvider

Abstract base class for state providers

Defines the interface that all state providers must implement. This allows local (WorkspaceStateProvider), remote (SSHStateProviderClient), and live (Scheduler) providers to be used interchangeably.

Concrete implementations:

  • Scheduler: Live in-memory state from running experiments

  • OfflineStateProvider: Base for cached/persistent state (WorkspaceStateProvider, SSHStateProviderClient)

State listener management is provided by the base class with default implementations.

add_listener(listener: Callable[[EventBase], None]) None

Register a listener for state change events

Parameters:

listener – Callback function that receives EventBase objects

abstract clean_job(job: BaseJob, perform: bool = False) bool

Clean a finished job

cleanup_orphan_partials(perform: bool = False) List[str]

Clean up orphan partial directories

abstract close() None

Close the state provider and release resources

delete_experiment(experiment_id: str, delete_jobs: bool = False, perform: bool = True) Tuple[bool, str]

Delete an experiment and optionally its job data

Parameters:
  • experiment_id – Experiment identifier to delete

  • delete_jobs – If True, also delete job directories (default: False)

  • perform – If True, actually perform deletion; if False, just check

Returns:

Tuple of (success, message)

delete_job_safely(job: BaseJob, perform: bool = True) Tuple[bool, str]

Safely delete a job and its data

Only deletes jobs that are finished (not running). Uses clean_job for the actual deletion.

Parameters:
  • job – The job to delete

  • perform – If True, actually perform deletion; if False, just check

Returns:

Tuple of (success, message)
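The perform flag and the (success, message) return value suggest a check-then-act pattern: with perform=False the call only reports whether deletion would be allowed. The sketch below illustrates that pattern on a toy in-memory store. ToyJob, toy_delete_job_safely and the messages are hypothetical stand-ins, not the experimaestro implementation.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class ToyJob:
    """Hypothetical stand-in for BaseJob."""

    job_id: str
    running: bool


def toy_delete_job_safely(
    store: Dict[str, ToyJob], job: ToyJob, perform: bool = True
) -> Tuple[bool, str]:
    """Refuse running jobs; delete only when perform is True."""
    if job.running:
        return False, f"job {job.job_id} is still running"
    if not perform:
        # Dry run: report that deletion would succeed, but change nothing
        return True, f"job {job.job_id} can be deleted"
    store.pop(job.job_id, None)
    return True, f"job {job.job_id} deleted"


store = {"a": ToyJob("a", running=False), "b": ToyJob("b", running=True)}
dry_run = toy_delete_job_safely(store, store["a"], perform=False)
refused = toy_delete_job_safely(store, store["b"], perform=True)
deleted = toy_delete_job_safely(store, store["a"], perform=True)
```

Running the dry run first lets a UI show the message and ask for confirmation before calling again with perform=True.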

execute_warning_action(warning_key: str, action_key: str, experiment_id: str = '', run_id: str = '') None

Execute a warning action and emit error event if it fails

Parameters:
  • warning_key – The warning identifier

  • action_key – The action to execute

  • experiment_id – Experiment ID for error events

  • run_id – Run ID for error events

Raises:

KeyError – If warning_key or action_key not found

abstract get_all_jobs(state: str | None = None, tags: Dict[str, str] | None = None, since: datetime | None = None) List[BaseJob]

Get all jobs across all experiments

abstract get_current_run(experiment_id: str) str | None

Get the current run ID for an experiment

abstract get_dependencies_map(experiment_id: str, run_id: str | None = None) Dict[str, List[str]]

Get dependencies map for jobs in an experiment/run

Dependencies are stored per (job_id, experiment_id, run_id) in JobDependenciesModel. This method returns a map from each job_id to the list of job_ids it depends on.

Parameters:
  • experiment_id – Experiment identifier

  • run_id – Run identifier (None = current run)

Returns:

Dictionary mapping job identifiers to list of job IDs they depend on
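Because the returned map goes from a job to the jobs it depends on, it has the same shape as the predecessor graph expected by the standard library's graphlib.TopologicalSorter. The following sketch uses a hand-written map (the job identifiers are made up) to derive an order in which dependencies come first.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Hypothetical result of get_dependencies_map(): job_id -> job_ids it depends on
dependencies: Dict[str, List[str]] = {
    "evaluate": ["train"],
    "train": ["prepare"],
    "prepare": [],
}

# TopologicalSorter takes node -> predecessors, which matches the map as-is
order = list(TopologicalSorter(dependencies).static_order())
```

A cycle in the map would make static_order() raise graphlib.CycleError, which can serve as a cheap sanity check.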

get_display_path(job: BaseJob) str

Get the path to display/copy for a job

For local providers, returns the local path. For remote providers, returns the remote path.

Parameters:

job – Job to get display path for

Returns:

Path string suitable for display and copying

abstract get_experiment(experiment_id: str) BaseExperiment | None

Get a specific experiment by ID

abstract get_experiment_job_info(experiment_id: str, run_id: str | None = None) Dict[str, ExperimentJobInformation]

Get experiment-level job info (submittime, transient) for jobs

Returns:

Dictionary mapping job_id to ExperimentJobInformation

abstract get_experiment_runs(experiment_id: str) List[BaseExperiment]

Get all runs for an experiment

Returns:

List of BaseExperiment instances (MockExperiment for past runs, or live experiment for the current run in Scheduler)

abstract get_experiments(since: datetime | None = None) List[BaseExperiment]

Get list of all experiments

abstract get_job(task_id: str, job_id: str) BaseJob | None

Get a job directly by task_id and job_id

Jobs are stored independently in workspace/jobs/task_id/job_id/, so they can be retrieved without knowing which experiment they belong to.

Parameters:
  • task_id – The task identifier

  • job_id – The job identifier (hash)

Returns:

The job if found, None otherwise
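As an illustration of the layout described above, the sketch below builds a job directory from a workspace root, a task_id and a job_id. The helper name and the example values are hypothetical, and the real provider may resolve paths differently.

```python
from pathlib import PurePosixPath


def job_directory(workspace: str, task_id: str, job_id: str) -> PurePosixPath:
    """Compose workspace/jobs/task_id/job_id (layout taken from the text above)."""
    return PurePosixPath(workspace) / "jobs" / task_id / job_id


# Made-up identifiers, for illustration only
path = job_directory("/data/workspace", "my.package.MyTask", "0f3a9c")
```

No experiment identifier appears in the path, which is why a job can be located without knowing which experiment submitted it.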

abstract get_jobs(experiment_id: str | None = None, run_id: str | None = None, task_id: str | None = None, state: str | None = None, tags: Dict[str, str] | None = None, since: datetime | None = None) List[BaseJob]

Query jobs with optional filters

get_last_sync_time() datetime | None

Get the last sync time (for incremental updates)

get_orphan_jobs() List[BaseJob]

Get orphan jobs (jobs not associated with any experiment run)

get_process_info(job: BaseJob) ProcessInfo | None

Get process information for a job

Returns a ProcessInfo dataclass or None if not available.

static get_resolved_state(job: BaseJob, experiment: BaseExperiment | None) tuple[JobState, JobState | None]

Resolve display state from experiment and execution states.

Returns (resolved_state, scheduler_state_or_none):

  • When there’s no conflict: (resolved_state, None)

  • When there’s a conflict: (exec_state, exp_state); the caller shows both icons

Uses JobState.resolve() for dispatch to subclass-specific logic.
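A caller can branch on the second element of the returned pair to decide whether one or two states need to be shown. The sketch below uses plain strings in place of JobState values. render_state and its output format are hypothetical.

```python
from typing import Optional, Tuple


def render_state(resolved: Tuple[str, Optional[str]]) -> str:
    """Format the pair for display; strings stand in for JobState values."""
    state, scheduler_state = resolved
    if scheduler_state is None:
        # No conflict: a single state is enough
        return state
    # Conflict: show the execution state and the experiment state together
    return f"{state} (experiment: {scheduler_state})"


no_conflict = render_state(("done", None))
conflict = render_state(("running", "scheduled"))
```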

abstract get_services(experiment_id: str | None = None, run_id: str | None = None) List[BaseService]

Get services for an experiment

get_stray_jobs() List[BaseJob]

Get stray jobs (running jobs not associated with any active experiment)

Stray jobs are a subset of orphan jobs - they are orphan jobs that are currently running or scheduled. These represent jobs where the experimental plan changed but the job process is still running.

Returns:

List of running/scheduled jobs not in any active experiment
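The subset relation between orphan and stray jobs can be pictured as a simple filter on job state. This is a minimal sketch with a toy job type and string states. The real method may determine activity differently (for example by checking the process).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyJob:
    """Hypothetical stand-in for BaseJob."""

    job_id: str
    state: str


# Assumed labels for "currently running or scheduled"
ACTIVE_STATES = {"running", "scheduled"}


def stray_subset(orphans: List[ToyJob]) -> List[ToyJob]:
    """Keep only the orphan jobs that are still active."""
    return [job for job in orphans if job.state in ACTIVE_STATES]


orphans = [
    ToyJob("a", "done"),
    ToyJob("b", "running"),
    ToyJob("c", "scheduled"),
    ToyJob("d", "error"),
]
stray_ids = [job.job_id for job in stray_subset(orphans)]
```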

abstract get_tags_map(experiment_id: str, run_id: str | None = None) Dict[str, Dict[str, str]]

Get tags map for jobs in an experiment/run

Tags are stored per (job_id, experiment_id, run_id) in JobTagModel. This method returns a map from job_id to {tag_key: tag_value}.

Parameters:
  • experiment_id – Experiment identifier

  • run_id – Run identifier (None = current run)

Returns:

Dictionary mapping job identifiers to their tags dict
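Once the tags map is available, selecting jobs by tag is a dictionary filter. The sketch below works on a hand-written map. jobs_matching and the tag values are hypothetical and not part of the StateProvider API.

```python
from typing import Dict, List


def jobs_matching(
    tags_map: Dict[str, Dict[str, str]], wanted: Dict[str, str]
) -> List[str]:
    """Return the job ids whose tags contain every wanted key/value pair."""
    return [
        job_id
        for job_id, tags in tags_map.items()
        if all(tags.get(key) == value for key, value in wanted.items())
    ]


# Made-up result of get_tags_map(): job_id -> {tag_key: tag_value}
tags_map = {
    "job-a": {"model": "small", "lr": "0.1"},
    "job-b": {"model": "large", "lr": "0.1"},
    "job-c": {"model": "small", "lr": "0.01"},
}
small_jobs = jobs_matching(tags_map, {"model": "small"})
small_fast = jobs_matching(tags_map, {"model": "small", "lr": "0.1"})
```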

get_unresolved_warnings() List[WarningEvent]

Get all unresolved warnings

Returns:

List of WarningEvent objects with metadata for all pending warnings

is_live: bool = False

Whether this provider is connected to a live scheduler

property is_remote: bool

Whether this is a remote provider (e.g., SSH)

Remote providers use periodic refresh instead of push notifications and support sync_path for on-demand file synchronization.

abstract kill_job(job: BaseJob, perform: bool = False) bool

Kill a running job

load_configs(experiment_id: str, run_id: str | None = None) dict[str, Any]

Load all job configs from a past experiment run.

Deprecated: use load_xp_info() instead.

load_xp_info(experiment_id: str, run_id: str | None = None) ExperimentInfo

Load all serialized objects from a past experiment run.

Returns an ExperimentInfo with .jobs and .actions dictionaries.

Parameters:
  • experiment_id – Experiment identifier

  • run_id – Run identifier (None = current/latest run)

Returns:

ExperimentInfo with jobs and actions

Raises:

property read_only: bool

Whether this provider is read-only

register_warning_actions(warning_key: str, actions: Dict[str, Callable[[], None]], warning_event: WarningEvent | None = None) None

Register action callbacks for a warning

Parameters:
  • warning_key – Unique identifier for the warning

  • actions – Dict mapping action_key to callback function

  • warning_event – Optional WarningEvent with full metadata for display
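Together, register_warning_actions and execute_warning_action behave like a two-level registry: warning key, then action key, then callback. The toy class below sketches that idea. It records failures in a list instead of emitting an error event, and whether the real method swallows the callback's exception is an assumption.

```python
from typing import Callable, Dict, List


class ToyWarningRegistry:
    """Hypothetical stand-in; not the StateProvider implementation."""

    def __init__(self) -> None:
        self._actions: Dict[str, Dict[str, Callable[[], None]]] = {}
        self.errors: List[str] = []

    def register_warning_actions(
        self, warning_key: str, actions: Dict[str, Callable[[], None]]
    ) -> None:
        self._actions[warning_key] = dict(actions)

    def execute_warning_action(self, warning_key: str, action_key: str) -> None:
        # Lookups stay outside the try block so a missing key raises KeyError
        callback = self._actions[warning_key][action_key]
        try:
            callback()
        except Exception as exc:
            # Stand-in for emitting an error event to listeners
            self.errors.append(f"{warning_key}/{action_key}: {exc}")


def failing() -> None:
    raise RuntimeError("disk full")


log: List[str] = []
registry = ToyWarningRegistry()
registry.register_warning_actions(
    "stale-lock", {"remove": lambda: log.append("removed"), "retry": failing}
)
registry.execute_warning_action("stale-lock", "remove")
registry.execute_warning_action("stale-lock", "retry")
try:
    registry.execute_warning_action("stale-lock", "missing")
    missing_raised = False
except KeyError:
    missing_raised = True
```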

remove_listener(listener: Callable[[EventBase], None]) None

Unregister a listener

Parameters:

listener – Previously registered callback function
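The add_listener/remove_listener pair follows a plain observer pattern. The sketch below shows registration, notification and unregistration on a toy provider. ToyProvider, its emit method and the dictionary events are hypothetical, and the real base class may add locking or error handling.

```python
from typing import Callable, Dict, List

Event = Dict[str, str]  # Stand-in for EventBase


class ToyProvider:
    """Hypothetical stand-in for a state provider."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[Event], None]] = []

    def add_listener(self, listener: Callable[[Event], None]) -> None:
        self._listeners.append(listener)

    def remove_listener(self, listener: Callable[[Event], None]) -> None:
        self._listeners.remove(listener)

    def emit(self, event: Event) -> None:
        # Iterate over a copy so a listener may unregister itself when called
        for listener in list(self._listeners):
            listener(event)


received: List[Event] = []


def on_event(event: Event) -> None:
    received.append(event)


provider = ToyProvider()
provider.add_listener(on_event)
provider.emit({"type": "job_state", "job_id": "abc"})
provider.remove_listener(on_event)
provider.emit({"type": "job_state", "job_id": "def"})  # No longer delivered
```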

service_state_changed(service) None

Called when a service’s state changes - emit event to listeners

StateProvider registers itself as a listener on services it returns, so this method is called when those services’ states change.

sync_path(path: str) Path | None

Sync a specific path from remote (remote providers only)

Returns None for local providers or if sync fails.

translate_path(path: Path) str

Translate a local path to a display path (e.g. remote path)

Parameters:

path – Local path to translate

Returns:

Path string suitable for display and copying

OfflineStateProvider

class experimaestro.scheduler.state_provider.OfflineStateProvider

Bases: StateProvider

State provider for offline/cached state access

Provides state listener management, job/experiment/service caching shared by WorkspaceStateProvider and SSHStateProviderClient.

This is an intermediate class between StateProvider (the ABC) and concrete implementations that need state listener support and caching.

Caching strategy:

  • Jobs and experiments are cached by their identifiers

  • Events update cached objects in-place to maintain consistency

  • get_jobs/get_experiments return cached objects when available

apply_event(event: EventBase) None

Apply an event to cached jobs and experiments

This method is called when events are received (from event files or via notifications). It updates the cached objects in-place to maintain state consistency.

Handles:

  • JobStateChangedEvent: Updates job state in cache

  • JobProgressEvent: Updates job progress in cache

  • CarbonMetricsEvent: Updates job carbon metrics in cache

  • JobSubmittedEvent: Adds new job to cache

  • ExperimentUpdatedEvent: Invalidates experiment cache

Subclasses may override this for additional logic.
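The in-place update strategy means that any object previously returned from the cache keeps reflecting the latest state. The sketch below shows this with toy event and job types. All Toy* names are hypothetical and only mirror three of the event kinds listed above.

```python
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class ToyJob:
    job_id: str
    state: str = "unscheduled"
    progress: float = 0.0


@dataclass
class ToySubmitted:
    job_id: str


@dataclass
class ToyStateChanged:
    job_id: str
    state: str


@dataclass
class ToyProgress:
    job_id: str
    progress: float


ToyEvent = Union[ToySubmitted, ToyStateChanged, ToyProgress]


class ToyCache:
    """Hypothetical cache keyed by job identifier."""

    def __init__(self) -> None:
        self.jobs: Dict[str, ToyJob] = {}

    def apply_event(self, event: ToyEvent) -> None:
        if isinstance(event, ToySubmitted):
            # New job: add it unless it is already cached
            self.jobs.setdefault(event.job_id, ToyJob(event.job_id))
            return
        job = self.jobs.get(event.job_id)
        if job is None:
            return  # Event for a job that is not cached: nothing to update
        if isinstance(event, ToyStateChanged):
            job.state = event.state  # Mutate in place; old references stay valid
        elif isinstance(event, ToyProgress):
            job.progress = event.progress


cache = ToyCache()
cache.apply_event(ToySubmitted("j1"))
held = cache.jobs["j1"]  # Reference kept by, say, a UI widget
cache.apply_event(ToyStateChanged("j1", "running"))
cache.apply_event(ToyProgress("j1", 0.5))
```

Replacing the cached object on every event would instead leave holders such as `held` with stale data.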

get_services(experiment_id: str | None = None, run_id: str | None = None) List[BaseService]

Get services for an experiment

Uses caching to preserve service instances (and their URLs) across calls. Subclasses can override _get_live_services() for live service support and must implement _fetch_services_from_storage() for persistent storage.

If experiment_id is None, returns services from all experiments.

CarbonMetricsData

class experimaestro.scheduler.state_provider.CarbonMetricsData(co2_kg: float = 0.0, energy_kwh: float = 0.0, cpu_power_w: float = 0.0, gpu_power_w: float = 0.0, ram_power_w: float = 0.0, duration_s: float = 0.0, region: str = '', is_final: bool = False, written: bool = False)

Carbon metrics data for a job.

written: bool = False

True if the carbon record was successfully written to CarbonStorage.

Listeners

Listener

class experimaestro.scheduler.base.Listener

job_state(job)

Deprecated: use on_job_state_changed instead

job_submitted(job)

Deprecated: use on_job_submitted instead

on_job_state_changed(job)

Called when job state changes (new preferred method)

on_job_submitted(job)

Called when job is submitted (new preferred method)

on_service_added(service: Service)

Called when a new service is added (new preferred method)

service_add(service: Service)

Deprecated: use on_service_added instead

Exceptions

DirtyGitError

class experimaestro.DirtyGitError

Bases: HandledException

Raised when the git repository has uncommitted changes and dirty_git=error

FailedExperiment

class experimaestro.scheduler.experiment.FailedExperiment

Bases: HandledException

Raised when an experiment failed

GracefulExperimentExit

class experimaestro.scheduler.experiment.GracefulExperimentExit

Bases: Exception

Raised to exit an experiment context without waiting for running jobs.

This is useful in tests or when you want to detach from an experiment while keeping jobs running (e.g., to test stray job detection).

Example:

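from experimaestro import experiment
from experimaestro.scheduler.experiment import GracefulExperimentExit

# MyTask and workdir are assumed to be defined elsewhere
with experiment(workdir, "my-experiment") as xp:
    task = MyTask.C(value=1).submit()
    # Wait for task to start...
    raise GracefulExperimentExit()  # Exit without waiting for task to finish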

Carbon Tracking

CarbonImpactData

class experimaestro.carbon.base.CarbonImpactData(sum: CarbonAggregateData | None = None, latest: CarbonAggregateData | None = None)

Carbon impact data for an experiment (sum and latest aggregations).

classmethod from_dict(d: dict | None) CarbonImpactData | None

Create from dictionary.

latest: CarbonAggregateData | None = None

Sum of only the latest run of each unique job.

sum: CarbonAggregateData | None = None

Sum of all job runs (including retries).

to_dict() dict

Convert to dictionary for JSON serialization.
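The difference between the sum and latest aggregations can be illustrated on a few made-up run records. The sketch below tracks only a CO2 figure per run. The record layout and the aggregate helper are hypothetical and do not reflect how CarbonAggregateData is computed internally.

```python
from typing import Dict, List, Tuple

# (job_id, run_index, co2_kg): "train" was retried once
Record = Tuple[str, int, float]
records: List[Record] = [
    ("train", 0, 0.5),
    ("train", 1, 0.25),
    ("eval", 0, 0.125),
]


def aggregate(records: List[Record]) -> Tuple[float, float]:
    """Return (sum over all runs, sum over the latest run of each job)."""
    total = sum(co2 for _, _, co2 in records)
    latest_per_job: Dict[str, Tuple[int, float]] = {}
    for job_id, run_index, co2 in records:
        # Keep the record with the highest run index for each job
        if job_id not in latest_per_job or run_index > latest_per_job[job_id][0]:
            latest_per_job[job_id] = (run_index, co2)
    latest = sum(co2 for _, co2 in latest_per_job.values())
    return total, latest


total_co2, latest_co2 = aggregate(records)
```

Here the retry of "train" counts toward the total but only its second run counts toward the latest figure.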