
Remove cluster config
Add extra property environments that allow fields in Slurm's job properties to be set from environment variables. Also fix the memory argument and tweak timing in the timeout test.
PeterC-DLS committed Oct 2, 2023
1 parent ced34c9 commit 9ba54cb
Showing 6 changed files with 45 additions and 69 deletions.
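
In outline, the new feature reads the `extra_property_envs` mapping from the YAML config, resolves each named environment variable at submission time, and applies the resulting values to the job's Slurm properties. A minimal sketch of that flow, using illustrative standalone names (the real logic lives in `ppc_cluster_submit` and `JobScheduler.make_job_submission`):

```python
import os

# From par_proc_co.yaml: JobProperties field name -> environment variable name
extra_property_envs = {"account": "MY_ACCOUNT"}

# In ppc_cluster_submit: resolve the env vars, skipping any that are unset
extra_properties = {
    field: value
    for field, env in extra_property_envs.items()
    if (value := os.getenv(env))
}

# In JobScheduler.make_job_submission: apply them to the job properties.
# Job is a stand-in here for the Slurm REST API's JobProperties model.
class Job:
    pass

job = Job()
for field, value in extra_properties.items():
    setattr(job, field, value)  # e.g. job.account = "my-project"
```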
4 changes: 4 additions & 0 deletions ParProcCo/job_controller.py
@@ -19,6 +19,7 @@ def __init__(
program_wrapper: ProgramWrapper,
output_dir_or_file: Path,
partition: str,
extra_properties: Optional[dict[str, str]] = None,
version: str = "v0.0.38",
user_name: Optional[str] = None,
user_token: Optional[str] = None,
@@ -28,6 +29,7 @@ def __init__(
self.url = url
self.program_wrapper = program_wrapper
self.partition = partition
self.extra_properties = extra_properties
self.output_file: Optional[Path] = None
self.cluster_output_dir: Optional[Path] = None

@@ -121,6 +123,7 @@ def _submit_sliced_jobs(
self.working_directory,
self.cluster_output_dir,
self.partition,
self.extra_properties,
self.timeout,
self.version,
self.user_name,
@@ -162,6 +165,7 @@ def _submit_aggregation_job(self, memory: int) -> None:
self.working_directory,
self.cluster_output_dir,
self.partition,
self.extra_properties,
self.timeout,
self.version,
self.user_name,
21 changes: 7 additions & 14 deletions ParProcCo/job_scheduler.py
@@ -139,6 +139,7 @@ def __init__(
working_directory: Optional[Union[Path, str]],
cluster_output_dir: Optional[Union[Path, str]],
partition: str,
extra_properties: Optional[dict[str, str]] = None,
timeout: timedelta = timedelta(hours=2),
version: str = "v0.0.38",
user_name: Optional[str] = None,
@@ -165,6 +166,7 @@ def __init__(
else (self.cluster_output_dir if self.cluster_output_dir else Path.home())
)
self.partition = partition
self.extra_properties = extra_properties
self.scheduler_mode: SchedulerModeInterface
self.memory: int
self.cores: int
@@ -178,6 +180,7 @@ def __init__(
self.token = user_token if user_token else get_slurm_token()
self._session.headers["X-SLURM-USER-NAME"] = self.user
self._session.headers["X-SLURM-USER-TOKEN"] = self.token
self._session.headers["Content-Type"] = "application/json"

def get(
self,
@@ -191,25 +194,11 @@ def get(
response.raise_for_status()
return response

def _prepare_request(
self, data: BaseModel
) -> tuple[str, dict[str, str]] | tuple[None, None]:
if data is None:
return None, None
return data.model_dump_json(exclude_defaults=True), {
"X-SLURM-USER-NAME": self.user,
"X-SLURM-USER-TOKEN": self.token,
"Content-Type": "application/json",
}

def _post(self, data: BaseModel, endpoint) -> requests.Response:
url = f"{self._url}/{endpoint}"
resp = self._session.post(
url=url,
data=data.model_dump_json(exclude_defaults=True),
headers={
"Content-Type": "application/json",
},
)
return resp

@@ -406,6 +395,9 @@ def make_job_submission(self, i: int, job=None, jobs=None) -> JobSubmission:
standard_error=stderr_fp,
get_user_environment="10L",
)
if self.extra_properties:
for k, v in self.extra_properties.items():
setattr(job, k, v)

return JobSubmission(script=self.jobscript_command, job=job)

@@ -444,6 +436,7 @@ def _wait_for_jobs(
now = time.time()
max_time = int(round(self.timeout.total_seconds()))
check_time = min(int(round(max_time / 2)), 60) # half of max_time or one minute
logging.info("Jobs have check_time=%d and max_time=%d", check_time, max_time)
try:
remaining_jobs = list(self.status_infos)
# Wait for jobs to start (timeout shouldn't include queue time)
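
Removing `_prepare_request` works because the shared `requests.Session` now carries all three headers, set once in `__init__`, so `_post` only needs to serialise the model. A rough sketch of the pattern (the credentials and endpoint path here are illustrative):

```python
import requests

session = requests.Session()
# Set once on the session, as this commit does, instead of per request
session.headers["X-SLURM-USER-NAME"] = "some_user"
session.headers["X-SLURM-USER-TOKEN"] = "some_token"
session.headers["Content-Type"] = "application/json"

# Every subsequent post inherits those headers automatically
resp = session.post(
    url="https://slurm.local:8443/some/endpoint",  # illustrative endpoint
    data='{"script": "..."}',
)
```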
38 changes: 7 additions & 31 deletions ParProcCo/utils.py
@@ -7,7 +7,7 @@
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional, Set, Union
from typing import Dict, Optional, Union
from yaml import YAMLObject, SafeLoader


Expand Down Expand Up @@ -103,27 +103,14 @@ def slice_to_string(s: Optional[slice]) -> str:
return f"{start}:{stop}:{step}"


@dataclass
class PPCCluster(YAMLObject):
yaml_tag = "!PPCCluster"
yaml_loader = SafeLoader

module: str # module loaded to submit jobs
default_queue: str # default cluster queue
user_queues: Optional[
Dict[str, List[str]]
] = None # specific queues with allowed users
resources: Optional[Dict[str, str]] = None # job resources


@dataclass
class PPCConfig(YAMLObject):
yaml_tag = "!PPCConfig"
yaml_loader = SafeLoader

allowed_programs: Dict[str, str] # program name, python package with wrapper module
project_env_var: str # name of environment variable holding project passed to qsub
url: str # slurm rest url
extra_property_envs: Optional[Dict[str, str]] = None # mapping of extra properties to environment variables to pass to Slurm's JobProperties


PPC_YAML = "par_proc_co.yaml"
@@ -142,17 +129,6 @@ def load_cfg() -> PPCConfig:
with open(cfg, "r") as cff:
ppc_config = yaml.safe_load(cff)

for ccfg in ppc_config.clusters.values():
if ccfg.user_queues:
users: Set[str] = set() # check for overlaps
for us in ccfg.user_queues.values():
common = users.intersection(set(us))
if common:
raise ValueError(
"Users %s cannot be assigned to more than one queue",
", ".join(common),
)
users.update(us)
return ppc_config


@@ -188,8 +164,8 @@ def set_up_wrapper(cfg: PPCConfig, program: str):

if sys.version_info < (3, 10):
from backports.entry_points_selectable import (
entry_points,
) # @UnresolvedImport
entry_points, # @UnresolvedImport
)
else:
from importlib.metadata import entry_points # @UnresolvedImport

@@ -220,9 +196,9 @@ def set_up_wrapper(cfg: PPCConfig, program: str):

def find_cfg_file(name: str) -> Path:
""" """
cp = os.getenv("PPC_CONFIG")
if cp:
return Path(cp)
c = os.getenv("PPC_CONFIG")
if c:
return Path(c)

cp = Path.home() / ("." + name)
if cp.is_file():
15 changes: 3 additions & 12 deletions README.md
@@ -9,18 +9,9 @@ allowed_programs:
rs_map: msmapper_utils
blah1: whatever_package1
blah2: whatever_package2
project_env_var: CLUSTER_PROJECT
cluster_help_msg: Please module load blah
clusters:
cluster_one: !PPCCluster
default_queue: basic.q
user_queues:
better.q: middle_user1
best.q: power_user1, power_user2
cluster_two: !PPCCluster
default_queue: only.q
resources:
cpu_model: arm64
url: https://slurm.local:8443
extra_property_envs: # optional dictionary for slurm job properties and environment variables
account: MY_ACCOUNT # env var that holds account
```
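
With the example above, `ppc_cluster_submit` reads `MY_ACCOUNT` from the environment at submission time and, if it is set, copies its value into the `account` field of the job's properties; properties whose environment variables are unset are skipped.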

An entry point called `ParProcCo.allowed_programs` can be added to other packages' `setup.py`:
20 changes: 16 additions & 4 deletions scripts/ppc_cluster_submit
@@ -48,7 +48,10 @@ def create_parser() -> argparse.ArgumentParser:
default="2h",
)
parser.add_argument(
"--memory", help="str: memory to use per cluster job", required=True
"--memory",
help="int: memory to use per cluster job (MB)",
type=int,
required=True
)
parser.add_argument(
"--cores",
@@ -87,9 +90,17 @@ def run_ppc(args: argparse.Namespace, script_args: List) -> None:
cfg = load_cfg()
url = cfg.url
token = get_token(args.token)
partition = args.partition
user = getuser()
partition = args.partition

extra_properties = None
if cfg.extra_property_envs:
extra_properties = {}
logging.debug("Extra job properties:")
for k, e in cfg.extra_property_envs.items():
v = os.getenv(e)
logging.debug("\t%s: %s", k, v)
if v:
extra_properties[k] = v

timeout = parse_timeout(args.timeout)
logging.info("Running with timeout %s", timeout)
@@ -111,7 +122,8 @@ def run_ppc(args: argparse.Namespace, script_args: List) -> None:
url,
wrapper,
output,
partition,
args.partition,
extra_properties,
SLURM_VERSION,
user,
token,
16 changes: 8 additions & 8 deletions tests/test_job_scheduler.py
@@ -27,7 +27,7 @@


def create_js(work_dir, out_dir, timeout=timedelta(hours=2)) -> JobScheduler:
return JobScheduler(slurm_rest_url, work_dir, out_dir, PARTITION, timeout)
return JobScheduler(slurm_rest_url, work_dir, out_dir, PARTITION, timeout=timeout)


@pytest.mark.skipif(gh_testing, reason="running GitHub workflow")
@@ -229,7 +229,7 @@ def test_job_times_out(self) -> None:
runner_script = setup_runner_script(working_directory)
jobscript = setup_jobscript(working_directory)
with open(jobscript, "a+") as f:
f.write(" import time\n time.sleep(60)\n")
f.write(" import time\n time.sleep(120)\n")

input_path, _, _, slices = setup_data_files(
working_directory, cluster_output_dir
@@ -240,7 +240,7 @@

# submit jobs
js = create_js(
working_directory, cluster_output_dir, timeout=timedelta(seconds=1)
working_directory, cluster_output_dir, timeout=timedelta(seconds=10)
)

with self.assertLogs(level="WARNING") as context:
@@ -299,7 +299,7 @@ def test_job_times_out(self) -> None:
]
)
def test_script(
self, name, rs_name, open_rs, permissions, error_name, error_msg
self, _name, rs_name, open_rs, permissions, error_name, error_msg
) -> None:
with TemporaryDirectory(
prefix="test_dir_", dir=self.base_dir
@@ -349,15 +349,15 @@ def test_get_output_paths(self) -> None:
("true_false", True, False, False),
]
)
def test_get_success(self, name, stat_0, stat_1, success) -> None:
def test_get_success(self, _name, stat_0, stat_1, success) -> None:
with TemporaryDirectory(prefix="test_dir_") as working_directory:
cluster_output_dir = Path(working_directory) / "cluster_output"
js = create_js(working_directory, cluster_output_dir)
js.job_completion_status = {"0": stat_0, "1": stat_1}
self.assertEqual(js.get_success(), success)

@parameterized.expand([("true", True), ("false", False)])
def test_timestamp_ok_true(self, name, run_scheduler_last) -> None:
def test_timestamp_ok_true(self, _name, run_scheduler_last) -> None:
with TemporaryDirectory(
prefix="test_dir_", dir=self.base_dir
) as working_directory:
@@ -450,7 +450,7 @@ def test_get_jobs_response(self) -> None:
),
]
)
def test_filter_killed_jobs(self, name, failed_jobs, result) -> None:
def test_filter_killed_jobs(self, _name, failed_jobs, result) -> None:
with TemporaryDirectory(
prefix="test_dir_", dir=self.base_dir
) as working_directory:
@@ -727,7 +727,7 @@ def test_resubmit_jobs(self) -> None:
)
def test_resubmit_killed_jobs(
self,
name,
_name,
allow_all_failed,
job_history,
job_completion_status,
