dispatch

Remote dispatch utilities for SSH and SLURM.

RunResult(returncode: int, stdout: str, stderr: str) dataclass

Result of a remote command execution.

TunnelResult(returncode: int, pid: int | None, command: list[str], stderr: str = '') dataclass

Result of starting an SSH local port forward.

SlurmJob(name: str, command: str, partition: str | None = None, nodes: int = 1, ntasks: int = 1, ntasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus: int | None = None, gpus_per_node: int | None = None, gpu_type: str | None = None, mem: str | None = None, mem_per_cpu: str | None = None, time: str | None = None, output: str | None = None, error: str | None = None, workdir: str | None = None, root_dir: str | None = None, env: dict[str, str] = dict(), modules: list[str] = list(), uv_groups: list[str] = list(), dependency: str | None = None, exclusive: bool = False, constraint: str | None = None, account: str | None = None, qos: str | None = None, exclude: list[str] = list(), extra_directives: list[str] = list(), setup_commands: list[str] = list(), payload: str | None = None, payload_extract_to: str = '$SLURM_TMPDIR/code', juicefs_mount: JuiceFSMount | None = None, is_slurm: bool = True, bootstrap_py: str | None = None) dataclass

Configuration for an sbatch script.

pack(tarball: bytes) -> 'SlurmJob'

Return a new SlurmJob with the given tarball as payload.

Parameters:

Name Type Description Default
tarball bytes

gzip-compressed tarball bytes (from sync.snapshot())

required

Returns:

Type Description
'SlurmJob'

New SlurmJob with payload set

to_bootstrap_script() -> str

Generate the bootstrap.sh script that runs on each node.

to_sbatch_script(bootstrap_script_path: str) -> str

Generate the sbatch wrapper script that calls srun on bootstrap.sh.

to_script() -> str

Generate script for backward compatibility.

For SLURM: returns the bootstrap script (the sbatch wrapper is generated separately).
For SSH: returns the bootstrap script.

SlurmResult(job_id: int | None, ssh_result: RunResult) dataclass

Result of a SLURM job submission.

JobStatus(job_id: int, partition: str, name: str, user: str, state: str, time_elapsed: str, nodes: int, nodelist: str) dataclass

Status of a SLURM job from squeue.

JobInfo(job_id: str, name: str, partition: str, state: str, exit_code: str, elapsed: str, max_rss: str, nodelist: str) dataclass

Detailed info about a SLURM job from sacct.

QueueResult(jobs: list[JobStatus], ssh_result: RunResult) dataclass

Result of a queue query.

StatusResult(job: JobStatus | None, ssh_result: RunResult) dataclass

Result of a job status query.

JobInfoResult(steps: list[JobInfo], ssh_result: RunResult) dataclass

Result of a job info query.

main: JobInfo | None property

Get the main job entry (without step suffix).
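
As an illustration of what "without step suffix" means: sacct reports one row per job step, with IDs such as `123`, `123.batch`, and `123.0`. The sketch below picks the row whose ID has no `.` suffix. `Step` and `main_entry` are hypothetical stand-ins, not the library's code, and the real property may select the entry differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Step:
    """Hypothetical stand-in for JobInfo with only the fields used here."""
    job_id: str
    state: str


def main_entry(steps: list[Step]) -> Optional[Step]:
    # Assumption: the main entry is the row whose ID carries no ".<step>" suffix.
    for step in steps:
        if "." not in step.job_id:
            return step
    return None


steps = [
    Step("123.batch", "COMPLETED"),
    Step("123", "COMPLETED"),
    Step("123.0", "FAILED"),
]
main = main_entry(steps)
```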

NodeGres(name: str, type: str | None, configured: int, allocated: int) dataclass

GRES (generic resource) info for a node.

NodeInfo(name: str, state: str, cpus_total: int, cpus_allocated: int, memory_total: int, memory_allocated: int, gres: list[NodeGres], partitions: list[str], features: list[str]) dataclass

Detailed info about a SLURM node.

get_gres(name: str) -> NodeGres | None

Get GRES by name (e.g., 'gpu').

PartitionInfo(name: str, state: str, nodes: list[str], total_cpus: int, total_nodes: int) dataclass

Info about a SLURM partition.

ClusterConfig(root: str, work: str, log: str | None = None, share: str | None = None, mount: str | None = None, cache_size: str | None = None, cache_dir: str | None = None) dataclass

Configuration for a compute cluster's paths.

PartitionConfig(name: str, default: bool = False, constraint: str | None = None) dataclass

Configuration for a SLURM partition.

PlainHostConfig(ssh: str, cluster: str, type: Literal['plain'] = 'plain', chips: dict[str, int] = dict(), uv_groups: list[str] = list()) dataclass

Configuration for a plain SSH host (no scheduler).

SlurmHostConfig(ssh: str, cluster: str, type: Literal['slurm'] = 'slurm', partitions: list[PartitionConfig] = list(), account: str | None = None, qos: str | None = None, mem: str | None = None, exclude: list[str] = list(), uv_groups: list[str] = list(), chips: dict[str, int] | None = None, cpu_partitions: list[str] = list(), annotations: dict[str, str] = dict()) dataclass

Configuration for a SLURM cluster login node.

DispatchConfig(mount: str | None = None, proxy: str | None = None, clusters: dict[str, ClusterConfig] = dict(), hosts: dict[str, PlainHostConfig | SlurmHostConfig] = dict(), priority: list[str] = list(), gres_mapping: dict[str, str] = dict()) dataclass

Top-level dispatch configuration.

RemoteInventory(config: DispatchConfig)

Resolves dispatch config into usable objects.

get_cluster(name: str) -> Cluster

Get or create a Cluster object by name.

get_host(name: str) -> PlainHostConfig | SlurmHostConfig

Get host configuration by name.

get_chip(name: str) -> Chip

Get a Chip by name from SUPPORTED_CHIPS.

plain_hosts() -> list[str]

List all plain SSH hosts.

slurm_hosts() -> list[str]

List all SLURM hosts.

hosts_with_chip(chip: str | Chip) -> list[str]

Find plain hosts that have a specific chip type.

total_chips(chip: str | Chip) -> int

Get total count of a chip type across all plain hosts.

default_partition(host: str) -> PartitionConfig | None

Get the default partition for a SLURM host.

SolveResult(result: HardwareResult | None, host_name: str | None, host_config: PlainHostConfig | SlurmHostConfig | None, is_slurm: bool, partition: str | None) dataclass

Result of solving for hardware allocation.

ReplResult(ok: bool, is_slurm: bool, selected_host: str, ssh_host: str, log_path: str, job_id: int | None = None, allocated_hostname: str | None = None, remote_pid: int | None = None, remote_port: int | None = None, token: str | None = None, remote_url: str | None = None, local_port: int | None = None, local_url: str | None = None, tunnel_pid: int | None = None, cluster_name: str | None = None, cluster_root: str | None = None, cluster_mount: str | None = None, work_dir: str | None = None, mailbox_job_id: str | None = None, stderr: str = '') dataclass

Result of launching an interactive Jupyter REPL session.

hosts() -> list[str]

Parse ~/.ssh/config to list accessible hosts.

Returns a list of host names/aliases defined in the SSH config. Excludes wildcard patterns and special entries.
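
A minimal sketch of this kind of parsing, assuming that "wildcard patterns and special entries" means aliases containing `*`, `?`, or `!`. `parse_ssh_hosts` is a hypothetical helper that works on the config text rather than reading `~/.ssh/config`, so it does not reflect how `hosts()` is actually implemented.

```python
def parse_ssh_hosts(config_text: str) -> list[str]:
    """Collect concrete aliases from the Host lines of an SSH config."""
    found: list[str] = []
    for line in config_text.splitlines():
        parts = line.strip().split()
        # Only "Host a b c" lines declare aliases; "HostName" etc. are skipped.
        if len(parts) < 2 or parts[0].lower() != "host":
            continue
        for alias in parts[1:]:
            # Assumption: patterns and negations are the excluded entries.
            if any(ch in alias for ch in "*?!"):
                continue
            if alias not in found:
                found.append(alias)
    return found


sample = """
# personal machines
Host login1 cluster-a
    HostName login1.example.org
Host *.internal
    User me
Host *
    ServerAliveInterval 60
"""
aliases = parse_ssh_hosts(sample)
```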

run(cmd: str | list[str], host: str, timeout: float | None = None) -> RunResult

Execute a command remotely on a host via SSH.

Runs in a login shell to ensure environment variables are loaded. Retries on transient connection errors with exponential backoff.

Parameters:

Name Type Description Default
cmd str | list[str]

Command to execute (string or list of args)

required
host str

SSH host (name, alias, or user@host)

required
timeout float | None

Optional timeout in seconds

None

Returns:

Type Description
RunResult

RunResult with returncode, stdout, and stderr
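
The retry behaviour can be pictured with the sketch below. It assumes that a transient connection error shows up as exit code 255 (the code `ssh` itself uses for connection-level failures) and that the delay doubles after each attempt. The retry count, base delay, and `retry_with_backoff` itself are illustrative assumptions; the reference does not state the values `run` uses.

```python
import time
from typing import Callable


def retry_with_backoff(
    attempt: Callable[[], int],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Re-run attempt() while it reports exit code 255, doubling the delay."""
    returncode = attempt()
    for i in range(retries):
        if returncode != 255:
            break
        sleep(base_delay * 2 ** i)  # 1s, 2s, 4s, ...
        returncode = attempt()
    return returncode


# Simulate two connection failures followed by success, recording the delays.
codes = iter([255, 255, 0])
delays: list[float] = []
final_code = retry_with_backoff(lambda: next(codes), sleep=delays.append)
```

Injecting `sleep` keeps the sketch testable without real waiting.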

run_many(cmd: str | list[str], hosts: list[str], timeout: float | None = None) -> dict[str, RunResult]

Execute a command on multiple hosts in parallel.

Parameters:

Name Type Description Default
cmd str | list[str]

Command to execute

required
hosts list[str]

List of SSH hosts

required
timeout float | None

Optional timeout in seconds per host

None

Returns:

Type Description
dict[str, RunResult]

Dict mapping host -> RunResult
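
One common way to fan a command out in parallel is a thread pool, sketched below. `run_on_all` and its `run_one` callback are hypothetical; whether `run_many` uses threads, processes, or async I/O is not stated in this reference.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_on_all(run_one: Callable[[str], int], hosts: list[str]) -> dict[str, int]:
    """Call run_one(host) for every host concurrently and key results by host."""
    if not hosts:
        return {}
    with ThreadPoolExecutor(max_workers=len(hosts)) as pool:
        # pool.map preserves input order, so zip pairs each host with its result.
        results = list(pool.map(run_one, hosts))
    return dict(zip(hosts, results))


# Stand-in for a remote call: the "result" is just the length of the host name.
by_host = run_on_all(len, ["login1", "gpu-box"])
```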

copy_to(local_path: str | Path, host: str, remote_path: str, timeout: float | None = None) -> RunResult

Copy a local file/directory to a remote host via scp.

Parameters:

Name Type Description Default
local_path str | Path

Local file or directory path

required
host str

SSH host

required
remote_path str

Destination path on remote host

required
timeout float | None

Optional timeout in seconds

None

Returns:

Type Description
RunResult

RunResult with returncode, stdout, and stderr

copy_from(host: str, remote_path: str, local_path: str | Path, timeout: float | None = None) -> RunResult

Copy a remote file/directory to local via scp.

Parameters:

Name Type Description Default
host str

SSH host

required
remote_path str

Source path on remote host

required
local_path str | Path

Local destination path

required
timeout float | None

Optional timeout in seconds

None

Returns:

Type Description
RunResult

RunResult with returncode, stdout, and stderr

is_reachable(host: str, timeout: float = 5.0) -> bool

Check if a host is reachable via SSH.

Parameters:

Name Type Description Default
host str

SSH host to check

required
timeout float

Connection timeout in seconds

5.0

Returns:

Type Description
bool

True if host is reachable

forward_port(host: str, local_port: int, remote_port: int) -> TunnelResult

Start a background SSH tunnel forwarding local_port -> remote localhost:remote_port.

Parameters:

Name Type Description Default
host str

SSH host (name, alias, or user@host)

required
local_port int

Local port to bind

required
remote_port int

Remote port on localhost to forward to

required

Returns:

Type Description
TunnelResult

TunnelResult with PID on success
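
For orientation, a local port forward of this shape corresponds to OpenSSH's `-L` option. The sketch below only builds an argument list, comparable to what `TunnelResult.command` might hold. `tunnel_command` is a hypothetical helper, and the flags the library actually passes, including how it backgrounds the process, are not documented here.

```python
def tunnel_command(host: str, local_port: int, remote_port: int) -> list[str]:
    """Build an ssh invocation forwarding local_port to remote localhost:remote_port."""
    return [
        "ssh",
        "-N",  # do not run a remote command, only forward
        "-L", f"{local_port}:localhost:{remote_port}",
        host,
    ]


cmd = tunnel_command("login1", 8888, 9999)
```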

submit(job: SlurmJob, host: str, share_dir: str | None = None, script_path: str | None = None, timeout: float | None = None) -> SlurmResult

Submit a SLURM job to a remote host via SSH.

Creates two scripts on the remote host:
- bootstrap.sh: runs on each node via srun (setup + command)
- sbatch wrapper: contains the SBATCH directives and calls srun bootstrap.sh

Parameters:

Name Type Description Default
job SlurmJob

SlurmJob configuration

required
host str

SSH host with SLURM access

required
share_dir str | None

Shared directory visible to all nodes (required for multi-node jobs)

None
script_path str | None

Optional remote path prefix for scripts

None
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
SlurmResult

SlurmResult with job_id and SSH result
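
`SlurmResult.job_id` is typed `int | None`, which suggests the ID is parsed from the sbatch output and left as `None` when submission fails. A minimal sketch of that parsing, assuming the standard `Submitted batch job <id>` line; `parse_job_id` is a hypothetical helper, not the library's function.

```python
import re
from typing import Optional


def parse_job_id(sbatch_stdout: str) -> Optional[int]:
    """Extract the job ID from sbatch's confirmation line, if present."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout)
    return int(match.group(1)) if match else None


job_id = parse_job_id("Submitted batch job 4242\n")
```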

submit_packed(job: SlurmJob, host: str, repo_path: str | None = None, share_dir: str | None = None, dirty: bool = False, script_path: str | None = None, timeout: float | None = None) -> SlurmResult

Submit a SLURM job with code packed into the script.

The code tarball is embedded in the sbatch script and extracted at runtime on the compute node (to $SLURM_TMPDIR/code by default).

Parameters:

Name Type Description Default
job SlurmJob

SlurmJob configuration

required
host str

SSH host with SLURM access

required
repo_path str | None

Local git repo to pack (default: cwd)

None
share_dir str | None

Shared directory visible to all nodes for scripts

None
dirty bool

Include uncommitted changes (default: False)

False
script_path str | None

Optional remote path for script

None
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
SlurmResult

SlurmResult with job_id and SSH result
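
One plausible way to embed a tarball in a shell script is to base64-encode it into a here-document and decode it at runtime. The sketch below shows that technique only. `embed_payload` and the `PAYLOAD_EOF` marker are assumptions, and the script the library generates may look quite different.

```python
import base64


def embed_payload(tarball: bytes, extract_to: str = "$SLURM_TMPDIR/code") -> str:
    """Return shell lines that recreate and unpack the tarball on the node."""
    encoded = base64.b64encode(tarball).decode("ascii")
    return "\n".join([
        f'mkdir -p "{extract_to}"',
        # The quoted heredoc marker stops the shell from expanding the payload.
        f"base64 -d <<'PAYLOAD_EOF' | tar -xz -C \"{extract_to}\"",
        encoded,
        "PAYLOAD_EOF",
    ])


# Placeholder bytes; a real payload would be the gzip tarball from snapshot().
fragment = embed_payload(b"hello")
```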

status(job_id: int, host: str, timeout: float | None = None) -> StatusResult

Check the status of a SLURM job.

Parameters:

Name Type Description Default
job_id int

SLURM job ID

required
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
StatusResult

StatusResult with parsed JobStatus

cancel(job_id: int, host: str, timeout: float | None = None) -> RunResult

Cancel a SLURM job.

Parameters:

Name Type Description Default
job_id int

SLURM job ID to cancel

required
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
RunResult

RunResult from scancel

job_info(job_id: int, host: str, timeout: float | None = None) -> JobInfoResult

Get detailed info about a SLURM job (including completed jobs).

Parameters:

Name Type Description Default
job_id int

SLURM job ID

required
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
JobInfoResult

JobInfoResult with parsed job steps

queue(host: str, user: str | None = None, timeout: float | None = None) -> QueueResult

List jobs in the SLURM queue.

Parameters:

Name Type Description Default
host str

SSH host with SLURM access

required
user str | None

Filter by user (default: all users)

None
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
QueueResult

QueueResult with list of parsed JobStatus

wait(job_id: int, host: str, poll_interval: float = 10.0, timeout: float | None = None) -> JobInfoResult

Wait for a SLURM job to complete.

Parameters:

Name Type Description Default
job_id int

SLURM job ID

required
host str

SSH host with SLURM access

required
poll_interval float

Seconds between status checks

10.0
timeout float | None

Total timeout in seconds (None = wait forever)

None

Returns:

Type Description
JobInfoResult

JobInfoResult with final job state
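
The interplay of `poll_interval` and `timeout` can be sketched as a plain polling loop. The set of terminal states below is a subset of SLURM's job state names, and `poll_until_done` with its `get_state` callback is hypothetical. How `wait` reports a timeout (return value versus exception) is not specified in this reference, so the sketch returns `None`.

```python
import time
from typing import Callable, Optional

# Assumption: these SLURM states are treated as "finished".
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "TIMEOUT", "NODE_FAIL", "OUT_OF_MEMORY"}


def poll_until_done(
    get_state: Callable[[], str],
    poll_interval: float = 10.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll get_state() until a terminal state appears or the timeout passes."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            return None  # timed out before the job finished
        sleep(poll_interval)


# Simulated job: pending, then running, then done. Sleeps are recorded, not taken.
states = iter(["PENDING", "RUNNING", "COMPLETED"])
naps: list[float] = []
final_state = poll_until_done(lambda: next(states), poll_interval=0.5, sleep=naps.append)
```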

partitions(host: str, timeout: float | None = None) -> list[PartitionInfo]

List all SLURM partitions.

Parameters:

Name Type Description Default
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
list[PartitionInfo]

List of PartitionInfo

partition_nodes(partition: str, host: str, timeout: float | None = None) -> list[str]

List all nodes in a SLURM partition.

Parameters:

Name Type Description Default
partition str

Partition name

required
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
list[str]

List of node names

node_info(nodename: str, host: str, timeout: float | None = None) -> NodeInfo | None

Get detailed info about a SLURM node.

Parameters:

Name Type Description Default
nodename str

Name of the node

required
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout in seconds

None

Returns:

Type Description
NodeInfo | None

NodeInfo or None if node not found

nodes_info(nodenames: list[str], host: str, timeout: float | None = None) -> dict[str, NodeInfo]

Get info about multiple nodes in parallel.

Parameters:

Name Type Description Default
nodenames list[str]

List of node names

required
host str

SSH host with SLURM access

required
timeout float | None

SSH timeout per node

None

Returns:

Type Description
dict[str, NodeInfo]

Dict mapping nodename -> NodeInfo (excludes failed lookups)

available_gpus(partition: str, host: str, gpu_type: str | None = None, timeout: float | None = None) -> list[tuple[str, int]]

Find nodes with available GPUs in a partition.

Uses a single sinfo command to get all node GPU info efficiently, avoiding rate limits from multiple SSH connections.

Parameters:

Name Type Description Default
partition str

Partition name

required
host str

SSH host with SLURM access

required
gpu_type str | None

Optional GPU type filter (e.g., "a100")

None
timeout float | None

SSH timeout

None

Returns:

Type Description
list[tuple[str, int]]

List of (nodename, available_gpu_count) tuples, sorted by availability descending
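
To make the return shape concrete, the sketch below computes free GPUs per node from GRES strings in the style sinfo prints (for example `gpu:a100:4` configured and `gpu:a100:2(IDX:0,2)` in use) and sorts by availability. The input rows, the regular expression, and the choice to drop nodes with no free GPU are assumptions; the exact sinfo format string the library uses is not documented here.

```python
import re
from typing import Optional

# Matches "gpu:<count>" or "gpu:<type>:<count>"; trailing "(IDX:...)" is ignored.
GRES_RE = re.compile(r"gpu(?::([A-Za-z][\w.-]*))?:(\d+)")


def count_gpus(gres: str, gpu_type: Optional[str] = None) -> int:
    """Sum GPU counts in a GRES string, optionally for one GPU type only."""
    total = 0
    for kind, count in GRES_RE.findall(gres):
        if gpu_type is None or kind == gpu_type:
            total += int(count)
    return total


def rank_nodes(rows: list[tuple[str, str, str]], gpu_type: Optional[str] = None) -> list[tuple[str, int]]:
    """rows are (nodename, configured_gres, used_gres); returns busiest-free first."""
    free = []
    for name, configured, used in rows:
        available = count_gpus(configured, gpu_type) - count_gpus(used, gpu_type)
        if available > 0:  # assumption: fully allocated nodes are left out
            free.append((name, available))
    return sorted(free, key=lambda item: item[1], reverse=True)


rows = [
    ("n1", "gpu:a100:4", "gpu:a100:4(IDX:0-3)"),
    ("n2", "gpu:a100:8", "gpu:a100:2(IDX:0,2)"),
    ("n3", "gpu:v100:4", "gpu:v100:0(IDX:N/A)"),
    ("n4", "gpu:a100:4", "gpu:a100:1(IDX:0)"),
]
a100_nodes = rank_nodes(rows, "a100")
```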

first_node_from_nodelist(nodelist: str) -> str | None

Get first hostname from a SLURM nodelist expression.
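
SLURM compresses node lists into expressions such as `node[01-03,07],gpu5`. The sketch below extracts the first hostname from the simple case of one bracket group at the end of a name. `first_node` is a hypothetical helper, not the library's code; `scontrol show hostnames` remains the authoritative way to expand every form.

```python
import re
from typing import Optional


def first_node(nodelist: str) -> Optional[str]:
    """Return the first hostname of a nodelist like 'node[01-03,07],gpu5'."""
    nodelist = nodelist.strip()
    if not nodelist:
        return None
    # A name prefix, optionally followed by one "[...]" range group.
    match = re.match(r"([^\[,]+)(?:\[([^\]]+)\])?", nodelist)
    if match is None:
        return None
    prefix, ranges = match.groups()
    if ranges is None:
        return prefix
    # The first item of the group is either "01" or the start of "01-03".
    first = re.split(r"[,-]", ranges)[0]
    return prefix + first


head = first_node("node[01-03,07],gpu5")
```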

wait_until_running(job_id: int, host: str, poll_interval: float = 10.0, timeout: float | None = None) -> tuple[str | None, StatusResult]

Wait until a job is running and return the first allocated hostname.

load_dispatch_config(path: str | Path) -> DispatchConfig

Load dispatch configuration from a YAML file.

Parameters:

Name Type Description Default
path str | Path

Path to YAML config file

required

Returns:

Type Description
DispatchConfig

Parsed DispatchConfig

parse_dispatch_config(cfg: DictConfig) -> DispatchConfig

Parse dispatch configuration from OmegaConf.

Parameters:

Name Type Description Default
cfg DictConfig

OmegaConf config with 'clusters' and 'hosts' keys

required

Returns:

Type Description
DispatchConfig

Parsed DispatchConfig

discover_plain_host(ssh_alias: str, timeout: float = 30.0) -> dict[str, int]

Discover chips on a plain SSH host by querying nvidia-smi.

Parameters:

Name Type Description Default
ssh_alias str

SSH config alias

required
timeout float

SSH timeout

30.0

Returns:

Type Description
dict[str, int]

Dict mapping chip name -> count

discover_slurm_partitions(ssh_alias: str, timeout: float = 30.0) -> list[PartitionConfig]

Discover SLURM partitions on a host.

Parameters:

Name Type Description Default
ssh_alias str

SSH config alias

required
timeout float

SSH timeout

30.0

Returns:

Type Description
list[PartitionConfig]

List of discovered PartitionConfig

snapshot(repo_path: str | Path | None = None, ref: str = 'HEAD') -> bytes

Create a tarball snapshot of the repo using git archive.

Includes only files tracked at the given ref, so untracked files (including those matched by .gitignore) and .git/ are left out.

Parameters:

Name Type Description Default
repo_path str | Path | None

Path to git repo (default: current working directory)

None
ref str

Git ref to archive (default: HEAD, can be branch/tag/commit)

'HEAD'

Returns:

Type Description
bytes

Tarball bytes (gzip compressed)
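
A minimal sketch of producing such a tarball with `git archive`, assuming git is installed and `repo_path` points inside a repository. `archive_command` and `snapshot_sketch` are hypothetical names, and the library may pass different options.

```python
import subprocess
from pathlib import Path
from typing import Optional, Union


def archive_command(ref: str = "HEAD") -> list[str]:
    """Build the git invocation that writes a gzip tarball of ref to stdout."""
    return ["git", "archive", "--format=tar.gz", ref]


def snapshot_sketch(repo_path: Optional[Union[str, Path]] = None, ref: str = "HEAD") -> bytes:
    """Run git archive in repo_path (default: cwd) and return the tarball bytes."""
    result = subprocess.run(
        archive_command(ref),
        cwd=repo_path or Path.cwd(),
        check=True,           # raise CalledProcessError if git fails
        capture_output=True,  # the tarball arrives on stdout as bytes
    )
    return result.stdout


tag_cmd = archive_command("v1.0")
```

The returned bytes have the shape that `SlurmJob.pack` expects as its `tarball` argument.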

ship(host: str, remote_path: str, repo_path: str | Path | None = None, ref: str = 'HEAD', timeout: float | None = None) -> RunResult

Ship a snapshot of the repo to a remote host.

Creates a tarball of tracked files and extracts on remote.

Parameters:

Name Type Description Default
host str

SSH host

required
remote_path str

Destination directory on remote (will be created)

required
repo_path str | Path | None

Local git repo path (default: cwd)

None
ref str

Git ref to archive (default: HEAD)

'HEAD'
timeout float | None

SSH timeout

None

Returns:

Type Description
RunResult

RunResult from extraction

ship_dirty(host: str, remote_path: str, repo_path: str | Path | None = None, timeout: float | None = None) -> RunResult

Ship repo including uncommitted changes.

Uses git stash to capture working directory state, then archives. Useful for development/testing when you want to ship uncommitted code.

Parameters:

Name Type Description Default
host str

SSH host

required
remote_path str

Destination directory on remote

required
repo_path str | Path | None

Local git repo path (default: cwd)

None
timeout float | None

SSH timeout

None

Returns:

Type Description
RunResult

RunResult from extraction

ship_files(host: str, remote_path: str, files: list[str | Path], base_path: str | Path | None = None, timeout: float | None = None) -> RunResult

Ship specific files to a remote host.

Parameters:

Name Type Description Default
host str

SSH host

required
remote_path str

Destination directory on remote

required
files list[str | Path]

List of file paths to include

required
base_path str | Path | None

Base path for relative file paths (default: cwd)

None
timeout float | None

SSH timeout

None

Returns:

Type Description
RunResult

RunResult from extraction

solve_or_raise(request: HardwareRequest, config: DispatchConfig, check_availability: bool = False, timeout: float = 30.0) -> SolveResult

Like solve(), but raises if no solution found.

dispatch_repl(spec: JobSpec, hardware: HardwareRequest, dispatch_config: DispatchConfig, local_port: int, dirty: bool = False, check_availability: bool = True, mem: str | None = None, timeout: float = 60.0, startup_timeout: float = 180.0, slurm_wait_timeout: float | None = None, sync_enabled: bool = False, extra_uv_groups: list[str] | None = None) -> ReplResult

Dispatch an interactive Jupyter session on selected infrastructure.