services

Service layer for the web UI.

Services handle reading data from the filesystem and transforming it into the models used by the API.

StatusService(status_dir: Path)

Service for reading job status from the filesystem.

When a JobCache is attached (via set_cache), list operations are served from the in-memory cache instead of walking the filesystem on every request. Single-job lookups still hit the filesystem for maximum freshness.

set_cache(cache) -> None

Attach a JobCache instance to serve list queries from memory.

list_all_jobs(project: Optional[str] = None, group: Optional[str] = None, status: Optional[JobStatus] = None, limit: int = 100) -> list[JobMetadata]

List all jobs, optionally filtered by project/group/status.

Jobs are returned sorted by start_time (most recent first).
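The filtering and ordering described above can be sketched roughly as follows; `JobMetadata` here is a hypothetical stand-in with only the fields the query touches, not the real model:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class JobMetadata:
    # Hypothetical stand-in for the real model.
    project: str
    group: str
    status: str
    start_time: datetime

def list_all_jobs(
    jobs: list[JobMetadata],
    project: Optional[str] = None,
    group: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100,
) -> list[JobMetadata]:
    # Apply each filter only when it is given.
    selected = [
        j for j in jobs
        if (project is None or j.project == project)
        and (group is None or j.group == group)
        and (status is None or j.status == status)
    ]
    # Most recent first, truncated to `limit`.
    selected.sort(key=lambda j: j.start_time, reverse=True)
    return selected[:limit]
```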

get_job(project: str, group: str, name: str, run_id: str) -> Optional[JobMetadata]

Get a specific job by its identifiers.

Always reads from the filesystem for maximum freshness on detail pages, falling back to the cache if the read fails or the file is missing.
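The filesystem-first, cache-fallback behavior can be sketched as below. The directory layout (`<status_dir>/<project>/<group>/<name>/<run_id>.json`) and the use of plain dicts instead of `JobMetadata` are assumptions for illustration:

```python
import json
from pathlib import Path
from typing import Any, Optional

def get_job(
    status_dir: Path,
    cache: dict[str, dict[str, Any]],
    project: str, group: str, name: str, run_id: str,
) -> Optional[dict[str, Any]]:
    # Hypothetical layout: <status_dir>/<project>/<group>/<name>/<run_id>.json
    path = status_dir / project / group / name / f"{run_id}.json"
    try:
        return json.loads(path.read_text())  # freshest data wins
    except (OSError, json.JSONDecodeError):
        # Read failed or file is mid-write: fall back to the cached copy.
        return cache.get(f"{project}/{group}/{name}/{run_id}")
```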

get_job_runs(project: str, group: str, name: str) -> list[JobMetadata]

Get all runs for a specific job name.

list_projects() -> list[ProjectSummary]

List all projects with summary stats.

get_running_jobs() -> list[JobMetadata]

Get all currently running jobs, excluding stale ones (no heartbeat for more than 5 minutes).

get_recent_jobs(hours: int = 24, limit: int = 50) -> list[JobMetadata]

Get jobs started within the last N hours.

get_dashboard_stats() -> DashboardStats

Get aggregated stats for the dashboard.

delete_job(project: str, group: str, name: str, run_id: str) -> bool

Delete a specific job run. Returns True if successful.

CheckpointService(checkpoints_dir: Path)

Service for browsing checkpoints.

list_all_checkpoints(project: Optional[str] = None, group: Optional[str] = None, job_name: Optional[str] = None, limit: int = 100) -> list[CheckpointInfo]

List all checkpoints, optionally filtered.

Returns checkpoints sorted by creation time (most recent first).

get_checkpoint(project: str, group: str, job_name: str, suffix: str) -> Optional[CheckpointInfo]

Get a specific checkpoint.

get_latest_checkpoint(project: str, group: str, job_name: str) -> Optional[CheckpointInfo]

Get the latest checkpoint for a job.

list_job_checkpoints(project: str, group: str, job_name: str) -> list[CheckpointInfo]

List all checkpoints for a specific job.

get_checkpoint_config(project: str, group: str, job_name: str, suffix: str) -> Optional[dict[str, Any]]

Read the config from a checkpoint (tries config.yaml then config.json).

get_checkpoint_job_spec(project: str, group: str, job_name: str, suffix: str) -> Optional[dict[str, Any]]

Read the job.json from a checkpoint.

count_checkpoints() -> int

Count total number of checkpoints.

get_total_size() -> int

Get total size of all checkpoints in bytes.
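A minimal sketch of the size computation, assuming checkpoints are ordinary files under the checkpoints tree:

```python
from pathlib import Path

def get_total_size(checkpoints_dir: Path) -> int:
    # Sum the size of every regular file under the checkpoints tree.
    return sum(p.stat().st_size for p in checkpoints_dir.rglob("*") if p.is_file())
```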

format_size(size_bytes: int) -> str

Format bytes as human-readable string.
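A typical implementation walks up the units until the value fits; the exact unit names (KiB vs. KB) and precision here are assumptions, not the service's confirmed output format:

```python
def format_size(size_bytes: int) -> str:
    # Walk up binary units until the value drops below 1024.
    # Unit names (KiB vs KB) are an assumption about the real formatter.
    size = float(size_bytes)
    for unit in ("B", "KiB", "MiB", "GiB"):
        if size < 1024:
            return f"{int(size)} B" if unit == "B" else f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TiB"
```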

delete_job_checkpoints(project: str, group: str, job_name: str) -> bool

Delete all checkpoints for a job. Returns True if successful.

LogService(status_dir: Path)

Service for reading and streaming job logs.

read_log(project: str, group: str, name: str, run_id: str, offset: int = 0, limit: Optional[int] = None) -> tuple[str, int]

Read log file contents.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| project | str | Project name | required |
| group | str | Group name | required |
| name | str | Job name | required |
| run_id | str | Run ID | required |
| offset | int | Byte offset to start reading from | 0 |
| limit | Optional[int] | Maximum bytes to read (None for all) | None |

Returns:

| Type | Description |
| --- | --- |
| tuple[str, int] | Tuple of (content, new_offset) |
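The offset-based contract can be sketched as below; this standalone version takes the log path directly rather than resolving it from job identifiers:

```python
from pathlib import Path
from typing import Optional

def read_log(path: Path, offset: int = 0, limit: Optional[int] = None) -> tuple[str, int]:
    # Read from `offset`, up to `limit` bytes, and return the decoded
    # content plus the new offset to pass on the next call.
    with path.open("rb") as f:
        f.seek(offset)
        data = f.read() if limit is None else f.read(limit)
    return data.decode("utf-8", errors="replace"), offset + len(data)
```

Feeding the returned offset back in lets a client page through the log incrementally without re-reading bytes it has already seen.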

tail_log(project: str, group: str, name: str, run_id: str, lines: int | None = None) -> str

Read the last N lines of a log file, or entire file if lines=None.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| lines | int \| None | Number of lines to return from end. None = entire file. | None |

stream_log(project: str, group: str, name: str, run_id: str, poll_interval: float = 1.0) -> AsyncGenerator[str, None] async

Stream log file contents as they're written.

Yields new content whenever the file is updated. Currently uses polling (TODO: consider inotify/fsevents for efficiency).
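The polling loop can be sketched as an async generator that watches the file size and yields any appended bytes; this standalone version takes the log path directly:

```python
import asyncio
from pathlib import Path
from typing import AsyncGenerator

async def stream_log(path: Path, poll_interval: float = 1.0) -> AsyncGenerator[str, None]:
    # Minimal polling loop: watch the file size and yield any bytes
    # appended since the last read.
    offset = 0
    while True:
        if path.exists() and path.stat().st_size > offset:
            with path.open("rb") as f:
                f.seek(offset)
                chunk = f.read()
            offset += len(chunk)
            yield chunk.decode("utf-8", errors="replace")
        await asyncio.sleep(poll_interval)
```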

get_log_size(project: str, group: str, name: str, run_id: str) -> int

Get the size of a log file in bytes.

log_exists(project: str, group: str, name: str, run_id: str) -> bool

Check if a log file exists.

JobCache(status_dir: Path, mode: str = 'polling', poll_interval: float = 10.0)

In-memory cache of job metadata with background refresh.

start() -> None async

Start the cache: do initial full scan, then begin background refresh.

stop() -> None async

Stop background refresh.
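The lifecycle above (initial full scan, background refresh loop, cancellation on stop) can be sketched as follows. The flat `*.json` status layout and dict-valued jobs are assumptions for illustration, not the real on-disk format:

```python
import asyncio
import json
from pathlib import Path

class JobCache:
    # Minimal sketch: periodically rescan a status directory of *.json files.
    def __init__(self, status_dir: Path, poll_interval: float = 10.0):
        self.status_dir = status_dir
        self.poll_interval = poll_interval
        self.jobs: dict[str, dict] = {}
        self._task: asyncio.Task | None = None

    def _scan(self) -> None:
        jobs = {}
        for f in self.status_dir.glob("*.json"):
            try:
                jobs[f.stem] = json.loads(f.read_text())
            except (OSError, json.JSONDecodeError):
                continue  # skip files that are mid-write
        self.jobs = jobs

    async def _refresh_loop(self) -> None:
        while True:
            await asyncio.sleep(self.poll_interval)
            self._scan()

    async def start(self) -> None:
        self._scan()  # initial full scan so reads work immediately
        self._task = asyncio.create_task(self._refresh_loop())

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```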

list_all_jobs(project: Optional[str] = None, group: Optional[str] = None, status: Optional[JobStatus] = None, limit: int = 100) -> list[JobMetadata]

List all jobs from cache, with optional filtering.

get_job(project: str, group: str, name: str, run_id: str) -> Optional[JobMetadata]

Get a single job from cache.

get_job_runs(project: str, group: str, name: str) -> list[JobMetadata]

Get all runs for a specific job name from cache.

list_projects() -> list[ProjectSummary]

List all projects with summary stats, from cache.

get_running_jobs() -> list[JobMetadata]

Get all running, non-stale jobs from cache.

get_recent_jobs(hours: int = 24, limit: int = 50) -> list[JobMetadata]

Get jobs started within the last N hours, from cache.

get_dashboard_stats() -> DashboardStats

Get aggregated stats from cache.

delete_job(project: str, group: str, name: str, run_id: str) -> None

Remove a job from the cache (called after filesystem delete).

invalidate(project: str, group: str, name: str, run_id: str) -> None

Force re-read a specific job on next incremental scan.