Experiment and Scheduler Architecture

This document describes the internal architecture of the experimaestro experiment and scheduler systems, covering job lifecycle, state management, and completion handling.

See also the Internal API Reference for detailed class documentation.

Overview

The experiment system consists of two main components:

  1. experiment (experiment.py): Context manager for experiment setup, tracking, and cleanup

  2. Scheduler (base.py): Singleton managing job lifecycle and execution across all experiments

+-------------------------------------------------------------------+
|                           Scheduler                               |
|           (singleton, asyncio event loop in background)           |
|                                                                   |
|   +--------------+   +--------------+   +--------------+          |
|   | Experiment 1 |   | Experiment 2 |   | Experiment 3 |          |
|   |   (context)  |   |   (context)  |   |   (context)  |          |
|   |              |   |              |   |              |          |
|   | +----+----+  |   |    +----+    |   | +----+----+  |          |
|   | |Job1|Job2|  |   |    |Job1|    |   | |Job1|Job2|  |          |
|   | +----+----+  |   |    +----+    |   | +----+----+  |          |
|   +--------------+   +--------------+   +--------------+          |
+-------------------------------------------------------------------+

Experiment Lifecycle

Initialization (__init__)

The experiment class is a context manager that sets up the experiment environment:

with experiment(workdir, "my-experiment", port=12345) as xp:
    task.submit()

During initialization:

  • Creates Workspace from settings or path

  • Sets up experiment base directory (experiments/{name}/)

  • Prepares lock path for preventing concurrent runs

  • Captures project paths for git info

Entry (__enter__)

When entering the context:

  1. Lock acquisition: Locks experiment to prevent concurrent runs

  2. Run ID generation: Creates unique run_id (format: YYYYMMDD_HHMMSS, with .N suffix for collisions)

  3. Directory creation: Creates run-specific workdir (experiments/{name}/{run_id}/)

  4. Environment capture: Records git info, Python version, hostname

  5. Dirty git check: Optionally warns or errors on uncommitted changes (raises DirtyGitError)

  6. Scheduler registration: Registers with the singleton Scheduler

  7. Event writer initialization: Starts EventWriter for state changes

  8. TaskOutputsWorker start: Begins processing dynamic task outputs via TaskOutputsWorker
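The run ID and run directory steps above can be sketched as follows. This is only an illustration of the stated format (YYYYMMDD_HHMMSS with a .N suffix on collision); the function names are hypothetical, and starting the suffix at .1 is an assumption rather than documented behaviour.

```python
from datetime import datetime
from pathlib import Path


def generate_run_id(experiment_dir: Path, now: datetime) -> str:
    """Return a run ID of the form YYYYMMDD_HHMMSS, adding .N on collision."""
    base = now.strftime("%Y%m%d_%H%M%S")
    run_id = base
    suffix = 0
    # Probe for an unused directory name (assumption: first collision is ".1")
    while (experiment_dir / run_id).exists():
        suffix += 1
        run_id = f"{base}.{suffix}"
    return run_id


def create_run_dir(experiment_dir: Path, now: datetime) -> Path:
    """Create experiments/{name}/{run_id}/ and return its path."""
    run_dir = experiment_dir / generate_run_id(experiment_dir, now)
    run_dir.mkdir(parents=True)
    return run_dir
```

Because the experiment lock is already held at this point, probing the directory and then creating it does not race with another run of the same experiment.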

Exit (__exit__)

When exiting the context:

  1. Wait for jobs: Waits for all jobs to complete, unless the context is exiting because of an exception or a GracefulExperimentExit

  2. Stop services: Stops any registered services

  3. Finalize run: Writes final status and archives events

  4. Cleanup transient jobs: Removes transient job directories if clean exit

  5. History cleanup: Removes old runs based on max_done/max_failed settings

  6. Release lock: Releases the experiment lock acquired on entry

File Structure

experiments/{name}/
  lock                          # Experiment-level lock file
  {run_id}/
    environment.json            # Environment and git info
    status.json                 # Final run status
    jobs.jsonl                  # Job list (one line per job)
    jobs/                       # Symlinks to job directories
    results/                    # User-defined results
    data/                       # Saved configurations

Scheduler Architecture

The Scheduler is a singleton that runs an asyncio event loop in a background thread. It manages job lifecycle across all active experiments.

Key Components

Component       Purpose
--------------  -----------------------------------------
experiments     Dict of active experiments
jobs            Dict of all Job instances (by identifier)
waitingjobs     Set of jobs awaiting execution
loop            The asyncio event loop
exitCondition   Condition for signaling completions
dependencyLock  Lock for acquiring dynamic dependencies

Singleton Pattern

scheduler = Scheduler.instance()  # Get or create singleton

The scheduler runs its event loop in an internal thread, so that locking, file watching, and scheduling all share a single event loop.
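As a rough illustration of this pattern (not the actual Scheduler code), the sketch below shows a singleton that owns an asyncio loop running in a daemon thread. MiniScheduler, run() and answer() are hypothetical names introduced for the example.

```python
import asyncio
import threading
from typing import Optional


class MiniScheduler:
    """Toy singleton owning an asyncio loop that runs in a daemon thread."""

    _instance: Optional["MiniScheduler"] = None
    _instance_lock = threading.Lock()

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self.loop.run_forever, name="mini-scheduler", daemon=True
        )
        self._thread.start()

    @classmethod
    def instance(cls) -> "MiniScheduler":
        # Get or create the singleton; the lock guards concurrent first calls
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def run(self, coro):
        """Thread-safe entry point: run a coroutine on the shared loop."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()


async def answer() -> int:
    # Stand-in for real scheduler work executed on the background loop
    await asyncio.sleep(0)
    return 42
```

Any thread can call MiniScheduler.instance().run(answer()); the coroutine executes on the background loop while the caller blocks on the result.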

Job Lifecycle

Job instances transition through these JobState values:

UNSCHEDULED → WAITING → READY → SCHEDULED → RUNNING → DONE/ERROR
     (0)        (1)      (2)       (3)         (4)      (5/6)

State        Description
-----------  -------------------------------------------
UNSCHEDULED  Initial state, job not yet submitted
WAITING      Job submitted, waiting for dependencies
READY        Dependencies resolved, ready to execute
SCHEDULED    Scheduled for execution
RUNNING      Job process is executing
DONE         Job completed successfully
ERROR        Job failed (with optional JobFailureStatus)

Error States

ERROR states can have a JobFailureStatus:

  • DEPENDENCY: Upstream job failed

  • FAILED: Job execution failed

  • MEMORY: Out of memory

  • TIMEOUT: Walltime exceeded (allows retries for resumable tasks)

  • DELETED: Job data was deleted by the user from the monitor
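The states and failure reasons above can be modelled with two small enumerations. The sketch below is a simplified stand-in, not the real JobState or JobFailureStatus classes: the numeric state values follow the diagram above, while the string values and the helper functions are assumptions made for illustration.

```python
from enum import Enum


class SketchJobState(Enum):
    UNSCHEDULED = 0
    WAITING = 1
    READY = 2
    SCHEDULED = 3
    RUNNING = 4
    DONE = 5
    ERROR = 6

    def finished(self) -> bool:
        """True for the two terminal states."""
        return self in (SketchJobState.DONE, SketchJobState.ERROR)


class SketchFailureStatus(Enum):
    # String values are hypothetical; only the names come from the text
    DEPENDENCY = "dependency"
    FAILED = "failed"
    MEMORY = "memory"
    TIMEOUT = "timeout"
    DELETED = "deleted"


def is_retryable(failure: SketchFailureStatus, resumable: bool) -> bool:
    """Only a timeout on a resumable task is a candidate for a retry."""
    return resumable and failure is SketchFailureStatus.TIMEOUT
```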

Job Submission Pipeline

submit() → aio_registerJob() → aio_submit() → aio_start() → completion
    │              │                │              │
    │              │                │              └→ Execute process, wait
    │              │                └→ Create symlinks, notify, start inner
    │              └→ Deduplicate, merge state, check resubmission
    └→ Thread-safe entry point

1. submit(job) - Thread-safe Entry

  • Calls aio_registerJob() to check for duplicates

  • Sets up job future for async tracking

  • Returns immediately with existing job if resubmitted

2. aio_registerJob(job) - Deduplication

Jobs with identical configurations get the same identifier (hash). When resubmitting:

  1. Checks if job already exists in scheduler

  2. If found, adds experiment to existing job’s experiments list

  3. Merges TransientMode (more conservative wins)

  4. Updates max_retries if new job has higher value

  5. Copies watched outputs from new job to existing

  6. Returns existing job (unless needs restart)
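The merge rules above can be illustrated with a toy registry keyed by job identifier. SketchJob, SketchTransientMode and register_job() are hypothetical, and the ordering used for "more conservative wins" (NONE before TRANSIENT before REMOVE) is an assumption.

```python
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Dict, List


class SketchTransientMode(IntEnum):
    # Assumption: a lower value is more conservative (keeps more on disk)
    NONE = 0
    TRANSIENT = 1
    REMOVE = 2


@dataclass
class SketchJob:
    identifier: str
    transient: SketchTransientMode = SketchTransientMode.NONE
    max_retries: int = 0
    experiments: List[str] = field(default_factory=list)


def register_job(jobs: Dict[str, SketchJob], job: SketchJob, experiment: str) -> SketchJob:
    """Return the canonical job for this identifier, merging on resubmission."""
    existing = jobs.get(job.identifier)
    if existing is None:
        job.experiments.append(experiment)
        jobs[job.identifier] = job
        return job
    if experiment not in existing.experiments:
        existing.experiments.append(experiment)
    # The more conservative transient mode wins
    existing.transient = min(existing.transient, job.transient)
    # Only ever raise the retry budget
    existing.max_retries = max(existing.max_retries, job.max_retries)
    return existing
```

The sketch leaves out copying watched outputs and the restart check, which depend on scheduler internals not described here.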

3. aio_submit(job) - Main Execution

  • Adds job to waitingjobs

  • Registers WatchedOutput instances

  • Creates symlink in experiment’s jobspath

  • Notifies Listener of job submission

  • Calls aio_submit_inner() in retry loop (for GracefulTimeout)

  • Processes final state via aio_final_state()

4. aio_submit_inner(job) - Load and Check State

  • Loads existing state from disk (preserves history)

  • Checks if already done or exhausted retries

  • Clears transient fields

  • Sets state to WAITING

  • Checks for already running process

  • Calls aio_start() if job needs to run

5. aio_start(job) - Actual Execution

The job start sequence involves careful lock management. The scheduler and the job process both use the same lock file ({scriptname}.lock), but at different phases.

Timeline:
                    Scheduler                         Job Process
                    ─────────                         ───────────
  1. Wait for static dependencies (jobs)
  2. Acquire job lock ──────────────────┐
  3. Acquire dynamic dependencies       │
     (tokens, with deadlock prevention) │
  4. Create job directory               │
  5. Clean up old markers/events        │
  6. Write status.json (no PID yet)     │
  7. aio_run():                         │
     - Launch process ──────────────────┼──────→ Process spawned
     - Write {scriptname}.pid           │
     - Write status.json (with PID)     │
  8. Write locks.json (dynamic locks)   │
  9. Release job lock ──────────────────┘
                                          ┌────── Acquire job lock
                                          │       Execute task
                                          │       Write .done or .failed
                                          │       Write status.json (final)
                                          └────── Release job lock
 10. Wait for process to end
 11. Read final state from .done/.failed
 12. Return final state

Note on killed jobs: If the job process is killed (e.g., SLURM scancel, OOM), it won’t write .done/.failed or final status.json. In that case, cleanup is handled by the scheduler (if still running) or by a later experimaestro process that detects the stale state.

Gap between step 9 and the job process lock: After the scheduler releases the job lock (step 9) and before the job process acquires it, the lock is not held but the job is still active. During this window:

  • {scriptname}.pid exists (written by aio_run() in step 7)

  • status.json exists with PID information

  • No .done/.failed markers exist yet

Any code that checks whether a job is active (e.g., cleanup) must account for this gap by checking not just the lock, but also the PID file and the absence of terminal markers. Process liveness is checked using the launcher-independent Process abstraction (see below).
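A check that accounts for this gap might look like the sketch below. It is illustrative only: job_looks_active() is a hypothetical helper, the PID file is assumed to contain a plain integer, and the is_alive callback stands in for the launcher-independent Process abstraction.

```python
from pathlib import Path
from typing import Callable


def job_looks_active(
    job_dir: Path,
    scriptname: str,
    lock_held: bool,
    is_alive: Callable[[int], bool],
) -> bool:
    """Decide whether a job should be treated as active."""
    # A held lock means the scheduler or the job process is working on it
    if lock_held:
        return True
    # Terminal markers mean the job has finished
    for marker in (".done", ".failed"):
        if (job_dir / f"{scriptname}{marker}").exists():
            return False
    # No lock and no marker: fall back to the PID file (the gap case)
    pid_file = job_dir / f"{scriptname}.pid"
    if not pid_file.exists():
        return False
    try:
        pid = int(pid_file.read_text().strip())  # assumed file format
    except ValueError:
        return False
    return is_alive(pid)
```

Passing the liveness check as a callback keeps the sketch independent of how the process was launched (local, SLURM, and so on).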

State Management

All state changes go through the set_state() method which ensures:

  1. Automatic timestamp updates:

    • WAITING: sets submittime, clears starttime and endtime

    • RUNNING: sets starttime

    • DONE/ERROR: sets endtime

  2. Experiment statistics: Updates unfinishedJobs and failedJobs counters

  3. Listener notifications: Notifies registered Listener of state changes

  4. Future reset: Resets _final_state future when transitioning to WAITING (enables proper handling of job resubmission)

State Property Encapsulation

The job state is stored in _state (private) and accessed via the state property. Direct assignment (job.state = X) automatically calls set_state() to ensure proper handling.
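The following sketch shows how a property setter can route plain assignment through set_state() and keep the timestamps consistent. StatefulJob is a hypothetical toy class; states are plain strings here, and the counters, listener notifications and future reset are omitted.

```python
import time
from typing import Optional


class StatefulJob:
    """Toy job whose timestamps follow its state."""

    def __init__(self) -> None:
        self._state = "UNSCHEDULED"
        self.submittime: Optional[float] = None
        self.starttime: Optional[float] = None
        self.endtime: Optional[float] = None

    @property
    def state(self) -> str:
        return self._state

    @state.setter
    def state(self, new_state: str) -> None:
        # Plain assignment is routed through set_state()
        self.set_state(new_state)

    def set_state(self, new_state: str) -> None:
        now = time.time()
        if new_state == "WAITING":
            # (Re)submission: set submit time, clear start and end times
            self.submittime, self.starttime, self.endtime = now, None, None
        elif new_state == "RUNNING":
            self.starttime = now
        elif new_state in ("DONE", "ERROR"):
            self.endtime = now
        self._state = new_state
```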

Job Locking

Each job has a lock file ({job_path}/{scriptname}.lock) ensuring:

  • Only one scheduler instance can run a job at a time

  • Status file writes are atomic and consistent

The lock is acquired before starting a job and released after the process starts. The done handler acquires its own lock (using filelock) when writing the final status file.

Dependency Management

Static Dependencies (Jobs)

  • Awaited sequentially (via JobDependency) before the job starts

  • Don’t need the dependency lock

  • If dependency fails, dependent job fails with DEPENDENCY reason

Dynamic Dependencies (Tokens)

Tokens are acquired with dependencyLock to prevent deadlocks:

Problem: Multiple jobs acquiring tokens could deadlock (A waits for Token1 while holding Token2, B waits for Token2 while holding Token1).

Solution:

  • Single dependencyLock ensures only one task acquires dynamic deps at a time

  • Retry logic: if any lock fails, release all and restart

  • First dependency has 0 timeout, others have 0.1s
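The all-or-nothing strategy can be sketched with ordinary threading locks standing in for tokens. This is a simplified, synchronous illustration rather than the scheduler's asyncio implementation; the function names and the max_attempts bound are assumptions.

```python
import threading
from typing import Any, Sequence


def try_acquire_all(tokens: Sequence[Any]) -> bool:
    """One attempt: take every token, or none of them."""
    held = []
    for index, token in enumerate(tokens):
        if index == 0:
            # First dependency: zero timeout (non-blocking)
            ok = token.acquire(blocking=False)
        else:
            # Remaining dependencies: wait up to 0.1 s each
            ok = token.acquire(timeout=0.1)
        if not ok:
            # Roll back so other jobs can make progress
            for taken in reversed(held):
                taken.release()
            return False
        held.append(token)
    return True


def acquire_dynamic_dependencies(
    dependency_lock: Any, tokens: Sequence[Any], max_attempts: int = 3
) -> bool:
    """Retry the all-or-nothing attempt, one acquirer at a time."""
    for _ in range(max_attempts):
        with dependency_lock:
            if try_acquire_all(tokens):
                return True
    return False
```

Since a failed attempt never leaves a job holding a partial set of tokens, the circular wait described in the problem statement cannot form.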

Done Handler

Job completion is processed in a dedicated thread pool:

  1. Process task outputs: Calls job.done_handler() to process any registered task output callbacks

  2. Write final status: Acquires the job lock and writes the final status file with timestamps and state

  3. Cleanup: Removes job from scheduler’s waiting jobs set

  4. Resolve future: Sets the result on _final_state future to unblock aio_submit() callers

Thread Safety

The done handler runs in a separate thread pool (4 workers by default) to avoid blocking the main event loop. Communication with the event loop uses call_soon_threadsafe() and run_coroutine_threadsafe().
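A minimal sketch of this hand-off, assuming a four-worker pool as described above: the worker thread never touches the asyncio future directly and instead schedules the result on the loop with call_soon_threadsafe(). The function names are hypothetical.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Pool dedicated to completion handling (4 workers, as in the text)
done_pool = ThreadPoolExecutor(max_workers=4)


def done_handler(
    loop: asyncio.AbstractEventLoop, final_state: asyncio.Future, state: str
) -> None:
    """Runs in a worker thread, so blocking work here is safe."""
    # ... process task outputs, write the final status file ...
    # asyncio futures are not thread-safe: hand the result to the loop
    loop.call_soon_threadsafe(final_state.set_result, state)


async def wait_for_completion(state: str) -> str:
    """Runs on the event loop and awaits the handler's result."""
    loop = asyncio.get_running_loop()
    final_state: asyncio.Future = loop.create_future()
    done_pool.submit(done_handler, loop, final_state, state)
    return await final_state
```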

Transient Jobs

Jobs can be marked as transient with different TransientMode values:

Mode       Description
---------  ------------------------------------------------------------------
NONE       Normal job, always runs
TRANSIENT  Only runs if a non-transient job depends on it
REMOVE     Like TRANSIENT, but job directory is removed after experiment ends

Transient jobs that aren’t needed stay in UNSCHEDULED state and skip the done handler processing. They can be converted to non-transient if a job depends on them.

Resumable Tasks

Tasks inheriting from ResumableTask support automatic retry on timeout:

  1. When a timeout occurs, retry_count is incremented

  2. If retry_count <= max_retries, the job restarts automatically

  3. Log files are rotated via rotate_logs() to preserve previous run’s output

  4. The job directory is preserved between retries for checkpoint files

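from pathlib import Path

# Import locations are assumed here; adjust to your experimaestro version
from experimaestro import (
    GracefulTimeout, Meta, Param, PathGenerator, ResumableTask, field,
)


class LongTraining(ResumableTask):
    epochs: Param[int]
    checkpoint: Meta[Path] = field(default_factory=PathGenerator("checkpoint.pth"))

    def execute(self):
        # load_checkpoint, save_checkpoint and train_one_epoch are user-defined helpers
        # The checkpoint stores the next epoch to run
        start_epoch = 0
        if self.checkpoint.exists():
            start_epoch = load_checkpoint(self.checkpoint)

        for epoch in range(start_epoch, self.epochs):
            # Stop cleanly when too little walltime remains for another epoch
            remaining = self.remaining_time()
            if remaining is not None and remaining < 300:
                save_checkpoint(self.checkpoint, epoch)
                raise GracefulTimeout("Not enough time")

            train_one_epoch()
            # Save epoch + 1 so that a completed epoch is not repeated on resume
            save_checkpoint(self.checkpoint, epoch + 1)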
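On the scheduler side, the retry rule (increment retry_count, restart while retry_count <= max_retries) can be sketched as a simple loop. This is an illustration under stated assumptions, not the actual aio_submit() logic: SketchTimeout stands in for GracefulTimeout, and run_with_retries() is a hypothetical helper that ignores log rotation.

```python
from typing import Callable, Tuple


class SketchTimeout(Exception):
    """Stand-in for a graceful timeout raised by a resumable task."""


def run_with_retries(
    attempt: Callable[[int], None], max_retries: int
) -> Tuple[str, int]:
    """Run attempt(retry_count) until it succeeds or retries are exhausted."""
    retry_count = 0
    while True:
        try:
            attempt(retry_count)
            return "DONE", retry_count
        except SketchTimeout:
            retry_count += 1
            # Restart only while retry_count <= max_retries
            if retry_count > max_retries:
                return "ERROR", retry_count
```

With max_retries=3, a task that times out twice and then completes ends in DONE after two retries; with max_retries=1 the same task ends in ERROR.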

Dynamic Task Outputs

For tasks that produce outputs during execution (e.g., checkpoints), see WatchedOutput:

class Training(ResumableTask):
    validation: Param[Validation]

    def execute(self):
        for step in range(100):
            train_step()
            if step % 10 == 0:
                self.validation.compute(step)  # Calls register_task_output

# Usage
def on_checkpoint(checkpoint):
    Evaluate.C(checkpoint=checkpoint).submit()

training = Training.C(...)
training.watch_output(validation.checkpoint, on_checkpoint)
training.submit()

The TaskOutputsWorker processes these in a separate thread:

  • Queues callbacks for execution

  • Updates taskOutputQueueSize via scheduler

  • Waits for all outputs before experiment completes

  • Events stored in .experimaestro/task-outputs.jsonl for replay on restart

Event System

Event Types

Job Events:

Experiment Events:

  • RunCompletedEvent: Experiment run finished

  • ServiceAddedEvent: Service added to experiment

  • ServiceStateChangedEvent: Service state changed

Process Events:

  • CarbonMetricsEvent: Carbon tracking data (see CarbonMetricsData)

  • ProcessStartedEvent: Process started

  • ProcessStateEvent: Process state changed

Event Storage

Events are written to JSONL files via EventWriter and read via EventReader:

workspace/.events/
  experiments/{exp_id}/
    events-0.jsonl
    events-1.jsonl
  jobs/{task_id}/
    event-{job_id}-0.jsonl

After run completion, events are archived to permanent storage for replay.
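The JSONL layout lends itself to append-only writes and ordered replay. The sketch below is a hypothetical illustration, not the EventWriter or EventReader API: the helper names and the event fields used in the test are assumptions, and only the events-N.jsonl naming comes from the layout above.

```python
import json
from pathlib import Path
from typing import Iterator


def write_event(events_dir: Path, index: int, event: dict) -> None:
    """Append one event as a single JSON line to events-{index}.jsonl."""
    events_dir.mkdir(parents=True, exist_ok=True)
    path = events_dir / f"events-{index}.jsonl"
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(event) + "\n")


def read_events(events_dir: Path) -> Iterator[dict]:
    """Yield events in file order, then line order, for replay."""
    # Sort on the numeric index so events-10 comes after events-2
    files = sorted(
        events_dir.glob("events-*.jsonl"),
        key=lambda p: int(p.stem.split("-")[1]),
    )
    for path in files:
        with path.open(encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    yield json.loads(line)
```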

Notification System

Two parallel notification systems:

  1. Legacy listeners: Listener callbacks (job_submitted, job_state)

  2. StateProvider listeners: Event-based StateProvider notifications for the TUI and Web UI

Notifications run in a thread pool to avoid blocking the scheduler.

Key Classes

Class              File                Purpose
-----------------  ------------------  ----------------------------------------------------
Scheduler          base.py             Main scheduler, manages event loop and job lifecycle
experiment         experiment.py       Context manager for experiment setup
Job                jobs.py             Job state machine with dependencies
BaseJob            interfaces.py       Base class with state property and timestamps
TaskOutputsWorker  dynamic_outputs.py  Processes dynamic task outputs
Workspace          workspace.py        Working directory and run mode management
StateListener      experiment.py       Writes state changes to event files

Workspace Structure

workspace/
  .events/
    experiments/{exp_id}/events-*.jsonl
    jobs/{task_id}/event-{job_id}-*.jsonl
  jobs/
    {task_id}/
      {job_hash}/
        .experimaestro/
          status.json
          task-outputs.jsonl
        params.json
        script.py
        script.out
        script.err
        script.done/.failed
        script.pid
        locks.json
  experiments/
    {exp_id}/
      lock
      {run_id}/
        environment.json
        status.json
        jobs.jsonl
        jobs/ (symlinks)
        results/
        data/
  partials/
  config/