Experiment and Scheduler Architecture
This document describes the internal architecture of the experimaestro experiment and scheduler systems, covering job lifecycle, state management, and completion handling.
See also the Internal API Reference for detailed class documentation.
Overview
The experiment system consists of two main components:
experiment (experiment.py): Context manager for experiment setup, tracking, and cleanup
Scheduler (base.py): Singleton managing job lifecycle and execution across all experiments
+--------------------------------------------------------------+
|                          Scheduler                           |
|        (singleton, asyncio event loop in background)         |
|                                                              |
|   +--------------+    +--------------+    +--------------+   |
|   | Experiment 1 |    | Experiment 2 |    | Experiment 3 |   |
|   |  (context)   |    |  (context)   |    |  (context)   |   |
|   |              |    |              |    |              |   |
|   | +----+----+  |    |    +----+    |    | +----+----+  |   |
|   | |Job1|Job2|  |    |    |Job1|    |    | |Job1|Job2|  |   |
|   | +----+----+  |    |    +----+    |    | +----+----+  |   |
|   +--------------+    +--------------+    +--------------+   |
+--------------------------------------------------------------+
Experiment Lifecycle
Initialization (__init__)
The experiment class is a context manager that sets up the experiment environment:
with experiment(workdir, "my-experiment", port=12345) as xp:
    task.submit()
During initialization:
Creates Workspace from settings or path
Sets up experiment base directory (experiments/{name}/)
Prepares lock path for preventing concurrent runs
Captures project paths for git info
Entry (__enter__)
When entering the context:
Lock acquisition: Locks experiment to prevent concurrent runs
Run ID generation: Creates unique run_id (format: YYYYMMDD_HHMMSS, with .N suffix for collisions)
Directory creation: Creates run-specific workdir (experiments/{name}/{run_id}/)
Environment capture: Records git info, Python version, hostname
Dirty git check: Optionally warns or errors on uncommitted changes (raises DirtyGitError)
Scheduler registration: Registers with the singleton Scheduler
Event writer initialization: Starts EventWriter for state changes
TaskOutputsWorker start: Begins processing dynamic task outputs via TaskOutputsWorker
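
The run ID format described above can be illustrated with a small standalone sketch. The function name make_run_id and the choice of starting the collision suffix at .1 are assumptions made for illustration; they are not taken from the experimaestro source.

```python
from datetime import datetime
from pathlib import Path


def make_run_id(experiment_dir: Path, now: datetime) -> str:
    """Return a run ID of the form YYYYMMDD_HHMMSS, adding .N on collision.

    Hypothetical helper: the starting suffix (.1) is an assumption.
    """
    base = now.strftime("%Y%m%d_%H%M%S")
    run_id, suffix = base, 0
    # Bump the numeric suffix until no run directory with that name exists
    while (experiment_dir / run_id).exists():
        suffix += 1
        run_id = f"{base}.{suffix}"
    return run_id
```

Checking for an existing directory is only one way to detect a collision; since the experiment lock is held at this point, a simple existence test is probably sufficient.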
Exit (__exit__)
When exiting the context:
Wait for jobs: Unless exception or GracefulExperimentExit, waits for all jobs to complete
Stop services: Stops any registered services
Finalize run: Writes final status and archives events
Cleanup transient jobs: Removes transient job directories if clean exit
History cleanup: Removes old runs based on max_done/max_failed settings
Release lock: Releases workspace lock
File Structure
experiments/{name}/
  lock                  # Experiment-level lock file
  {run_id}/
    environment.json    # Environment and git info
    status.json         # Final run status
    jobs.jsonl          # Job list (one line per job)
    jobs/               # Symlinks to job directories
    results/            # User-defined results
    data/               # Saved configurations
Scheduler Architecture
The Scheduler is a singleton that runs an asyncio event loop in a background thread.
It manages job lifecycle across all active experiments.
Key Components
| Component | Purpose |
|---|---|
| | Dict of active experiments |
| | Dict of all jobs |
| waitingjobs | Set of jobs awaiting execution |
| | The asyncio event loop |
| | Condition for signaling completions |
| dependencyLock | Lock for acquiring dynamic dependencies |
Singleton Pattern
scheduler = Scheduler.instance() # Get or create singleton
The scheduler runs an internal event loop thread so that locking, file watching, and scheduling all share a single event loop.
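
As a rough illustration of this pattern (not the actual Scheduler code), a singleton that owns a background asyncio loop might look as follows. The class name LoopSingleton and the helpers run() and add() are hypothetical.

```python
import asyncio
import threading


class LoopSingleton:
    """Hypothetical stand-in for a singleton owning a background event loop."""

    _instance = None
    _instance_lock = threading.Lock()

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        # Daemon thread: the loop does not keep the interpreter alive
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    @classmethod
    def instance(cls) -> "LoopSingleton":
        # The lock ensures concurrent callers end up sharing one object
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def run(self, coro):
        """Run a coroutine on the background loop and block for its result."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()


async def add(a, b):
    await asyncio.sleep(0)  # yields to the loop once, like real async work
    return a + b
```

Calling LoopSingleton.instance().run(add(2, 3)) from any thread executes the coroutine on the shared loop.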
Job Lifecycle
Job instances transition through these JobState values:
UNSCHEDULED → WAITING → READY → SCHEDULED → RUNNING → DONE/ERROR
    (0)         (1)      (2)       (3)        (4)       (5/6)
| State | Description |
|---|---|
| UNSCHEDULED | Initial state, job not yet submitted |
| WAITING | Job submitted, waiting for dependencies |
| READY | Dependencies resolved, ready to execute |
| SCHEDULED | Scheduled for execution |
| RUNNING | Job process is executing |
| DONE | Job completed successfully |
| ERROR | Job failed (with optional JobFailureStatus) |
Error States
ERROR states can have a JobFailureStatus:
DEPENDENCY: Upstream job failed
FAILED: Job execution failed
MEMORY: Out of memory
TIMEOUT: Walltime exceeded (allows retries for resumable tasks)
DELETED: Job data was deleted by the user from the monitor
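
The states and failure reasons listed above can be modelled with plain enums. This is a minimal sketch: the numeric values follow the diagram, but the real JobState and JobFailureStatus classes may be implemented differently, and the helpers finished() and can_retry() are hypothetical.

```python
from enum import Enum, auto
from typing import Optional


class JobState(Enum):
    UNSCHEDULED = 0
    WAITING = 1
    READY = 2
    SCHEDULED = 3
    RUNNING = 4
    DONE = 5
    ERROR = 6

    def finished(self) -> bool:
        """True for the two terminal states."""
        return self in (JobState.DONE, JobState.ERROR)


class JobFailureStatus(Enum):
    # Member values are arbitrary here; only the names come from the text
    DEPENDENCY = auto()
    FAILED = auto()
    MEMORY = auto()
    TIMEOUT = auto()
    DELETED = auto()


def can_retry(state: JobState, failure: Optional[JobFailureStatus], resumable: bool) -> bool:
    """In this sketch, only a timed-out resumable task is a retry candidate."""
    return state is JobState.ERROR and failure is JobFailureStatus.TIMEOUT and resumable
```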
Job Submission Pipeline
submit() → aio_registerJob() → aio_submit() → aio_start() → completion
   │              │                 │              │
   │              │                 │              └→ Execute process, wait
   │              │                 └→ Create symlinks, notify, start inner
   │              └→ Deduplicate, merge state, check resubmission
   └→ Thread-safe entry point
1. submit(job) - Thread-safe Entry
Calls aio_registerJob() to check for duplicates
Sets up job future for async tracking
Returns immediately with existing job if resubmitted
2. aio_registerJob(job) - Deduplication
Jobs with identical configurations get the same identifier (hash). When resubmitting:
Checks if job already exists in scheduler
If found, adds experiment to existing job’s experiments list
Merges TransientMode (more conservative wins)
Updates max_retries if new job has higher value
Copies watched outputs from new job to existing
Returns existing job (unless needs restart)
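
The merge rules above can be sketched with a dictionary keyed by job identifier. Everything named here (SketchJob, Mode, register) is hypothetical, and reading "more conservative" as "the mode that keeps the job" is an assumption about the ordering, not something the text confirms.

```python
from dataclasses import dataclass, field
from enum import IntEnum


class Mode(IntEnum):
    # Assumed ordering: a lower value is more conservative (keeps the job)
    NONE = 0
    TRANSIENT = 1


@dataclass
class SketchJob:
    identifier: str
    max_retries: int = 0
    mode: Mode = Mode.NONE
    experiments: list = field(default_factory=list)


def register(jobs: dict, job: SketchJob, experiment: str) -> SketchJob:
    """Return the job tracked for this identifier, merging on resubmission."""
    existing = jobs.get(job.identifier)
    if existing is None:
        job.experiments.append(experiment)
        jobs[job.identifier] = job
        return job
    if experiment not in existing.experiments:
        existing.experiments.append(experiment)
    existing.mode = min(existing.mode, job.mode)  # more conservative wins
    existing.max_retries = max(existing.max_retries, job.max_retries)
    return existing
```

The sketch leaves out watched outputs and the restart case, which depend on scheduler state not described here.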
3. aio_submit(job) - Main Execution
Adds job to waitingjobs
Registers WatchedOutput instances
Creates symlink in experiment’s jobspath
Notifies Listener of job submission
Calls aio_submit_inner() in retry loop (for GracefulTimeout)
Processes final state via aio_final_state()
4. aio_submit_inner(job) - Load and Check State
Loads existing state from disk (preserves history)
Checks if already done or exhausted retries
Clears transient fields
Sets state to WAITING
Checks for already running process
Calls aio_start() if job needs to run
5. aio_start(job) - Actual Execution
The job start sequence involves careful lock management. The scheduler
and the job process both use the same lock file ({scriptname}.lock),
but at different phases.
Timeline:
Scheduler                                        Job Process
─────────                                        ───────────
1. Wait for static dependencies (jobs)
2. Acquire job lock ────────────────────┐
3. Acquire dynamic dependencies         │
   (tokens, with deadlock prevention)   │
4. Create job directory                 │
5. Clean up old markers/events          │
6. Write status.json (no PID yet)       │
7. aio_run():                           │
   - Launch process ────────────────────┼──────→ Process spawned
   - Write {scriptname}.pid             │
   - Write status.json (with PID)       │
8. Write locks.json (dynamic locks)     │
9. Release job lock ────────────────────┘
                                         ┌────── Acquire job lock
                                         │       Execute task
                                         │       Write .done or .failed
                                         │       Write status.json (final)
                                         └────── Release job lock
10. Wait for process to end
11. Read final state from .done/.failed
12. Return final state
Note on killed jobs: If the job process is killed (e.g., SLURM
scancel, OOM), it won’t write .done/.failed or final status.json.
In that case, cleanup is handled by the scheduler (if still running) or
by a later experimaestro process that detects the stale state.
Gap between steps 9 and job process lock: After the scheduler releases the job lock (step 9) and before the job process acquires it, the lock is not held but the job is still active. During this window:
{scriptname}.pid exists (written by aio_run() in step 7)
status.json exists with PID information
No .done/.failed markers exist yet
Any code that checks whether a job is active (e.g., cleanup) must account
for this gap by checking not just the lock, but also the PID file and
the absence of terminal markers. Process liveness is checked using the
launcher-independent Process abstraction (see below).
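
A check along these lines could be sketched as follows. This is only an illustration under several assumptions: the PID file is taken to hold a plain integer, liveness is probed with os.kill(pid, 0) on POSIX instead of the Process abstraction, and the lock check itself is omitted. The function names are hypothetical.

```python
import os
from pathlib import Path


def pid_alive(pid: int) -> bool:
    """POSIX-only probe: signal 0 tests for existence without killing."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def looks_active(job_dir: Path, scriptname: str) -> bool:
    """Treat a job as active if it has a live PID and no terminal marker."""
    for suffix in (".done", ".failed"):
        if (job_dir / f"{scriptname}{suffix}").exists():
            return False
    pid_file = job_dir / f"{scriptname}.pid"
    if not pid_file.exists():
        return False
    try:
        # Assumption: the file contains just the PID as text
        pid = int(pid_file.read_text().strip())
    except ValueError:
        return False
    return pid_alive(pid)
```

A cleanup routine would call something like looks_active() in addition to testing the lock, so that a job in the gap after step 9 is not mistaken for a stale one.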
State Management
All state changes go through the set_state() method which ensures:
Automatic timestamp updates:
  WAITING: sets submittime, clears starttime and endtime
  RUNNING: sets starttime
  DONE/ERROR: sets endtime
Experiment statistics: Updates unfinishedJobs and failedJobs counters
Listener notifications: Notifies registered Listener of state changes
Future reset: Resets _final_state future when transitioning to WAITING (enables proper handling of job resubmission)
State Property Encapsulation
The job state is stored in _state (private) and accessed via the state property.
Direct assignment (job.state = X) automatically calls set_state() to ensure
proper handling.
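
The encapsulation can be illustrated with a property whose setter delegates to set_state(). This is a simplified sketch: TrackedJob and State are hypothetical names, and the history list stands in for listener notifications and experiment counters.

```python
import time
from enum import Enum


class State(Enum):
    WAITING = 1
    RUNNING = 4
    DONE = 5
    ERROR = 6


class TrackedJob:
    """Hypothetical job whose state only changes through set_state()."""

    def __init__(self):
        self._state = None
        self.submittime = self.starttime = self.endtime = None
        self.history = []  # stands in for listener notifications

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        # Direct assignment is routed through set_state()
        self.set_state(new_state)

    def set_state(self, new_state):
        now = time.time()
        if new_state is State.WAITING:
            self.submittime, self.starttime, self.endtime = now, None, None
        elif new_state is State.RUNNING:
            self.starttime = now
        elif new_state in (State.DONE, State.ERROR):
            self.endtime = now
        self._state = new_state
        self.history.append(new_state)
```

Routing every assignment through one method keeps the timestamp rules in a single place, which is presumably why the real class hides _state.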
Job Locking
Each job has a lock file ({job_path}/{scriptname}.lock) ensuring:
Only one scheduler instance can run a job at a time
Status file writes are atomic and consistent
The lock is acquired before starting a job and released after the process starts. The done handler acquires its own lock (using filelock) when writing the final status file.
Dependency Management
Static Dependencies (Jobs)
Awaited sequentially before the job starts, via JobDependency
Do not need the dependency lock
If a dependency fails, the dependent job fails with the DEPENDENCY reason
Dynamic Dependencies (Tokens)
Tokens are acquired with dependencyLock to prevent deadlocks:
Problem: Multiple jobs acquiring tokens could deadlock (A waits for Token1 while holding Token2, B waits for Token2 while holding Token1).
Solution:
A single dependencyLock ensures only one task acquires dynamic dependencies at a time
Retry logic: if any lock fails, release all and restart
The first dependency is tried with a timeout of 0; the others use a 0.1 s timeout
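
The all-or-nothing strategy above can be sketched with ordinary thread locks standing in for tokens. The function acquire_all and the max_attempts bound are assumptions added so that the example terminates; the scheduler itself works with asyncio and its own token types.

```python
import threading

# Serializes dynamic-dependency acquisition across tasks
dependency_lock = threading.Lock()


def acquire_all(tokens, max_attempts=3):
    """Acquire every lock in `tokens`, or none of them."""
    for _ in range(max_attempts):
        with dependency_lock:
            held = []
            for index, token in enumerate(tokens):
                if index == 0:
                    ok = token.acquire(blocking=False)  # first token: no waiting
                else:
                    ok = token.acquire(timeout=0.1)  # later tokens: wait up to 0.1 s
                if not ok:
                    break
                held.append(token)
            else:
                return True  # the loop finished without a failed acquisition
            # Partial failure: give back what was taken, then start over
            for token in reversed(held):
                token.release()
    return False
```

Because a task never waits while holding only part of its tokens, the circular wait described in the problem statement cannot form.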
Done Handler
Job completion is processed in a dedicated thread pool:
Process task outputs: Calls job.done_handler() to process any registered task output callbacks
Write final status: Acquires the job lock and writes the final status file with timestamps and state
Cleanup: Removes job from scheduler’s waiting jobs set
Resolve future: Sets the result on _final_state future to unblock aio_submit() callers
Thread Safety
The done handler runs in a separate thread pool (4 workers by default) to avoid
blocking the main event loop. Communication with the event loop uses
call_soon_threadsafe() and run_coroutine_threadsafe().
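
The hand-off between a worker thread and the event loop can be sketched as follows. The names done_handler and wait_for_completion are illustrative only; the point is that the asyncio future is resolved through call_soon_threadsafe() and never touched directly from the worker thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)


def done_handler(loop, final_state, state):
    """Runs in a worker thread; blocking work would happen here."""
    # asyncio futures are not thread-safe, so schedule the update on the loop
    loop.call_soon_threadsafe(final_state.set_result, state)


async def wait_for_completion(state):
    loop = asyncio.get_running_loop()
    final_state = loop.create_future()
    pool.submit(done_handler, loop, final_state, state)
    # The event loop stays free while the worker thread is busy
    return await final_state
```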
Transient Jobs
Jobs can be marked as transient with different TransientMode values:
| Mode | Description |
|---|---|
| | Normal job, always runs |
| | Only runs if a non-transient job depends on it |
| | Like |
Transient jobs that aren’t needed stay in UNSCHEDULED state and skip the done
handler processing. They can be converted to non-transient if a job depends on them.
Resumable Tasks
Tasks inheriting from ResumableTask support automatic retry on timeout:
When a timeout occurs, retry_count is incremented
If retry_count <= max_retries, the job restarts automatically
Log files are rotated via rotate_logs() to preserve previous run’s output
The job directory is preserved between retries for checkpoint files
class LongTraining(ResumableTask):
    checkpoint: Meta[Path] = field(default_factory=PathGenerator("checkpoint.pth"))

    def execute(self):
        start_epoch = 0
        if self.checkpoint.exists():
            start_epoch = load_checkpoint(self.checkpoint)
        for epoch in range(start_epoch, self.epochs):
            remaining = self.remaining_time()
            if remaining is not None and remaining < 300:
                save_checkpoint(self.checkpoint, epoch)
                raise GracefulTimeout("Not enough time")
            train_one_epoch()
            save_checkpoint(self.checkpoint, epoch)
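
On the scheduler side, the retry rule (increment retry_count, restart while retry_count <= max_retries) can be sketched as a small loop. Timeout and run_with_retries are hypothetical stand-ins, not experimaestro names, and log rotation is left out.

```python
class Timeout(Exception):
    """Stand-in for a graceful timeout raised by a task."""


def run_with_retries(run_once, max_retries):
    """Call run_once(attempt) until it succeeds or retries are exhausted."""
    retry_count = 0
    while True:
        try:
            return run_once(retry_count)
        except Timeout:
            retry_count += 1
            # Restart only while retry_count <= max_retries
            if retry_count > max_retries:
                raise
```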
Dynamic Task Outputs
For tasks that produce outputs during execution (e.g., checkpoints), see WatchedOutput:
class Training(ResumableTask):
    validation: Param[Validation]

    def execute(self):
        for step in range(100):
            train_step()
            if step % 10 == 0:
                self.validation.compute(step)  # Calls register_task_output

# Usage
def on_checkpoint(checkpoint):
    Evaluate.C(checkpoint=checkpoint).submit()

training = Training.C(...)
training.watch_output(validation.checkpoint, on_checkpoint)
training.submit()
The TaskOutputsWorker processes these in a separate thread:
Queues callbacks for execution
Updates taskOutputQueueSize via scheduler
Waits for all outputs before experiment completes
Events stored in .experimaestro/task-outputs.jsonl for replay on restart
Event System
Event Types
Job Events:
JobSubmittedEvent: Job was submitted (records tags, dependencies)
JobStateChangedEvent: Job state changed (records failure reason, times, exit code)
JobProgressEvent: Job progress updated
Experiment Events:
RunCompletedEvent: Experiment run finished
ServiceAddedEvent: Service added to experiment
ServiceStateChangedEvent: Service state changed
Process Events:
CarbonMetricsEvent: Carbon tracking data (see CarbonMetricsData)
ProcessStartedEvent: Process started
ProcessStateEvent: Process state changed
Event Storage
Events are written to JSONL files via EventWriter and read via EventReader:
workspace/.events/
  experiments/{exp_id}/
    events-0.jsonl
    events-1.jsonl
  jobs/{task_id}/
    event-{job_id}-0.jsonl
After run completion, events are archived to permanent storage for replay.
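
The JSONL storage format (one JSON object per line, appended as events occur) can be illustrated with two small helpers. The functions write_event and read_events and the "type" field are assumptions made for this sketch; they do not reflect the actual EventWriter or EventReader interfaces.

```python
import json
from pathlib import Path


def write_event(path: Path, event_type: str, **payload) -> None:
    """Append one event as a single JSON line."""
    with path.open("a", encoding="utf-8") as fp:
        fp.write(json.dumps({"type": event_type, **payload}) + "\n")


def read_events(path: Path):
    """Yield events in the order they were written, skipping blank lines."""
    with path.open(encoding="utf-8") as fp:
        for line in fp:
            if line.strip():
                yield json.loads(line)
```

Appending line by line means a reader can replay the file from the start at any time, which fits the replay use described above.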
Notification System
Two parallel notification systems:
Legacy listeners: Listener callbacks (job_submitted, job_state)
StateProvider listeners: StateProvider event-based for TUI/Web UI
Notifications run in a thread pool to avoid blocking the scheduler.
Key Classes
| Class | File | Purpose |
|---|---|---|
| Scheduler | base.py | Main scheduler, manages event loop and job lifecycle |
| experiment | experiment.py | Context manager for experiment setup |
| Job | | Job state machine with dependencies |
| | | Base class with state property and timestamps |
| TaskOutputsWorker | | Processes dynamic task outputs |
| Workspace | | Working directory and run mode management |
| EventWriter | | Writes state changes to event files |
Workspace Structure
workspace/
  .events/
    experiments/{exp_id}/events-*.jsonl
    jobs/{task_id}/event-{job_id}-*.jsonl
  jobs/
    {task_id}/
      {job_hash}/
        .experimaestro/
          status.json
          task-outputs.jsonl
        params.json
        script.py
        script.out
        script.err
        script.done/.failed
        script.pid
        locks.json
  experiments/
    {exp_id}/
      lock
      {run_id}/
        environment.json
        status.json
        jobs.jsonl
        jobs/ (symlinks)
        results/
        data/
  partials/
  config/