dispatch
Remote dispatch utilities for SSH and SLURM.
RunResult(returncode: int, stdout: str, stderr: str)
dataclass
Result of a remote command execution
TunnelResult(returncode: int, pid: int | None, command: list[str], stderr: str = '')
dataclass
Result of starting an SSH local port forward.
SlurmJob(name: str, command: str, partition: str | None = None, nodes: int = 1, ntasks: int = 1, ntasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus: int | None = None, gpus_per_node: int | None = None, gpu_type: str | None = None, mem: str | None = None, mem_per_cpu: str | None = None, time: str | None = None, output: str | None = None, error: str | None = None, workdir: str | None = None, root_dir: str | None = None, env: dict[str, str] = dict(), modules: list[str] = list(), uv_groups: list[str] = list(), dependency: str | None = None, exclusive: bool = False, constraint: str | None = None, account: str | None = None, qos: str | None = None, exclude: list[str] = list(), extra_directives: list[str] = list(), setup_commands: list[str] = list(), payload: str | None = None, payload_extract_to: str = '$SLURM_TMPDIR/code', juicefs_mount: JuiceFSMount | None = None, is_slurm: bool = True, bootstrap_py: str | None = None)
dataclass
Configuration for an sbatch script.
pack(tarball: bytes) -> 'SlurmJob'
Return a new SlurmJob with the given tarball as payload.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tarball | bytes | gzip-compressed tarball bytes (from sync.snapshot()) | required |
Returns:
| Type | Description |
|---|---|
| 'SlurmJob' | New SlurmJob with payload set |
to_bootstrap_script() -> str
Generate the bootstrap.sh script that runs on each node.
to_sbatch_script(bootstrap_script_path: str) -> str
Generate the sbatch wrapper script that calls srun on bootstrap.sh.
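
The split between the wrapper and bootstrap.sh can be pictured with a small sketch. `render_sbatch_wrapper` below is a hypothetical stand-in, not the actual `to_sbatch_script` implementation; it only assumes the standard `#SBATCH` directive names and that the wrapper's last step is an `srun` call on the bootstrap script.

```python
from __future__ import annotations


def render_sbatch_wrapper(
    name: str,
    bootstrap_script_path: str,
    nodes: int = 1,
    partition: str | None = None,
    time: str | None = None,
) -> str:
    """Render a minimal sbatch wrapper that launches bootstrap.sh via srun."""
    lines = [
        "#!/bin/bash",
        f"#SBATCH --job-name={name}",
        f"#SBATCH --nodes={nodes}",
    ]
    # Optional directives are emitted only when a value was given.
    if partition is not None:
        lines.append(f"#SBATCH --partition={partition}")
    if time is not None:
        lines.append(f"#SBATCH --time={time}")
    lines.append("")
    # srun starts the bootstrap script on the allocated nodes.
    lines.append(f"srun bash {bootstrap_script_path}")
    return "\n".join(lines) + "\n"


script = render_sbatch_wrapper("train", "/share/bootstrap.sh", nodes=2, partition="gpu")
```

Keeping the directives in the wrapper and the setup logic in bootstrap.sh means the same bootstrap script can also serve a plain SSH host, which matches the note under to_script().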
to_script() -> str
Generate script for backward compatibility.
- For SLURM: returns the bootstrap script (the sbatch wrapper is generated separately).
- For SSH: returns the bootstrap script.
SlurmResult(job_id: int | None, ssh_result: RunResult)
dataclass
Result of a SLURM job submission.
JobStatus(job_id: int, partition: str, name: str, user: str, state: str, time_elapsed: str, nodes: int, nodelist: str)
dataclass
Status of a SLURM job from squeue.
JobInfo(job_id: str, name: str, partition: str, state: str, exit_code: str, elapsed: str, max_rss: str, nodelist: str)
dataclass
Detailed info about a SLURM job from sacct.
QueueResult(jobs: list[JobStatus], ssh_result: RunResult)
dataclass
Result of a queue query.
StatusResult(job: JobStatus | None, ssh_result: RunResult)
dataclass
Result of a job status query.
JobInfoResult(steps: list[JobInfo], ssh_result: RunResult)
dataclass
Result of a job info query.
main: JobInfo | None
property
Get the main job entry (without step suffix).
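
sacct reports one row per job step, with IDs such as `1234`, `1234.batch`, and `1234.0`. A minimal sketch of picking the main entry, assuming "without step suffix" means "no dot in the job ID"; `StepRow` and `main_entry` are hypothetical stand-ins for JobInfo and the `main` property:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StepRow:
    """Stand-in for JobInfo, reduced to the two fields the sketch needs."""

    job_id: str
    state: str


def main_entry(steps: list[StepRow]) -> StepRow | None:
    """Return the first row whose job ID carries no '.step' suffix."""
    for step in steps:
        if "." not in step.job_id:
            return step
    return None


rows = [
    StepRow("1234.batch", "COMPLETED"),
    StepRow("1234", "COMPLETED"),
    StepRow("1234.0", "FAILED"),
]
```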
NodeGres(name: str, type: str | None, configured: int, allocated: int)
dataclass
GRES (generic resource) info for a node.
NodeInfo(name: str, state: str, cpus_total: int, cpus_allocated: int, memory_total: int, memory_allocated: int, gres: list[NodeGres], partitions: list[str], features: list[str])
dataclass
Detailed info about a SLURM node.
get_gres(name: str) -> NodeGres | None
Get GRES by name (e.g., 'gpu').
PartitionInfo(name: str, state: str, nodes: list[str], total_cpus: int, total_nodes: int)
dataclass
Info about a SLURM partition.
ClusterConfig(root: str, work: str, log: str | None = None, share: str | None = None, mount: str | None = None, cache_size: str | None = None, cache_dir: str | None = None)
dataclass
Configuration for a compute cluster's paths.
PartitionConfig(name: str, default: bool = False, constraint: str | None = None)
dataclass
Configuration for a SLURM partition.
PlainHostConfig(ssh: str, cluster: str, type: Literal['plain'] = 'plain', chips: dict[str, int] = dict(), uv_groups: list[str] = list())
dataclass
Configuration for a plain SSH host (no scheduler).
SlurmHostConfig(ssh: str, cluster: str, type: Literal['slurm'] = 'slurm', partitions: list[PartitionConfig] = list(), account: str | None = None, qos: str | None = None, mem: str | None = None, exclude: list[str] = list(), uv_groups: list[str] = list(), chips: dict[str, int] | None = None, cpu_partitions: list[str] = list(), annotations: dict[str, str] = dict())
dataclass
Configuration for a SLURM cluster login node.
DispatchConfig(mount: str | None = None, proxy: str | None = None, clusters: dict[str, ClusterConfig] = dict(), hosts: dict[str, PlainHostConfig | SlurmHostConfig] = dict(), priority: list[str] = list(), gres_mapping: dict[str, str] = dict())
dataclass
Top-level dispatch configuration.
RemoteInventory(config: DispatchConfig)
Resolves dispatch config into usable objects.
get_cluster(name: str) -> Cluster
Get or create a Cluster object by name.
get_host(name: str) -> PlainHostConfig | SlurmHostConfig
Get host configuration by name.
get_chip(name: str) -> Chip
Get a Chip by name from SUPPORTED_CHIPS.
plain_hosts() -> list[str]
List all plain SSH hosts.
slurm_hosts() -> list[str]
List all SLURM hosts.
hosts_with_chip(chip: str | Chip) -> list[str]
Find plain hosts that have a specific chip type.
total_chips(chip: str | Chip) -> int
Get total count of a chip type across all plain hosts.
default_partition(host: str) -> PartitionConfig | None
Get the default partition for a SLURM host.
SolveResult(result: HardwareResult | None, host_name: str | None, host_config: PlainHostConfig | SlurmHostConfig | None, is_slurm: bool, partition: str | None)
dataclass
Result of solving for hardware allocation.
ReplResult(ok: bool, is_slurm: bool, selected_host: str, ssh_host: str, log_path: str, job_id: int | None = None, allocated_hostname: str | None = None, remote_pid: int | None = None, remote_port: int | None = None, token: str | None = None, remote_url: str | None = None, local_port: int | None = None, local_url: str | None = None, tunnel_pid: int | None = None, cluster_name: str | None = None, cluster_root: str | None = None, cluster_mount: str | None = None, work_dir: str | None = None, mailbox_job_id: str | None = None, stderr: str = '')
dataclass
Result of launching an interactive Jupyter REPL session.
hosts() -> list[str]
Parse ~/.ssh/config to list accessible hosts.
Returns a list of host names/aliases defined in the SSH config. Excludes wildcard patterns and special entries.
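
As an illustration of the filtering described above, here is a minimal sketch that extracts aliases from SSH config text. `parse_ssh_hosts` is a hypothetical helper, and treating any pattern containing `*`, `?`, or `!` as a "wildcard or special entry" is an assumption about what hosts() excludes.

```python
def parse_ssh_hosts(config_text: str) -> list[str]:
    """Collect concrete Host aliases, skipping wildcard and negated patterns."""
    found: list[str] = []
    for raw in config_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # ssh_config keywords are case-insensitive and may use '=' as separator.
        parts = line.replace("=", " ", 1).split()
        if len(parts) < 2 or parts[0].lower() != "host":
            continue
        for pattern in parts[1:]:
            if any(ch in pattern for ch in "*?!"):
                continue
            if pattern not in found:
                found.append(pattern)
    return found


sample = (
    "Host *\n"
    "  ForwardAgent no\n"
    "\n"
    "# login nodes\n"
    "Host cluster-a login\n"
    "  HostName a.example.org\n"
    "host gpu-?? !bad box1\n"
)
```

The sketch ignores `Include` and `Match` directives, which a complete parser would also need to handle.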
run(cmd: str | list[str], host: str, timeout: float | None = None) -> RunResult
Execute a command remotely on a host via SSH.
Runs in a login shell to ensure environment variables are loaded. Retries on transient connection errors with exponential backoff.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cmd | str \| list[str] | Command to execute (string or list of args) | required |
| host | str | SSH host (name, alias, or user@host) | required |
| timeout | float \| None | Optional timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult with returncode, stdout, and stderr |
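
The retry behaviour can be sketched as follows. This is not the implementation of run(): the retry count, the base delay, and the choice of exit status 255 (the status ssh itself returns when it fails) as the "transient" signal are all assumptions made for illustration.

```python
from __future__ import annotations

import time
from typing import Callable

SSH_CONNECTION_ERROR = 255  # exit status ssh uses for its own failures


def run_with_backoff(
    attempt: Callable[[], int],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call attempt() and retry on connection errors, doubling the delay each time."""
    delay = base_delay
    code = attempt()
    for _ in range(retries):
        if code != SSH_CONNECTION_ERROR:
            break
        sleep(delay)
        delay *= 2  # exponential backoff
        code = attempt()
    return code


# Simulated session: two connection failures, then success.
codes = iter([255, 255, 0])
delays: list[float] = []
result = run_with_backoff(lambda: next(codes), base_delay=0.5, sleep=delays.append)
```

Injecting `sleep` keeps the sketch testable without real waiting. A remote command that itself exits with 255 would be indistinguishable from a connection failure under this rule.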
run_many(cmd: str | list[str], hosts: list[str], timeout: float | None = None) -> dict[str, RunResult]
Execute a command on multiple hosts in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cmd | str \| list[str] | Command to execute | required |
| hosts | list[str] | List of SSH hosts | required |
| timeout | float \| None | Optional timeout in seconds per host | None |
Returns:
| Type | Description |
|---|---|
| dict[str, RunResult] | Dict mapping host -> RunResult |
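
A parallel fan-out of this shape can be sketched with a thread pool. `fan_out` and its `run_one` callback are hypothetical; whether run_many uses threads, processes, or asyncio is not stated here.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

T = TypeVar("T")


def fan_out(
    hosts: list[str],
    run_one: Callable[[str], T],
    max_workers: int = 8,
) -> dict[str, T]:
    """Apply run_one to every host concurrently and key the results by host."""
    if not hosts:
        return {}
    with ThreadPoolExecutor(max_workers=min(max_workers, len(hosts))) as pool:
        # pool.map preserves input order, so zip pairs each host with its result.
        results = list(pool.map(run_one, hosts))
    return dict(zip(hosts, results))


upper = fan_out(["a", "b", "c"], str.upper)
```

Threads suit this workload because each call spends its time waiting on an SSH subprocess rather than on Python code.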
copy_to(local_path: str | Path, host: str, remote_path: str, timeout: float | None = None) -> RunResult
Copy a local file/directory to a remote host via scp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| local_path | str \| Path | Local file or directory path | required |
| host | str | SSH host | required |
| remote_path | str | Destination path on remote host | required |
| timeout | float \| None | Optional timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult with returncode, stdout, and stderr |
copy_from(host: str, remote_path: str, local_path: str | Path, timeout: float | None = None) -> RunResult
Copy a remote file/directory to local via scp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host | required |
| remote_path | str | Source path on remote host | required |
| local_path | str \| Path | Local destination path | required |
| timeout | float \| None | Optional timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult with returncode, stdout, and stderr |
is_reachable(host: str, timeout: float = 5.0) -> bool
Check if a host is reachable via SSH.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host to check | required |
| timeout | float | Connection timeout in seconds | 5.0 |
Returns:
| Type | Description |
|---|---|
| bool | True if host is reachable |
forward_port(host: str, local_port: int, remote_port: int) -> TunnelResult
Start a background SSH tunnel forwarding local_port -> remote localhost:remote_port.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host (name, alias, or user@host) | required |
| local_port | int | Local port to bind | required |
| remote_port | int | Remote port on localhost to forward to | required |
Returns:
| Type | Description |
|---|---|
| TunnelResult | TunnelResult with PID on success |
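
For orientation, a local port forward of this kind corresponds to an `ssh -N -L` invocation. The sketch below only builds the argument list; `tunnel_command` is a hypothetical helper, and the exact options forward_port passes are not documented here.

```python
def tunnel_command(host: str, local_port: int, remote_port: int) -> list[str]:
    """Build an ssh argv that forwards local_port to localhost:remote_port on host."""
    return [
        "ssh",
        "-N",  # forwarding only, no remote command
        "-o",
        "ExitOnForwardFailure=yes",  # fail fast if the local port cannot be bound
        "-L",
        f"{local_port}:localhost:{remote_port}",
        host,
    ]


argv = tunnel_command("login", 8888, 9999)
```

Passing such a list to `subprocess.Popen` would yield a process whose `pid` could fill the `pid` field of a TunnelResult, with the list itself stored in `command`.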
submit(job: SlurmJob, host: str, share_dir: str | None = None, script_path: str | None = None, timeout: float | None = None) -> SlurmResult
Submit a SLURM job to a remote host via SSH.
Creates two scripts on remote:
- bootstrap.sh: runs on each node via srun (setup + command)
- sbatch wrapper: contains SBATCH directives, calls srun bootstrap.sh
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | SlurmJob | SlurmJob configuration | required |
| host | str | SSH host with SLURM access | required |
| share_dir | str \| None | Shared directory visible to all nodes (required for multi-node jobs) | None |
| script_path | str \| None | Optional remote path prefix for scripts | None |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| SlurmResult | SlurmResult with job_id and SSH result |
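
The `job_id: int | None` field suggests the ID is parsed from sbatch output and left as None when submission fails. A minimal sketch of that parsing, assuming the standard "Submitted batch job N" message; `parse_job_id` is a hypothetical helper:

```python
from __future__ import annotations

import re

_SUBMITTED = re.compile(r"Submitted batch job (\d+)")


def parse_job_id(stdout: str) -> int | None:
    """Extract the numeric job ID from sbatch output, or None if absent."""
    match = _SUBMITTED.search(stdout)
    return int(match.group(1)) if match else None


ok = parse_job_id("Submitted batch job 4242\n")
failed = parse_job_id("sbatch: error: invalid partition specified\n")
```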
submit_packed(job: SlurmJob, host: str, repo_path: str | None = None, share_dir: str | None = None, dirty: bool = False, script_path: str | None = None, timeout: float | None = None) -> SlurmResult
Submit a SLURM job with code packed into the script.
The code tarball is embedded in the sbatch script and extracted at runtime on the compute node (to $SLURM_TMPDIR/code by default).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | SlurmJob | SlurmJob configuration | required |
| host | str | SSH host with SLURM access | required |
| repo_path | str \| None | Local git repo to pack (default: cwd) | None |
| share_dir | str \| None | Shared directory visible to all nodes for scripts | None |
| dirty | bool | Include uncommitted changes (default: False) | False |
| script_path | str \| None | Optional remote path for script | None |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| SlurmResult | SlurmResult with job_id and SSH result |
status(job_id: int, host: str, timeout: float | None = None) -> StatusResult
Check the status of a SLURM job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | int | SLURM job ID | required |
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| StatusResult | StatusResult with parsed JobStatus |
cancel(job_id: int, host: str, timeout: float | None = None) -> RunResult
Cancel a SLURM job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | int | SLURM job ID to cancel | required |
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult from scancel |
job_info(job_id: int, host: str, timeout: float | None = None) -> JobInfoResult
Get detailed info about a SLURM job (including completed jobs).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | int | SLURM job ID | required |
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| JobInfoResult | JobInfoResult with parsed job steps |
queue(host: str, user: str | None = None, timeout: float | None = None) -> QueueResult
List jobs in the SLURM queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host with SLURM access | required |
| user | str \| None | Filter by user (default: all users) | None |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| QueueResult | QueueResult with list of parsed JobStatus |
wait(job_id: int, host: str, poll_interval: float = 10.0, timeout: float | None = None) -> JobInfoResult
Wait for a SLURM job to complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | int | SLURM job ID | required |
| host | str | SSH host with SLURM access | required |
| poll_interval | float | Seconds between status checks | 10.0 |
| timeout | float \| None | Total timeout in seconds (None = wait forever) | None |
Returns:
| Type | Description |
|---|---|
| JobInfoResult | JobInfoResult with final job state |
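
The interplay of `poll_interval` and `timeout` can be sketched as a polling loop. `poll_until_done` is hypothetical, the set of terminal states is an assumption drawn from standard SLURM job state names, and returning None on timeout is a choice made for this sketch (what wait() does on timeout is not stated here).

```python
from __future__ import annotations

import time
from typing import Callable

# Assumed terminal states; SLURM defines more states than are listed here.
TERMINAL_STATES = {
    "COMPLETED", "FAILED", "CANCELLED", "TIMEOUT",
    "NODE_FAIL", "OUT_OF_MEMORY", "BOOT_FAIL", "DEADLINE",
}


def poll_until_done(
    get_state: Callable[[], str],
    poll_interval: float = 10.0,
    timeout: float | None = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str | None:
    """Poll get_state() until a terminal state appears; None means timed out."""
    deadline = None if timeout is None else clock() + timeout
    while True:
        state = get_state()
        # sacct may report e.g. "CANCELLED by 1000", so compare the first word.
        words = state.split()
        if words and words[0] in TERMINAL_STATES:
            return state
        if deadline is not None and clock() >= deadline:
            return None
        sleep(poll_interval)


states = iter(["PENDING", "RUNNING", "COMPLETED"])
naps: list[float] = []
final = poll_until_done(lambda: next(states), poll_interval=2.0, sleep=naps.append)
```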
partitions(host: str, timeout: float | None = None) -> list[PartitionInfo]
List all SLURM partitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| list[PartitionInfo] | List of PartitionInfo |
partition_nodes(partition: str, host: str, timeout: float | None = None) -> list[str]
List all nodes in a SLURM partition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition | str | Partition name | required |
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| list[str] | List of node names |
node_info(nodename: str, host: str, timeout: float | None = None) -> NodeInfo | None
Get detailed info about a SLURM node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nodename | str | Name of the node | required |
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout in seconds | None |
Returns:
| Type | Description |
|---|---|
| NodeInfo \| None | NodeInfo or None if node not found |
nodes_info(nodenames: list[str], host: str, timeout: float | None = None) -> dict[str, NodeInfo]
Get info about multiple nodes in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nodenames | list[str] | List of node names | required |
| host | str | SSH host with SLURM access | required |
| timeout | float \| None | SSH timeout per node | None |
Returns:
| Type | Description |
|---|---|
| dict[str, NodeInfo] | Dict mapping nodename -> NodeInfo (excludes failed lookups) |
available_gpus(partition: str, host: str, gpu_type: str | None = None, timeout: float | None = None) -> list[tuple[str, int]]
Find nodes with available GPUs in a partition.
Uses a single sinfo command to get all node GPU info efficiently, avoiding rate limits from multiple SSH connections.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition | str | Partition name | required |
| host | str | SSH host with SLURM access | required |
| gpu_type | str \| None | Optional GPU type filter (e.g., "a100") | None |
| timeout | float \| None | SSH timeout | None |
Returns:
| Type | Description |
|---|---|
| list[tuple[str, int]] | List of (nodename, available_gpu_count) tuples, sorted by availability descending |
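
To make the return shape concrete, here is a sketch of turning per-node GRES strings into the sorted list. It assumes sinfo-style values such as `gpu:a100:4(S:0-1)` for configured and `gpu:a100:3(IDX:0-2)` for used GRES, and it assumes fully occupied nodes are dropped; `gpu_count` and `rank_nodes` are hypothetical helpers, not part of this module.

```python
from __future__ import annotations

import re

# Matches "gpu:4" and "gpu:a100:4"; the type must start with a letter.
_GPU = re.compile(r"gpu(?::(?P<type>[A-Za-z][\w.-]*))?:(?P<count>\d+)")


def gpu_count(gres: str, gpu_type: str | None = None) -> int:
    """Sum the GPU counts in a GRES string, optionally for one GPU type."""
    total = 0
    for match in _GPU.finditer(gres):
        if gpu_type is None or match.group("type") == gpu_type:
            total += int(match.group("count"))
    return total


def rank_nodes(
    rows: list[tuple[str, str, str]],
    gpu_type: str | None = None,
) -> list[tuple[str, int]]:
    """rows holds (node, gres, gres_used); return nodes with free GPUs, most first."""
    free = []
    for node, gres, gres_used in rows:
        available = gpu_count(gres, gpu_type) - gpu_count(gres_used, gpu_type)
        if available > 0:
            free.append((node, available))
    return sorted(free, key=lambda item: item[1], reverse=True)


rows = [
    ("n1", "gpu:a100:4(S:0-1)", "gpu:a100:3(IDX:0-2)"),
    ("n2", "gpu:a100:8", "gpu:a100:0(IDX:N/A)"),
    ("n3", "gpu:a100:4", "gpu:a100:4(IDX:0-3)"),
]
ranked = rank_nodes(rows, gpu_type="a100")
```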
first_node_from_nodelist(nodelist: str) -> str | None
Get first hostname from a SLURM nodelist expression.
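
SLURM compresses node names into expressions such as `node[01-03,05],gpu7`. On a cluster, `scontrol show hostnames` expands these; the sketch below handles only the common single-bracket form locally and is not the implementation of first_node_from_nodelist. `first_node` is a hypothetical name, and any text after the closing bracket is ignored.

```python
from __future__ import annotations


def first_node(nodelist: str) -> str | None:
    """Return the first hostname of a nodelist expression, or None if empty."""
    nodelist = nodelist.strip()
    if not nodelist:
        return None
    bracket = nodelist.find("[")
    comma = nodelist.find(",")
    # A plain name comes first: "gpu7" or "gpu7,node[01-03]".
    if bracket == -1 or (comma != -1 and comma < bracket):
        return nodelist.split(",", 1)[0]
    close = nodelist.find("]", bracket)
    if close == -1:
        return None  # malformed expression
    prefix = nodelist[:bracket]
    first_range = nodelist[bracket + 1:close].split(",", 1)[0]
    first_index = first_range.split("-", 1)[0]  # keeps zero padding
    return prefix + first_index


head = first_node("node[01-03,05],gpu7")
```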
wait_until_running(job_id: int, host: str, poll_interval: float = 10.0, timeout: float | None = None) -> tuple[str | None, StatusResult]
Wait until a job is running and return the first allocated hostname.
load_dispatch_config(path: str | Path) -> DispatchConfig
Load dispatch configuration from a YAML file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path to YAML config file | required |
Returns:
| Type | Description |
|---|---|
| DispatchConfig | Parsed DispatchConfig |
parse_dispatch_config(cfg: DictConfig) -> DispatchConfig
Parse dispatch configuration from OmegaConf.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cfg | DictConfig | OmegaConf config with 'clusters' and 'hosts' keys | required |
Returns:
| Type | Description |
|---|---|
| DispatchConfig | Parsed DispatchConfig |
discover_plain_host(ssh_alias: str, timeout: float = 30.0) -> dict[str, int]
Discover chips on a plain SSH host by querying nvidia-smi.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ssh_alias | str | SSH config alias | required |
| timeout | float | SSH timeout | 30.0 |
Returns:
| Type | Description |
|---|---|
| dict[str, int] | Dict mapping chip name -> count |
discover_slurm_partitions(ssh_alias: str, timeout: float = 30.0) -> list[PartitionConfig]
Discover SLURM partitions on a host.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ssh_alias | str | SSH config alias | required |
| timeout | float | SSH timeout | 30.0 |
Returns:
| Type | Description |
|---|---|
| list[PartitionConfig] | List of discovered PartitionConfig |
snapshot(repo_path: str | Path | None = None, ref: str = 'HEAD') -> bytes
Create a tarball snapshot of the repo using git archive.
Only includes tracked files, respects .gitignore, excludes .git/.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| repo_path | str \| Path \| None | Path to git repo (default: current working directory) | None |
| ref | str | Git ref to archive (default: HEAD, can be branch/tag/commit) | 'HEAD' |
Returns:
| Type | Description |
|---|---|
| bytes | Tarball bytes (gzip compressed) |
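
A gzip-compressed archive of a ref can be produced with `git archive --format=tar.gz`. The sketch below shows one way to capture it as bytes; `archive_argv` and `snapshot_bytes` are hypothetical names, and the exact flags snapshot() uses are not documented here.

```python
from __future__ import annotations

import subprocess
from pathlib import Path


def archive_argv(ref: str = "HEAD") -> list[str]:
    """Build the git archive command for a gzip-compressed tarball of ref."""
    return ["git", "archive", "--format=tar.gz", ref]


def snapshot_bytes(repo_path: str | Path | None = None, ref: str = "HEAD") -> bytes:
    """Run git archive in repo_path (cwd when None) and return the tarball bytes."""
    done = subprocess.run(
        archive_argv(ref),
        cwd=repo_path,
        capture_output=True,
        check=True,  # raise CalledProcessError if git fails
    )
    return done.stdout


argv = archive_argv("v1.0")
```

Because git archive reads from the object database rather than the working tree, untracked and ignored files never enter the tarball.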
ship(host: str, remote_path: str, repo_path: str | Path | None = None, ref: str = 'HEAD', timeout: float | None = None) -> RunResult
Ship a snapshot of the repo to a remote host.
Creates a tarball of tracked files and extracts on remote.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host | required |
| remote_path | str | Destination directory on remote (will be created) | required |
| repo_path | str \| Path \| None | Local git repo path (default: cwd) | None |
| ref | str | Git ref to archive (default: HEAD) | 'HEAD' |
| timeout | float \| None | SSH timeout | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult from extraction |
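
The remote half of such a transfer is typically a shell command that creates the destination and unpacks a tarball read from standard input. A minimal sketch of building that command safely; `remote_extract_command` is a hypothetical helper, and the command ship() actually runs may differ.

```python
import shlex


def remote_extract_command(remote_path: str) -> str:
    """Build a shell command that creates remote_path and untars stdin into it."""
    quoted = shlex.quote(remote_path)  # protects spaces and shell metacharacters
    return f"mkdir -p {quoted} && tar -xzf - -C {quoted}"


command = remote_extract_command("/scratch/my code")
```

The tarball bytes would then be piped to `ssh <host> <command>` on standard input, so no temporary file is needed on either side.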
ship_dirty(host: str, remote_path: str, repo_path: str | Path | None = None, timeout: float | None = None) -> RunResult
Ship repo including uncommitted changes.
Uses git stash to capture working directory state, then archives. Useful for development/testing when you want to ship uncommitted code.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host | required |
| remote_path | str | Destination directory on remote | required |
| repo_path | str \| Path \| None | Local git repo path (default: cwd) | None |
| timeout | float \| None | SSH timeout | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult from extraction |
ship_files(host: str, remote_path: str, files: list[str | Path], base_path: str | Path | None = None, timeout: float | None = None) -> RunResult
Ship specific files to a remote host.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | SSH host | required |
| remote_path | str | Destination directory on remote | required |
| files | list[str \| Path] | List of file paths to include | required |
| base_path | str \| Path \| None | Base path for relative file paths (default: cwd) | None |
| timeout | float \| None | SSH timeout | None |
Returns:
| Type | Description |
|---|---|
| RunResult | RunResult from extraction |
solve_or_raise(request: HardwareRequest, config: DispatchConfig, check_availability: bool = False, timeout: float = 30.0) -> SolveResult
Like solve(), but raises if no solution is found.
dispatch_repl(spec: JobSpec, hardware: HardwareRequest, dispatch_config: DispatchConfig, local_port: int, dirty: bool = False, check_availability: bool = True, mem: str | None = None, timeout: float = 60.0, startup_timeout: float = 180.0, slurm_wait_timeout: float | None = None, sync_enabled: bool = False, extra_uv_groups: list[str] | None = None) -> ReplResult
Dispatch an interactive Jupyter session on selected infrastructure.