Tasks
A task (Task) is a special configuration that can be:
Submitted to the task scheduler using
submit()(preparation of the experiment)Executed with the method
execute()(running a specific task within the experiment)
Defining a task
from experimaestro import Config, Task, Param, Meta, PathGenerator, field
class ModelLearn(Task):
epochs: Param[int] = 100
model: Param[Model]
parameters: Meta[Path] = field(default_factory=PathGenerator("parameters.pth"))
def execute(self):
"""Called when this task is run"""
pass
Task lifecycle
During task execution, the working directory is set to be the task directory, and some special variables are defined:
tags can be accessed as a dictionary using
self.__tags__task directory can is
self.__taskdir__when using partial identifiers,
self.__maintaskdir__is the directory of the main task
Tasks outputs and dependencies
Task outputs can be re-used by other tasks. It is thus important
to properly define what is a task dependency (i.e. the task
should be run before) or not. To do so, the task_outputs() method
of a Task takes one argument, dep, which can be used to mark a dependency
to the task.
By default, the task configuration is marked as a dependency as follows:
For more complex cases, one can redefine the task_outputs method
and explicitly declare the dependencies.
Task outputs
In this example, we sample from a dataset composed of composed of queries and documents. The documents are left untouched, but the topics are sampled. In that case, we express the fact that:
the returned object
Datasetshould be dependant on the taskRandomFoldthe
topicsproperty of this dataset should also be dependantbut the
documentsproperty should not (since we do not sample from it)
from pathlib import Path
from experimaestro import Task, Param, field
from experimaestro.generators import PathGenerator
class RandomFold(Task):
dataset: Param[Dataset]
"""The source dataset"""
topics: Param[Path] = field(default_factory=PathGenerator("topics.tsv"))
"""Generated topics"""
def task_outputs(self, dep) -> Adhoc:
return dep(Dataset.C(
topics=dep(Topics.C(path=self.topics)),
documents=self.dataset.documents,
))
Common use case
A common use case is when we want to use a Path which contains the output of
the task A. Given how experimaestro works, just using it in a dependant task B
will not work since the ID will be the same whatever the task A
configuration. There are two ways to do so:
Wrap the task A output into a configuration object
Using initialization tasks
Wrapping the task output
Initialization tasks
The second solution is particularly suited when wanting to restore an object
state from disk, and we want to separate the loading mechanism from the
configuration logic; in that case, LightweightTask (a Config which must be
subclassed) can be used.
from pathlib import Path
from experimaestro import Config, Param, Task, Meta, LightweightTask
class Model(Config):
...
class ModelLoader(LightweightTask):
path: Meta[Path]
model: Param[Model]
def execute(self):
...
class Evaluate(Task):
model: Param[Model]
def execute(self):
...
class ModelLearner(Task):
model: Param[Model]
def task_outputs(self, dep):
return dep(ModelLoader.C(model=model, path=path))
def execute(self):
...
# We learn the model...
model_loader = learner.submit()
# ... and evaluate it, using the learned parameters
Evaluate.C(model=model).submit(init_tasks=[model_loader])
Submit hooks
When a task is submitted, it is possible to modify the job/launcher environment using SubmitHook.
from experimaestro import Config
from experimaestro.core.types import SubmitHook
from experimaestro.scheduler.jobs import Job
from experimaestro.launchers import Launcher
class needs_java(SubmitHook):
def __init__(self, version: int):
self.version = version
def spec(self):
"""Returns a hashable identifier for this hook (so it is only applied once)"""
return self.version
def process(self, job: Job, launcher: Launcher):
"""Apply the hook for a given job/launcher"""
job.environ["JAVA_HOME"] = "THE_JAVA_HOME"
...
@needs_java(11)
class IndexCollection(Config):
...
Resumable Tasks
For long-running tasks that may be interrupted by scheduler timeouts (e.g., SLURM job time limits), you can use ResumableTask instead of Task. Resumable tasks can automatically retry when they fail due to timeouts, allowing them to resume from checkpoints.
Defining a resumable task
from experimaestro import ResumableTask, Param, Meta, PathGenerator, field
from pathlib import Path
class LongTraining(ResumableTask):
epochs: Param[int] = 1000
checkpoint: Meta[Path] = field(default_factory=PathGenerator("checkpoint.pth"))
def execute(self):
# Check if we're resuming from a checkpoint
start_epoch = 0
if self.checkpoint.exists():
start_epoch = load_checkpoint(self.checkpoint)
# Continue training from where we left off
for epoch in range(start_epoch, self.epochs):
train_one_epoch()
save_checkpoint(self.checkpoint, epoch)
Automatic Retry on Timeout
When a resumable task times out (e.g., reaches SLURM walltime limit), the scheduler automatically restarts it up to a maximum number of retries. The retry limit can be configured:
Globally: Set
max_retriesin workspace settings (~/.config/experimaestro/settings.yaml):workspaces: - id: my_workspace path: /path/to/workspace max_retries: 5 # Default is 3
Per-task: Override when submitting:
task = LongTraining.C(epochs=1000).submit(max_retries=10)
Important Considerations
Checkpointing: Your task should save progress to checkpoints and check for existing checkpoints on startup
Idempotency: The task should produce the same results whether run once or restarted multiple times
Directory preservation: Unlike regular tasks, the task directory is preserved between retries to maintain checkpoints
Non-timeout failures: Only timeout failures trigger retries. Other errors (out of memory, bugs, etc.) will not retry
Querying Remaining Time
Resumable tasks can query the remaining time before a job timeout using the remaining_time() method. This is useful for deciding whether to start another iteration or checkpoint before the scheduler kills the job.
Using remaining_time()
from experimaestro import ResumableTask, GracefulTimeout, Param, Meta, PathGenerator, field
from pathlib import Path
class LongTraining(ResumableTask):
epochs: Param[int] = 1000
checkpoint: Meta[Path] = field(default_factory=PathGenerator("checkpoint.pth"))
def execute(self):
start_epoch = 0
if self.checkpoint.exists():
start_epoch = load_checkpoint(self.checkpoint)
for epoch in range(start_epoch, self.epochs):
# Check if we have enough time for another epoch
remaining = self.remaining_time()
if remaining is not None and remaining < 300: # 5 min buffer
save_checkpoint(self.checkpoint, epoch)
raise GracefulTimeout("Not enough time for another epoch")
train_one_epoch()
save_checkpoint(self.checkpoint, epoch)
The remaining_time() method returns:
Remaining seconds (as
float): When running on a launcher with time limits (e.g., SLURM)None: When there is no time limit, or the launcher doesn’t support querying remaining time
Note
Launcher Support
Currently, remaining_time() is supported for:
SLURM: Queries the remaining walltime using
squeueDirect (local): Always returns
None(no time limit)
The remaining time is cached internally, so repeated calls are efficient.
Graceful Timeout
Sometimes a task knows it won’t have enough time to complete another processing step before the scheduler kills it (e.g., SLURM walltime). In this case, the task can raise GracefulTimeout to stop cleanly and trigger a retry.
Using GracefulTimeout with remaining_time()
from experimaestro import ResumableTask, GracefulTimeout, Param, Meta, PathGenerator, field
from pathlib import Path
class LongTraining(ResumableTask):
epochs: Param[int] = 1000
checkpoint: Meta[Path] = field(default_factory=PathGenerator("checkpoint.pth"))
def execute(self):
start_epoch = 0
if self.checkpoint.exists():
start_epoch = load_checkpoint(self.checkpoint)
for epoch in range(start_epoch, self.epochs):
# Check remaining time before starting an epoch
remaining = self.remaining_time()
if remaining is not None and remaining < 300: # 5 min buffer
raise GracefulTimeout("Not enough time for another epoch")
train_one_epoch()
save_checkpoint(self.checkpoint, epoch)
When GracefulTimeout is raised:
The task is marked as failed with reason “timeout”
If
retry_count < max_retries, the task is automatically resubmittedThe task directory is preserved, allowing the task to resume from its checkpoint
This is useful when:
Training epochs have variable duration and you want to stop before being killed
You’re monitoring remaining walltime via scheduler environment variables (e.g.,
SLURM_JOB_END_TIME)You want to ensure checkpoints are saved cleanly before termination
Graceful Termination
When a task receives SIGTERM (e.g., from scancel) or SIGINT (Ctrl+C), the framework provides a mechanism for graceful cleanup:
A background thread starts the framework cleanup (writes
.failedwithreason: "cancelled", releases locks)A
TaskCancelledexception is raised in the main thread
Tasks can catch this exception to perform custom cleanup before termination:
Handling TaskCancelled
from experimaestro import Task, TaskCancelled
class LongRunningTask(Task):
def execute(self):
try:
for step in range(1000):
do_work(step)
except TaskCancelled as e:
# Custom cleanup before termination
# e.remaining_time contains seconds until SIGKILL (if known)
save_partial_state()
close_connections()
raise # Re-raise to let framework handle exit
Important
Key points about graceful termination:
Framework cleanup runs in a background thread regardless of whether the task catches the exception
If the task catches and suppresses
TaskCancelled(doesn’t re-raise), the job is still marked as cancelled (not done)SLURM provides ~30-60 seconds (
KillWaitsetting) between SIGTERM and SIGKILLUse for quick cleanup only (saving state, closing connections)
The
remaining_timeattribute contains the remaining time until SIGKILL (if known from the launcher)
Handling task events
Callbacks can be registered to accomplish some actions e.g. on task completion.
on_completed()registers a callback that is called when the task terminates successfully
Dynamic Task Outputs
For tasks that produce outputs during execution (e.g., checkpoints during training), you can use watch_output() to register callbacks that are triggered when outputs are produced. This is particularly useful for triggering evaluation jobs on intermediate checkpoints. The output methods use DependentMarker to mark dependencies.
Defining dynamic outputs
from experimaestro import ResumableTask, Config, Param, DependentMarker
class Checkpoint(Config):
step: Param[int]
model: Param[Model]
class Validation(Config):
model: Param[Model]
def checkpoint(self, dep: DependentMarker, *, step: int) -> Checkpoint:
"""Method that produces dynamic outputs"""
# dep ensures that the checkpoint depends on the learning task
# this is crucial when computing the identifier (i.e. different
# learning rates lead to different checkpoints)
return dep(Checkpoint.C(model=self.model, step=step))
def compute(self, step: int):
"""Called during task execution to register an output"""
self.register_task_output(self.checkpoint, step=step)
class Learn(ResumableTask):
model: Param[Model]
validation: Param[Validation]
learn: Param[float]
def execute(self):
for step in range(100):
train_step()
if step % 10 == 0:
self.validation.compute(step) # Triggers callbacks
Watching dynamic outputs
def on_checkpoint(checkpoint: Checkpoint):
# Called when a checkpoint is produced
Evaluate.C(checkpoint=checkpoint).submit()
learn = Learn.C(model=model, validation=validation)
learn.watch_output(validation.checkpoint, on_checkpoint)
learn.submit()
Key Features
ResumableTask only: Only
ResumableTaskcan useregister_task_output()(checked at runtime)Automatic replay: When a task is restarted, callbacks are replayed for previously produced outputs (events stored in
.experimaestro/task-outputs.jsonl)Multiple callbacks: Multiple callbacks can watch the same output method
Separate thread: Callbacks run in a dedicated worker thread
How It Works
During task execution,
register_task_output()writes events to a JSONL fileThe experiment monitors this file and triggers registered callbacks
When a task is resubmitted after a failure, existing events are replayed to new callbacks
This enables seamless resumption of training with checkpoint-based evaluation
Lightweights tasks using @cache
Sometimes, a configuration might need to compute some output that might be interesting to cache, but without relying on a fully-fledged task (because it can be done on the fly). In those cases, the cache() decorator can be used. Behind the curtain, a config cache is created (using the configuration unique identifier) and the path is locked (avoiding problems if the same configuration is used in two running tasks):
from pathlib import Path
import pickle
import numpy as np
from experimaestro import Config, cache
class Terms(Config):
@cache("terms.npy")
def load(self, path: Path):
if path.is_file():
return pickle.load(path)
# Value which can be long to compute
weights = self.compute_weights()
np.save(path, weights)
return terms
Task directory
When the task, submitted to the scheduler with .submit(...), is run,
[EXPERIMENT_FOLDER]is the main experiment folder[NAME]is task name (by default, the python class name in lowercase)[QUALIFIED_NAME]is the qualified task name (by default, the python qualified name in lower case)[JOB_ID]is the unique identifier
All the task output should be located in the directory
[EXPERIMENT_FOLDER]/jobs/[QUALIFIED_NAME]/[JOB_ID]
Inside this directory, we have
__xpm__, the folder that stores information about the process[NAME].pyand (for some launchers)[NAME].shthat contains the code that will be executed (through a cluster scheduler, e.g. slurm, or directly)[NAME].errand[NAME].outthat stores the standard output and error streams