From eb200a0f560e1b7f1446bda2e5d78c615d573a2c Mon Sep 17 00:00:00 2001 From: Pete Date: Thu, 13 Jun 2024 10:41:06 -0700 Subject: [PATCH] Add to `Job` and `ClusterUtilization` APIs (#278) --- CHANGELOG.md | 7 +++++++ beaker/data_model/cluster.py | 2 ++ beaker/data_model/job.py | 12 ++++++++++++ beaker/services/cluster.py | 12 ++++++++---- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ab1ea..671314d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ use patch releases for compatibility fixes instead. ## Unreleased +### Added + +- Added `Job.is_preemptible` property. +- Added `Job.is_running` property. +- Added `Job.is_queued` property. +- Added `ClusterUtilization.jobs` field. + ## [v1.27.2](https://github.com/allenai/beaker-py/releases/tag/v1.27.2) - 2024-05-31 ### Added diff --git a/beaker/data_model/cluster.py b/beaker/data_model/cluster.py index fb4399e..43f8884 100644 --- a/beaker/data_model/cluster.py +++ b/beaker/data_model/cluster.py @@ -2,6 +2,7 @@ from typing import Optional, Tuple from .base import BaseModel, StrEnum, field_validator +from .job import Job from .node import NodeResources, NodeUtilization __all__ = ["ClusterStatus", "Cluster", "ClusterUtilization", "ClusterSpec", "ClusterPatch"] @@ -77,6 +78,7 @@ class ClusterUtilization(BaseModel): queued_jobs: int running_preemptible_jobs: int nodes: Tuple[NodeUtilization, ...] + jobs: Tuple[Job, ...] @property def id(self) -> str: diff --git a/beaker/data_model/job.py b/beaker/data_model/job.py index 833152a..853b1e9 100644 --- a/beaker/data_model/job.py +++ b/beaker/data_model/job.py @@ -199,6 +199,14 @@ def is_done(self) -> bool: """ return self.status.current == CurrentJobStatus.finalized + @property + def is_running(self) -> bool: + return self.status.current in (CurrentJobStatus.running, CurrentJobStatus.idle) + + @property + def is_queued(self) -> bool: + return self.status.current == CurrentJobStatus.created + @property def was_preempted(self) -> bool: return self.status.canceled is not None and self.status.canceled_code in { @@ -206,6 +214,10 @@ def was_preempted(self) -> bool: CanceledCode.user_preemption, } + @property + def is_preemptible(self) -> bool: + return self.preemptible or (self.priority == Priority.preemptible) + @property def priority(self) -> Optional[Priority]: """ diff --git a/beaker/services/cluster.py b/beaker/services/cluster.py index f4f2db9..06ad397 100644 --- a/beaker/services/cluster.py +++ b/beaker/services/cluster.py @@ -208,6 +208,7 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: running_jobs = 0 queued_jobs = 0 running_preemptible_jobs = 0 + jobs: List[Job] = [] node_to_util: Dict[str, Dict[str, Union[int, float]]] = { node.id: { "running_jobs": 0, @@ -219,22 +220,24 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: } for job in self.beaker.job.list(cluster=cluster, finalized=False): - if job.status.current in (CurrentJobStatus.running, CurrentJobStatus.idle): + if job.is_running: if job.node not in node_to_util: continue running_jobs += 1 - if job.priority == Priority.preemptible or job.preemptible: + if job.is_preemptible: running_preemptible_jobs += 1 - elif job.status.current == CurrentJobStatus.created: + elif job.is_queued: queued_jobs += 1 + jobs.append(job) + if job.node is not None: if job.node not in node_to_util: continue # unlikely node_util = node_to_util[job.node] node_util["running_jobs"] += 1 - if job.priority == Priority.preemptible or job.preemptible: + if job.is_preemptible: node_util["running_preemptible_jobs"] += 1 if job.limits is not None: if job.limits.gpus is not None: @@ -285,6 +288,7 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: queued_jobs=queued_jobs, running_preemptible_jobs=running_preemptible_jobs, nodes=tuple(node_utilizations), + jobs=tuple(jobs), ) def filter_available(