diff --git a/ParProcCo/job_controller.py b/ParProcCo/job_controller.py index 26bd4df..e3c2140 100644 --- a/ParProcCo/job_controller.py +++ b/ParProcCo/job_controller.py @@ -11,6 +11,8 @@ from .utils import check_location, get_absolute_path from .program_wrapper import ProgramWrapper +AGGREGATION_TIME = 60 # timeout per single file, in seconds + class JobController: def __init__( @@ -19,7 +21,7 @@ def __init__( program_wrapper: ProgramWrapper, output_dir_or_file: Path, partition: str, - extra_properties: Optional[dict[str,str]] = None, + extra_properties: Optional[dict[str, str]] = None, user_name: Optional[str] = None, user_token: Optional[str] = None, timeout: timedelta = timedelta(hours=2), @@ -95,7 +97,10 @@ def run( and out_file.is_file() and self.output_file is not None ): - out_file.rename(self.output_file) + renamed_file = out_file.rename(self.output_file) + logging.debug( + "Rename %s to %s: %s", out_file, renamed_file, renamed_file.exists() + ) else: logging.error( f"Sliced jobs failed with slice_params: {slice_params}, jobscript_args: {jobscript_args}," @@ -163,7 +168,7 @@ def _submit_aggregation_job(self, memory: int) -> None: self.cluster_output_dir, self.partition, self.extra_properties, - self.timeout, + timedelta(seconds=AGGREGATION_TIME * len(self.sliced_results)), self.user_name, self.user_token, ) diff --git a/scripts/ppc_cluster_submit b/scripts/ppc_cluster_submit index 621f9e7..7b3a482 100755 --- a/scripts/ppc_cluster_submit +++ b/scripts/ppc_cluster_submit @@ -4,11 +4,11 @@ from __future__ import annotations import argparse -import datetime import logging +import os import re +from datetime import timedelta from getpass import getuser -from typing import List from ParProcCo.job_controller import JobController from ParProcCo.passthru_wrapper import PassThruWrapper @@ -18,7 +18,7 @@ from ParProcCo.utils import get_token, set_up_wrapper def create_parser() -> argparse.ArgumentParser: """ $ ppc_cluster_submit program [--partition cs05r] [--token path/to/token/file] [--output cluster_output_dir] - [--jobs 4] [--timeout 1h30m] --memory 4000 --cores 6 -s 0.01 ... [input files] + [--jobs 4] [--timeout 1h30m] --memory 4000M --cores 6 -s 0.01 ... [input files] """ parser = argparse.ArgumentParser( description="ParProcCo run script", @@ -46,9 +46,8 @@ def create_parser() -> argparse.ArgumentParser: default="2h", ) parser.add_argument( - "--memory", - help="int: memory to use per cluster job (MB)", - type=int, + '--memory', + help='maximum memory to use (e.g. 1024M, 4G, etc)', required=True ) parser.add_argument( @@ -68,7 +67,7 @@ TIMEOUT_PATTERN = re.compile( ) # @UndefinedVariable -def parse_timeout(timeout: str) -> datetime.timedelta: +def parse_timeout(timeout: str) -> timedelta: mo = TIMEOUT_PATTERN.match(timeout.strip()) if not mo: raise ValueError(f"Could not parse {timeout} as time interval") @@ -76,10 +75,28 @@ def parse_timeout(timeout: str) -> datetime.timedelta: k: int(v) for k, v in mo.groupdict().items() if v is not None } # filter out None-valued items logging.debug("Parsed time as %s", to_dict) - return datetime.timedelta(**to_dict) - - -def run_ppc(args: argparse.Namespace, script_args: List) -> None: + return timedelta(**to_dict) + + +def parse_memory(memory: str) -> int: + l = memory[-1] + if l not in ('M', 'G'): + raise ValueError('Memory specified must end with M or G') + try: + m = int(memory[:-1]) + if m <= 0: + raise ValueError('Memory specified must be greater than 0') + if l == 'M' and m < 512: + logging.warning('Memory specified is recommended to be over 512M') + if l == 'G'and m > 64: + m = m*1024 + logging.warning('Memory specified (>64G) seems to be excessive') + return m + except: + raise ValueError('Memory specified must start with a number') + + +def run_ppc(args: argparse.Namespace, script_args: list[str]) -> None: """ Run JobController """ @@ -101,7 +118,8 @@ def run_ppc(args: argparse.Namespace, script_args: List) -> None: extra_properties[k] = v timeout = parse_timeout(args.timeout) - logging.info("Running for with timeout %s", timeout) + memory = parse_memory(args.memory) + logging.info("Running with timeout %s and memory limit %dM", timeout, memory) if not script_args: raise ValueError("No script and any of its arguments given") @@ -126,7 +144,7 @@ def run_ppc(args: argparse.Namespace, script_args: List) -> None: token, timeout, ) - jc.run(args.jobs, wrapper_args, args.memory, "PPC-" + program) + jc.run(args.jobs, wrapper_args, memory, "PPC-" + program) print("complete")