Skip to content

Commit

Permalink
Use a string to specify memory for ppc cluster submit
Browse files Browse the repository at this point in the history
Also, set timeout for aggregation to scale with number of files
  • Loading branch information
PeterC-DLS committed Oct 9, 2023
1 parent 36bbfd3 commit d83439d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
11 changes: 8 additions & 3 deletions ParProcCo/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from .utils import check_location, get_absolute_path
from .program_wrapper import ProgramWrapper

AGGREGATION_TIME = 60 # timeout per single file, in seconds


class JobController:
def __init__(
Expand All @@ -19,7 +21,7 @@ def __init__(
program_wrapper: ProgramWrapper,
output_dir_or_file: Path,
partition: str,
extra_properties: Optional[dict[str,str]] = None,
extra_properties: Optional[dict[str, str]] = None,
user_name: Optional[str] = None,
user_token: Optional[str] = None,
timeout: timedelta = timedelta(hours=2),
Expand Down Expand Up @@ -95,7 +97,10 @@ def run(
and out_file.is_file()
and self.output_file is not None
):
out_file.rename(self.output_file)
renamed_file = out_file.rename(self.output_file)
logging.debug(
"Rename %s to %s: %s", out_file, renamed_file, renamed_file.exists()
)
else:
logging.error(
f"Sliced jobs failed with slice_params: {slice_params}, jobscript_args: {jobscript_args},"
Expand Down Expand Up @@ -163,7 +168,7 @@ def _submit_aggregation_job(self, memory: int) -> None:
self.cluster_output_dir,
self.partition,
self.extra_properties,
self.timeout,
timedelta(seconds=AGGREGATION_TIME * len(self.sliced_results)),
self.user_name,
self.user_token,
)
Expand Down
44 changes: 31 additions & 13 deletions scripts/ppc_cluster_submit
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from __future__ import annotations

import argparse
import datetime
import logging
import os
import re
from datetime import timedelta
from getpass import getuser
from typing import List

from ParProcCo.job_controller import JobController
from ParProcCo.passthru_wrapper import PassThruWrapper
Expand All @@ -18,7 +18,7 @@ from ParProcCo.utils import get_token, set_up_wrapper
def create_parser() -> argparse.ArgumentParser:
"""
$ ppc_cluster_submit program [--partition cs05r] [--token path/to/token/file] [--output cluster_output_dir]
[--jobs 4] [--timeout 1h30m] --memory 4000 --cores 6 -s 0.01 ... [input files]
[--jobs 4] [--timeout 1h30m] --memory 4000M --cores 6 -s 0.01 ... [input files]
"""
parser = argparse.ArgumentParser(
description="ParProcCo run script",
Expand Down Expand Up @@ -46,9 +46,8 @@ def create_parser() -> argparse.ArgumentParser:
default="2h",
)
parser.add_argument(
"--memory",
help="int: memory to use per cluster job (MB)",
type=int,
'--memory',
help='maximum memory to use (e.g. 1024M, 4G, etc)',
required=True
)
parser.add_argument(
Expand All @@ -68,18 +67,36 @@ TIMEOUT_PATTERN = re.compile(
) # @UndefinedVariable


def parse_timeout(timeout: str) -> datetime.timedelta:
def parse_timeout(timeout: str) -> timedelta:
mo = TIMEOUT_PATTERN.match(timeout.strip())
if not mo:
raise ValueError(f"Could not parse {timeout} as time interval")
to_dict = {
k: int(v) for k, v in mo.groupdict().items() if v is not None
} # filter out None-valued items
logging.debug("Parsed time as %s", to_dict)
return datetime.timedelta(**to_dict)


def run_ppc(args: argparse.Namespace, script_args: List) -> None:
return timedelta(**to_dict)


def parse_memory(memory: str) -> int:
    """
    Parse a memory specification such as ``1024M`` or ``4G`` into megabytes.

    The string must be an integer followed by a single unit suffix, ``M``
    (megabytes) or ``G`` (gigabytes). Surrounding whitespace is ignored,
    matching parse_timeout. Gigabyte values are always converted to
    megabytes (multiplied by 1024) so the result has one consistent unit.

    A warning is logged (but the value is still accepted) when less than
    512M or more than 64G is requested.

    Returns:
        The requested memory in megabytes.

    Raises:
        ValueError: if the suffix is not ``M`` or ``G``, the leading part is
            not an integer, or the amount is not greater than zero.
    """
    text = memory.strip()
    # Guard against an empty string before indexing the unit suffix
    if not text or text[-1] not in ("M", "G"):
        raise ValueError("Memory specified must end with M or G")
    unit = text[-1]

    # Keep the try body minimal so only the integer conversion is translated
    # into the "must start with a number" error
    try:
        amount = int(text[:-1])
    except ValueError as exc:
        raise ValueError("Memory specified must start with a number") from exc

    if amount <= 0:
        raise ValueError("Memory specified must be greater than 0")

    if unit == "M":
        if amount < 512:
            logging.warning("Memory specified is recommended to be over 512M")
        return amount

    # unit is "G": warn on large requests, but always convert to megabytes
    if amount > 64:
        logging.warning("Memory specified (>64G) seems to be excessive")
    return amount * 1024


def run_ppc(args: argparse.Namespace, script_args: list[str]) -> None:
"""
Run JobController
"""
Expand All @@ -101,7 +118,8 @@ def run_ppc(args: argparse.Namespace, script_args: List) -> None:
extra_properties[k] = v

timeout = parse_timeout(args.timeout)
logging.info("Running for with timeout %s", timeout)
memory = parse_memory(args.memory)
logging.info("Running with timeout %s and memory limit %dM", timeout, memory)

if not script_args:
raise ValueError("No script and any of its arguments given")
Expand All @@ -126,7 +144,7 @@ def run_ppc(args: argparse.Namespace, script_args: List) -> None:
token,
timeout,
)
jc.run(args.jobs, wrapper_args, args.memory, "PPC-" + program)
jc.run(args.jobs, wrapper_args, memory, "PPC-" + program)
print("complete")


Expand Down

0 comments on commit d83439d

Please sign in to comment.