
cache

In-memory job metadata cache with background refresh.

Avoids hammering the filesystem (especially on network filesystems like JuiceFS) by caching all job metadata in memory and refreshing via background polling.

Supports two refresh modes:
  • "polling" (default): Background asyncio task periodically stats files and re-reads only those whose mtime changed. Safe for JuiceFS and any filesystem.
  • "watchdog": Uses the watchdog library's native OS file watcher for instant change detection. Falls back to polling on platforms/filesystems that lack inotify support. Best for local development.
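The polling mode's incremental refresh can be sketched as an mtime comparison: stat every metadata file, and re-read only those whose mtime differs from the last scan. The function name `scan_changed` and the JSON-file layout below are assumptions for illustration, not the module's actual internals:

```python
import json
from pathlib import Path


def scan_changed(status_dir: Path, mtimes: dict[Path, float]) -> dict[Path, dict]:
    """Stat each metadata file; re-read only those new or modified since last scan."""
    changed: dict[Path, dict] = {}
    for path in status_dir.rglob("*.json"):
        mtime = path.stat().st_mtime
        if mtimes.get(path) != mtime:  # new file, or mtime moved since last scan
            changed[path] = json.loads(path.read_text())
            mtimes[path] = mtime
    return changed
```

On a network filesystem this keeps each poll cycle to one `stat` per file, only paying the cost of a full read for files that actually changed.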
Usage

cache = JobCache(status_dir, mode="polling", poll_interval=10.0)
await cache.start()  # initial scan + begin background refresh
jobs = cache.list_all_jobs(project="foo")
...
await cache.stop()

JobCache(status_dir: Path, mode: str = 'polling', poll_interval: float = 10.0)

In-memory cache of job metadata with background refresh.

start() -> None async

Start the cache: do initial full scan, then begin background refresh.

stop() -> None async

Stop background refresh.

list_all_jobs(project: Optional[str] = None, group: Optional[str] = None, status: Optional[JobStatus] = None, limit: int = 100) -> list[JobMetadata]

List all jobs from cache, with optional filtering.
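The optional filters compose as simple AND conditions with a result cap. A minimal sketch, assuming a stand-in `Job` record in place of `JobMetadata` (whose exact fields the source does not spell out):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Job:
    # Stand-in for JobMetadata; these fields are assumptions.
    project: str
    group: str
    status: str


def filter_jobs(
    jobs: list[Job],
    project: Optional[str] = None,
    group: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100,
) -> list[Job]:
    """Apply each filter only when given (None means 'match all'), then cap at limit."""
    matched = [
        j for j in jobs
        if (project is None or j.project == project)
        and (group is None or j.group == group)
        and (status is None or j.status == status)
    ]
    return matched[:limit]
```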

get_job(project: str, group: str, name: str, run_id: str) -> Optional[JobMetadata]

Get a single job from cache.

get_job_runs(project: str, group: str, name: str) -> list[JobMetadata]

Get all runs for a specific job name from cache.

list_projects() -> list[ProjectSummary]

List all projects with summary stats, from cache.

get_running_jobs() -> list[JobMetadata]

Get all running, non-stale jobs from cache.

get_recent_jobs(hours: int = 24, limit: int = 50) -> list[JobMetadata]

Get jobs started within the last N hours, from cache.
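The "last N hours" window amounts to a cutoff comparison on each job's start time. A sketch over bare timestamps (the real method returns JobMetadata; sorting newest-first is an assumption):

```python
from datetime import datetime, timedelta, timezone


def recent(started: list[datetime], hours: int = 24, limit: int = 50) -> list[datetime]:
    """Keep start times within the last `hours`, newest first, capped at `limit`."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    return sorted((t for t in started if t >= cutoff), reverse=True)[:limit]
```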

get_dashboard_stats() -> DashboardStats

Get aggregated stats from cache.

delete_job(project: str, group: str, name: str, run_id: str) -> None

Remove a job from the cache (called after filesystem delete).

invalidate(project: str, group: str, name: str, run_id: str) -> None

Force re-read a specific job on next incremental scan.
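The difference between delete_job and invalidate comes down to what bookkeeping is dropped: deletion removes the cached entry outright, while invalidation only forgets the recorded mtime so the next incremental scan re-reads the file. A toy model of that bookkeeping (class name and structure are assumptions, not the module's internals):

```python
from pathlib import Path


class CacheIndex:
    """Toy model of the cache's per-file bookkeeping."""

    def __init__(self) -> None:
        self.jobs: dict[Path, dict] = {}      # cached metadata per file
        self.mtimes: dict[Path, float] = {}   # mtime recorded at last read

    def delete_job(self, path: Path) -> None:
        # Caller has already deleted the file on disk; drop the entry entirely.
        self.jobs.pop(path, None)
        self.mtimes.pop(path, None)

    def invalidate(self, path: Path) -> None:
        # Keep the stale entry for now, but forget its mtime so the next
        # incremental scan sees a mismatch and re-reads the file.
        self.mtimes.pop(path, None)
```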