diff --git a/law.cfg.example b/law.cfg.example
index faa7d1f9..71d48eca 100644
--- a/law.cfg.example
+++ b/law.cfg.example
@@ -844,9 +844,17 @@
 ; values above are used. The only exception is "htcondor_job_file_dir_cleanup" whose default value
 ; is False.

+; htcondor_job_grouping_submit
+; Description: Whether to use job grouping (cluster submission in HTCondor nomenclature) or not. If
+; not, the standard batched submission is used and settings such as "htcondor_chunk_size_submit" and
+; "htcondor_merge_job_files" are considered.
+; Type: boolean
+; Default: True
+
 ; htcondor_chunk_size_submit
 ; Description: Number of jobs that can be submitted in parallel inside a single call to
-; "law.htcondor.HTCondorJobManager.submit", i.e., in a single "condor_submit" command.
+; "law.htcondor.HTCondorJobManager.submit", i.e., in a single "condor_submit" command. Ignored when
+; job grouping is enabled in "htcondor_job_grouping_submit".
 ; Type: integer
 ; Default: 25

@@ -864,8 +872,9 @@

 ; htcondor_merge_job_files
 ; Description: A boolean flag that decides whether multiple job description files should be merged
-; into a single file before submission. When "False", the "htcondor_chunk_size_submit" option is
-; not considered either.
+; into a single file before submission. Ignored when job grouping is enabled in
+; "htcondor_job_grouping_submit". When "False", the "htcondor_chunk_size_submit" option is not
+; considered either.
 ; Type: boolean
 ; Default: True
diff --git a/law/contrib/cms/job.py b/law/contrib/cms/job.py
index b0280693..d89edcff 100644
--- a/law/contrib/cms/job.py
+++ b/law/contrib/cms/job.py
@@ -56,7 +56,10 @@ class CrabJobManager(BaseJobManager):

     log_file_pattern = "https://cmsweb.cern.ch:8443/scheddmon/{scheduler_id}/{user}/{task_name}/job_out.{crab_num}.{attempt}.txt"  # noqa

-    job_grouping = True
+    job_grouping_submit = True
+    job_grouping_query = True
+    job_grouping_cancel = True
+    job_grouping_cleanup = True

     JobId = namedtuple("JobId", ["crab_num", "task_name", "proj_dir"])
diff --git a/law/contrib/cms/workflow.py b/law/contrib/cms/workflow.py
index 5a3b5921..b45b2b45 100644
--- a/law/contrib/cms/workflow.py
+++ b/law/contrib/cms/workflow.py
@@ -160,7 +160,7 @@ def create_job_file(self, submit_jobs):
         c.custom_log_file = log_file

         # task hook
-        c = task.crab_job_config(c, submit_jobs)
+        c = task.crab_job_config(c, list(submit_jobs.keys()), list(submit_jobs.values()))

         # build the job file and get the sanitized config
         job_file, c = self.job_file_factory(**c.__dict__)
diff --git a/law/contrib/htcondor/config.py b/law/contrib/htcondor/config.py
index 59944cc8..a63111b2 100644
--- a/law/contrib/htcondor/config.py
+++ b/law/contrib/htcondor/config.py
@@ -8,6 +8,7 @@ def config_defaults(default_config):
     return {
         "job": {
+            "htcondor_job_grouping_submit": True,
             "htcondor_job_file_dir": None,
             "htcondor_job_file_dir_mkdtemp": None,
             "htcondor_job_file_dir_cleanup": False,
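
The new "htcondor_job_grouping_submit" option lives in the "[job]" section and defaults to True, so existing setups switch to cluster submission unless it is explicitly disabled. As a minimal sketch (not part of the patch), the flag can be read programmatically the same way the HTCondor job manager below does, via law's Config accessor:

    from law.config import Config

    cfg = Config.instance()
    # "job" section option added by this patch; the registered default is True
    grouped = cfg.get_expanded_bool("job", "htcondor_job_grouping_submit")
    print("grouped (cluster) submission enabled:", grouped)
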
diff --git a/law/contrib/htcondor/htcondor_wrapper.sh b/law/contrib/htcondor/htcondor_wrapper.sh
new file mode 100644
index 00000000..a730f235
--- /dev/null
+++ b/law/contrib/htcondor/htcondor_wrapper.sh
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash
+
+# Wrapper script that is to be configured as htcondor's main executable file
+
+htcondor_wrapper() {
+    # helper to select the correct python executable
+    _law_python() {
+        command -v python &> /dev/null && python "$@" || python3 "$@"
+    }
+
+    #
+    # detect variables
+    #
+
+    local shell_is_zsh="$( [ -z "${ZSH_VERSION}" ] && echo "false" || echo "true" )"
+    local this_file="$( ${shell_is_zsh} && echo "${(%):-%x}" || echo "${BASH_SOURCE[0]}" )"
+    local this_file_base="$( basename "${this_file}" )"
+
+    # get the job number
+    export LAW_HTCONDOR_JOB_NUMBER="${LAW_HTCONDOR_JOB_PROCESS}"
+    if [ -z "${LAW_HTCONDOR_JOB_NUMBER}" ]; then
+        >&2 echo "could not determine htcondor job number"
+        return "1"
+    fi
+    # htcondor process numbers start at 0, law job numbers at 1, so increment
+    ((LAW_HTCONDOR_JOB_NUMBER++))
+    echo "running ${this_file_base} for job number ${LAW_HTCONDOR_JOB_NUMBER}"
+
+
+    #
+    # job argument definitions, depending on LAW_HTCONDOR_JOB_NUMBER
+    #
+
+    # definition
+    local htcondor_job_arguments_map
+    declare -A htcondor_job_arguments_map
+    htcondor_job_arguments_map=(
+        {{htcondor_job_arguments_map}}
+    )
+
+    # pick
+    local htcondor_job_arguments="${htcondor_job_arguments_map[${LAW_HTCONDOR_JOB_NUMBER}]}"
+    if [ -z "${htcondor_job_arguments}" ]; then
+        >&2 echo "empty htcondor job arguments for LAW_HTCONDOR_JOB_NUMBER ${LAW_HTCONDOR_JOB_NUMBER}"
+        return "3"
+    fi
+
+
+    #
+    # variable rendering
+    #
+
+    # check variables
+    local render_variables="{{render_variables}}"
+    if [ -z "${render_variables}" ]; then
+        >&2 echo "empty render variables"
+        return "4"
+    fi
+
+    # decode
+    render_variables="$( echo "${render_variables}" | base64 --decode )"
+
+    # check files to render
+    local input_files_render=( {{input_files_render}} )
+    if [ "${#input_files_render[@]}" == "0" ]; then
+        >&2 echo "received empty input files for rendering for LAW_HTCONDOR_JOB_NUMBER ${LAW_HTCONDOR_JOB_NUMBER}"
+        return "5"
+    fi
+
+    # render files
+    local input_file_render
+    for input_file_render in ${input_files_render[@]}; do
+        # skip if the file refers to _this_ one
+        local input_file_render_base="$( basename "${input_file_render}" )"
+        [ "${input_file_render_base}" = "${this_file_base}" ] && continue
+        # render
+        echo "render ${input_file_render}"
+        _law_python -c "\
+import re;\
+repl = ${render_variables};\
+repl['input_files_render'] = '';\
+repl['file_postfix'] = '${file_postfix}' or repl.get('file_postfix', '');\
+repl['log_file'] = '${log_file}' or repl.get('log_file', '');\
+content = open('${input_file_render}', 'r').read();\
+content = re.sub(r'\{\{(\w+)\}\}', lambda m: repl.get(m.group(1), ''), content);\
+open('${input_file_render_base}', 'w').write(content);\
+"
+        local render_ret="$?"
+        # handle rendering errors
+        if [ "${render_ret}" != "0" ]; then
+            >&2 echo "input file rendering failed with code ${render_ret}"
+            return "6"
+        fi
+    done
+
+
+    #
+    # run the actual job file
+    #
+
+    # check the job file
+    local job_file="{{job_file}}"
+    if [ ! -f "${job_file}" ]; then
+        >&2 echo "job file '${job_file}' does not exist"
+        return "7"
+    fi
+
+    # helper to print a banner
+    banner() {
+        local msg="$1"
+
+        echo
+        echo "================================================================================"
+        echo "=== ${msg}"
+        echo "================================================================================"
+        echo
+    }
+
+    # debugging: print its contents
+    # echo "=== content of job file '${job_file}'"
+    # echo
+    # cat "${job_file}"
+    # echo
+    # echo "=== end of job file content"
+
+    # run it
+    banner "Start of law job"
+
+    local job_ret
+    bash "${job_file}" ${htcondor_job_arguments}
+    job_ret="$?"
+
+    banner "End of law job"
+
+    return "${job_ret}"
+}
+
+action() {
+    # arguments: file_postfix, log_file
+    local file_postfix="$1"
+    local log_file="$2"
+
+    if [ -z "${log_file}" ]; then
+        htcondor_wrapper "$@"
+    elif command -v tee &> /dev/null; then
+        set -o pipefail
+        echo "---" >> "${log_file}"
+        htcondor_wrapper "$@" 2>&1 | tee -a "${log_file}"
+    else
+        echo "---" >> "${log_file}"
+        htcondor_wrapper "$@" &>> "${log_file}"
+    fi
+}
+
+action "$@"
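
For reference, a standalone sketch of how the {{htcondor_job_arguments_map}} placeholder above ends up being filled by the job file factory further down; the argument strings are placeholders, and the keys are the 1-based law job numbers that match the incremented LAW_HTCONDOR_JOB_PROCESS:

    arguments = ["<args for job 1>", "<args for job 2>"]

    block = ("\n" + 8 * " ").join(
        "['{}']=\"{}\"".format(i + 1, args)
        for i, args in enumerate(arguments)
    )
    print(block)
    # ['1']="<args for job 1>"
    #         ['2']="<args for job 2>"
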
+ + banner "End of law job" + + return "${job_ret}" +} + +action() { + # arguments: file_postfix, log_file + local file_postfix="$1" + local log_file="$2" + + if [ -z "${log_file}" ]; then + htcondor_wrapper "$@" + elif command -v tee &> /dev/null; then + set -o pipefail + echo "---" >> "${log_file}" + htcondor_wrapper "$@" 2>&1 | tee -a "${log_file}" + else + echo "---" >> "${log_file}" + htcondor_wrapper "$@" &>> "${log_file}" + fi +} + +action "$@" diff --git a/law/contrib/htcondor/job.py b/law/contrib/htcondor/job.py index 2ed3df19..6748e6e4 100644 --- a/law/contrib/htcondor/job.py +++ b/law/contrib/htcondor/job.py @@ -30,15 +30,24 @@ class HTCondorJobManager(BaseJobManager): - # whether to merge jobs files for batched submission - merge_job_files = _cfg.get_expanded_bool("job", "htcondor_merge_job_files") - - # chunking settings - chunk_size_submit = ( - _cfg.get_expanded_int("job", "htcondor_chunk_size_submit") - if merge_job_files - else 0 - ) + # whether to use job grouping or batched submission + job_grouping_submit = _cfg.get_expanded_bool("job", "htcondor_job_grouping_submit") + + # settings depending on job grouping or batched submission + merge_job_files = False + chunk_size_submit = 0 + if not job_grouping_submit: + # whether to merge jobs files for batched submission + merge_job_files = _cfg.get_expanded_bool("job", "htcondor_merge_job_files") + + # chunking for batched submission + chunk_size_submit = ( + _cfg.get_expanded_int("job", "htcondor_chunk_size_submit") + if merge_job_files + else 0 + ) + + # other chunking settings chunk_size_cancel = _cfg.get_expanded_int("job", "htcondor_chunk_size_cancel") chunk_size_query = _cfg.get_expanded_int("job", "htcondor_chunk_size_query") @@ -66,7 +75,28 @@ def cleanup(self, *args, **kwargs): def cleanup_batch(self, *args, **kwargs): raise NotImplementedError("HTCondorJobManager.cleanup_batch is not implemented") - def submit(self, job_file, pool=None, scheduler=None, retries=0, retry_delay=3, silent=False): + def submit(self, job_file, job_files=None, pool=None, scheduler=None, retries=0, retry_delay=3, + silent=False): + # signature is the superset for both grouped and batched submission, and the dispatching to + # the actual submission implementation is based on the presence of job_files + kwargs = { + "pool": pool, + "scheduler": scheduler, + "retries": retries, + "retry_delay": retry_delay, + "silent": silent, + } + + if job_files is None: + func = self._submit_impl_batched + else: + kwargs["job_files"] = job_files + func = self._submit_impl_grouped + + return func(job_file, **kwargs) + + def _submit_impl_batched(self, job_file, pool=None, scheduler=None, retries=0, retry_delay=3, + silent=False): # default arguments if pool is None: pool = self.pool @@ -159,6 +189,62 @@ def has_initialdir(job_file): raise Exception("submission of htcondor job(s) '{}' failed:\n{}".format( job_files_repr, err)) + def _submit_impl_grouped(self, job_file, job_files=None, pool=None, scheduler=None, retries=0, + retry_delay=3, silent=False): + # default arguments + if pool is None: + pool = self.pool + if scheduler is None: + scheduler = self.scheduler + + # build the command + cmd = ["condor_submit"] + if pool: + cmd += ["-pool", pool] + if scheduler: + cmd += ["-name", scheduler] + cmd.append(os.path.basename(job_file)) + cmd = quote_cmd(cmd) + + # define the actual submission in a loop to simplify retries + while True: + # run the command + logger.debug("submit htcondor job with command '{}'".format(cmd)) + code, out, err = 
@@ -344,13 +430,13 @@ class HTCondorJobFileFactory(BaseJobFileFactory):

     config_attrs = BaseJobFileFactory.config_attrs + [
         "file_name", "command", "executable", "arguments", "input_files", "output_files", "log",
-        "stdout", "stderr", "postfix_output_files", "universe", "notification", "custom_content",
-        "absolute_paths",
+        "stdout", "stderr", "postfix_output_files", "postfix", "universe",
+        "notification", "custom_content", "absolute_paths",
     ]

     def __init__(self, file_name="htcondor_job.jdl", command=None, executable=None, arguments=None,
             input_files=None, output_files=None, log="log.txt", stdout="stdout.txt",
-            stderr="stderr.txt", postfix_output_files=True, universe="vanilla",
+            stderr="stderr.txt", postfix_output_files=True, postfix=None, universe="vanilla",
             notification="Never", custom_content=None, absolute_paths=False, **kwargs):
         # get some default kwargs from the config
         cfg = Config.instance()
@@ -376,18 +462,28 @@ def __init__(self, file_name="htcondor_job.jdl", command=None, executable=None,
         self.stdout = stdout
         self.stderr = stderr
         self.postfix_output_files = postfix_output_files
+        self.postfix = postfix
         self.universe = universe
         self.notification = notification
         self.custom_content = custom_content
         self.absolute_paths = absolute_paths

-    def create(self, postfix=None, **kwargs):
+    def create(self, grouped_submission=False, **kwargs):
         # merge kwargs and instance attributes
         c = self.get_config(**kwargs)

         # some sanity checks
         if not c.file_name:
             raise ValueError("file_name must not be empty")
+        if not c.arguments:
+            raise ValueError("arguments must not be empty")
+        c.arguments = make_list(c.arguments)
+        if grouped_submission and c.postfix:
+            c.postfix = make_list(c.postfix)
+            if len(c.postfix) != len(c.arguments):
+                raise ValueError("number of postfixes does not match the number of arguments")
+        if c.postfix_output_files and not c.postfix:
+            raise ValueError("postfix must not be empty when postfix_output_files is set")
         if not c.command and not c.executable:
             raise ValueError("either command or executable must not be empty")
         if not c.universe:
@@ -398,6 +494,7 @@ def create(self, postfix=None, **kwargs):
             c.output_files.append(c.custom_log_file)

         # postfix certain output files
+        postfix = "$(law_job_postfix)" if grouped_submission else c.postfix
         c.output_files = list(map(str, c.output_files))
         if c.postfix_output_files:
             skip_postfix_cre = re.compile(r"^(/dev/).*$")
@@ -438,7 +535,7 @@ def prepare_input(f):
             # copy the file
             abs_path = self.provide_input(
                 src=abs_path,
-                postfix=postfix if f.postfix and not f.share else None,
+                postfix=c.postfix if not grouped_submission and f.postfix and not f.share else None,
                 dir=c.dir,
                 skip_existing=f.share,
             )
@@ -496,14 +593,24 @@ def prepare_input(f):
             c.render_variables["log_file"] = c.custom_log_file

         # add the file postfix to render variables
-        if postfix and "file_postfix" not in c.render_variables:
-            c.render_variables["file_postfix"] = postfix
+        # (this is done in the wrapper script for grouped submission)
+        if not grouped_submission and c.postfix and "file_postfix" not in c.render_variables:
+            c.render_variables["file_postfix"] = c.postfix
+
+        # inject arguments into the htcondor wrapper via render variables
+        if grouped_submission:
+            c.render_variables["htcondor_job_arguments_map"] = ("\n" + 8 * " ").join(
+                "['{}']=\"{}\"".format(i + 1, str(args))
+                for i, args in enumerate(c.arguments)
+            )

         # linearize render variables
         render_variables = self.linearize_render_variables(c.render_variables)

         # prepare the job description file
-        job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix)
+        job_file = os.path.join(c.dir, str(c.file_name))
+        if not grouped_submission:
+            job_file = self.postfix_input_file(job_file, c.postfix)

         # render copied, non-forwarded input files
         for key, f in c.input_files.items():
@@ -513,7 +620,7 @@ def prepare_input(f):
                 f.path_sub_abs,
                 f.path_sub_abs,
                 render_variables,
-                postfix=postfix if f.postfix else None,
+                postfix=c.postfix if not grouped_submission and f.postfix else None,
             )

         # prepare the executable when given
@@ -524,13 +631,21 @@ def prepare_input(f):
             if os.path.exists(path):
                 os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP)

+        # helper to encode lists
+        def encode_list(items, sep=" ", quote=True):
+            items = make_list(items)
+            s = sep.join(map(str, items))
+            if quote:
+                s = "\"{}\"".format(s)  # noqa: Q003
+            return s
+
         # job file content
         content = []
         content.append(("universe", c.universe))
         if c.command:
             cmd = quote_cmd(c.command) if isinstance(c.command, (list, tuple)) else c.command
             content.append(("executable", cmd))
-        elif c.executable:
+        else:
             content.append(("executable", c.executable))
         if c.log:
             content.append(("log", c.log))
@@ -541,13 +656,21 @@ def prepare_input(f):
         if c.input_files or c.output_files:
             content.append(("should_transfer_files", "YES"))
         if c.input_files:
-            content.append(("transfer_input_files", make_unique(
-                f.path_sub_rel
-                for f in c.input_files.values()
-                if f.path_sub_rel
+            content.append(("transfer_input_files", encode_list(
+                make_unique(
+                    f.path_sub_rel
+                    for f in c.input_files.values()
+                    if f.path_sub_rel
+                ),
+                sep=",",
+                quote=False,
             )))
         if c.output_files:
-            content.append(("transfer_output_files", make_unique(c.output_files)))
+            content.append(("transfer_output_files", encode_list(
+                make_unique(c.output_files),
+                sep=",",
+                quote=False,
+            )))
         content.append(("when_to_transfer_output", "ON_EXIT"))
         if c.notification:
             content.append(("notification", c.notification))
@@ -556,9 +679,33 @@ def prepare_input(f):
         if c.custom_content:
             content += c.custom_content

-        # finally arguments and queuing statements
-        if c.arguments:
-            for _arguments in make_list(c.arguments):
+        # add htcondor specific env variables
+        env_vars = []
+        _content = []
+        for obj in content:
+            if isinstance(obj, tuple) and len(obj) == 2 and obj[0].lower() == "environment":
+                env_vars.append(obj[1].strip("\""))  # noqa: Q003
+            else:
+                _content.append(obj)
+        content = _content
+        # add new ones and add back to content
+        env_vars.append("LAW_HTCONDOR_JOB_CLUSTER=$(Cluster)")
+        env_vars.append("LAW_HTCONDOR_JOB_PROCESS=$(Process)")
+        content.append(("environment", encode_list(env_vars, sep=" ", quote=True)))
+
+        # queue
+        if grouped_submission:
+            content.append("queue law_job_postfix, arguments from (")
+            for i in range(len(c.arguments)):
+                pf = log = "''"
+                if c.postfix_output_files:
+                    pf = c.postfix[i]
+                if c.custom_log_file:
+                    log = c.custom_log_file
+                content.append(" {0}, {0} {1}".format(pf, log))
+            content.append(")")
+        elif c.arguments:
+            for _arguments in c.arguments:
                 content.append(("arguments", _arguments))
                 content.append("queue")
         else:
@@ -576,9 +723,6 @@ def prepare_input(f):

     @classmethod
     def create_line(cls, key, value=None):
-        if isinstance(value, (list, tuple)):
-            value = ",".join(str(v) for v in value)
         if value is None:
             return str(key)
-        else:
-            return "{} = {}".format(key, value)
+        return "{} = {}".format(key, value)
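
To illustrate the queue statement built in the grouped branch above, a standalone sketch with made-up postfixes and log file name; per item line, the first field fills $(law_job_postfix) and the remainder becomes the wrapper arguments (postfix plus log file):

    postfixes = ["_0To3", "_3To6"]
    custom_log_file = "stdall.txt"

    content = ["queue law_job_postfix, arguments from ("]
    for pf in postfixes:
        content.append(" {0}, {0} {1}".format(pf, custom_log_file))
    content.append(")")
    print("\n".join(content))
    # queue law_job_postfix, arguments from (
    #  _0To3, _0To3 stdall.txt
    #  _3To6, _3To6 stdall.txt
    # )
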
env_vars.append("LAW_HTCONDOR_JOB_CLUSTER=$(Cluster)") + env_vars.append("LAW_HTCONDOR_JOB_PROCESS=$(Process)") + content.append(("environment", encode_list(env_vars, sep=" ", quote=True))) + + # queue + if grouped_submission: + content.append("queue law_job_postfix, arguments from (") + for i in range(len(c.arguments)): + pf = log = "''" + if c.postfix_output_files: + pf = c.postfix[i] + if c.custom_log_file: + log = c.custom_log_file + content.append(" {0}, {0} {1}".format(pf, log)) + content.append(")") + elif c.arguments: + for _arguments in c.arguments: content.append(("arguments", _arguments)) content.append("queue") else: @@ -576,9 +723,6 @@ def prepare_input(f): @classmethod def create_line(cls, key, value=None): - if isinstance(value, (list, tuple)): - value = ",".join(str(v) for v in value) if value is None: return str(key) - else: - return "{} = {}".format(key, value) + return "{} = {}".format(key, value) diff --git a/law/contrib/htcondor/workflow.py b/law/contrib/htcondor/workflow.py index c694ddd3..e76ea115 100644 --- a/law/contrib/htcondor/workflow.py +++ b/law/contrib/htcondor/workflow.py @@ -14,12 +14,12 @@ import luigi from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy -from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles +from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget from law.parameter import NO_STR -from law.util import law_src_path, merge_dicts, DotDict +from law.util import law_src_path, merge_dicts, DotDict, rel_path from law.logger import get_logger from law.contrib.htcondor.job import HTCondorJobManager, HTCondorJobFileFactory @@ -38,28 +38,38 @@ def create_job_manager(self, **kwargs): def create_job_file_factory(self, **kwargs): return self.task.htcondor_create_job_file_factory(**kwargs) - def create_job_file(self, job_num, branches): + def create_job_file(self, *args): task = self.task - # the file postfix is pythonic range made from branches, e.g. 
@@ -79,17 +89,34 @@ def create_job_file(self, job_num, branches):
         for key, value in OrderedDict(task.htcondor_cmdline_args()).items():
             proxy_cmd.add_arg(key, value, overwrite=True)

-        # job script arguments
-        job_args = JobArguments(
-            task_cls=task.__class__,
-            task_params=proxy_cmd.build(skip_run=True),
-            branches=branches,
-            workers=task.job_workers,
-            auto_retry=False,
-            dashboard_data=self.dashboard.remote_hook_data(
-                job_num, self.job_data.attempts.get(job_num, 0)),
-        )
-        c.arguments = job_args.join()
+        # the file postfix is a pythonic range made from branches, e.g. [0, 1, 2, 4] -> "_0To5"
+        if grouped_submission:
+            c.postfix = [
+                "_{}To{}".format(branches[0], branches[-1] + 1)
+                for branches in submit_jobs.values()
+            ]
+        else:
+            c.postfix = "_{}To{}".format(branches[0], branches[-1] + 1)
+
+        # job script arguments per job number
+        def get_job_args(job_num, branches):
+            return JobArguments(
+                task_cls=task.__class__,
+                task_params=proxy_cmd.build(skip_run=True),
+                branches=branches,
+                workers=task.job_workers,
+                auto_retry=False,
+                dashboard_data=self.dashboard.remote_hook_data(
+                    job_num, self.job_data.attempts.get(job_num, 0)),
+            )
+
+        if grouped_submission:
+            c.arguments = [
+                get_job_args(job_num, branches).join()
+                for job_num, branches in submit_jobs.items()
+            ]
+        else:
+            c.arguments = get_job_args(job_num, branches).join()

         # add the bootstrap file
         bootstrap_file = task.htcondor_bootstrap_file()
@@ -128,14 +155,17 @@ def create_job_file(self, job_num, branches):
             c.custom_content.append(("initialdir", output_dir.abspath))

         # task hook
-        c = task.htcondor_job_config(c, job_num, branches)
+        if grouped_submission:
+            c = task.htcondor_job_config(c, list(submit_jobs.keys()), list(submit_jobs.values()))
+        else:
+            c = task.htcondor_job_config(c, job_num, branches)

         # when the output dir is not local, direct output files are not possible
         if not output_dir_is_local:
             del c.output_files[:]

         # build the job file and get the sanitized config
-        job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__)
+        job_file, c = self.job_file_factory(grouped_submission=grouped_submission, **c.__dict__)

         # get the location of the custom local log file if any
         abs_log_file = None
@@ -145,6 +175,26 @@ def create_job_file(self, job_num, branches):
         # return job and log files
         return {"job": job_file, "config": c, "log": abs_log_file}

+    def _submit_group(self, *args, **kwargs):
+        job_ids, submission_data = super(HTCondorWorkflowProxy, self)._submit_group(*args, **kwargs)
+
+        # when a log file is present, replace certain htcondor variables
+        for i, (job_id, data) in enumerate(zip(job_ids, submission_data.values())):
+            log = data.get("log")
+            if not log:
+                continue
+            # replace Cluster, ClusterId, Process, ProcId
+            c, p = job_id.split(".")
+            log = log.replace("$(Cluster)", c).replace("$(ClusterId)", c)
+            log = log.replace("$(Process)", p).replace("$(ProcId)", p)
+            # replace law_job_postfix
+            if data["config"].postfix_output_files and data["config"].postfix:
+                log = log.replace("$(law_job_postfix)", data["config"].postfix[i])
+            # add back
+            data["log"] = log
+
+        return job_ids, submission_data
+
     def destination_info(self):
         info = super(HTCondorWorkflowProxy, self).destination_info()
@@ -199,11 +249,24 @@ def htcondor_workflow_requires(self):
     def htcondor_bootstrap_file(self):
         return None

+    def htcondor_group_wrapper_file(self):
+        # only used for grouped submissions
+        return JobInputFile(
+            path=rel_path(__file__, "htcondor_wrapper.sh"),
+            copy=True,
+            render_local=True,
+        )
+
     def htcondor_wrapper_file(self):
         return None

     def htcondor_job_file(self):
-        return JobInputFile(law_src_path("job", "law_job.sh"))
+        return JobInputFile(
+            path=law_src_path("job", "law_job.sh"),
+            copy=True,
+            share=True,
+            render_job=True,
+        )

     def htcondor_stageout_file(self):
         return None
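
Since the htcondor_job_config task hook now receives lists in grouped mode, a sketch of what an override could look like (task name and resource value are made up; it only needs to keep modifying and returning the config):

    import law
    law.contrib.load("htcondor")


    class MyHTCondorTask(law.htcondor.HTCondorWorkflow):

        def htcondor_job_config(self, config, job_num, branches):
            # with grouped submission, job_num is a list of job numbers and branches a list
            # of branch-number lists, one entry per job of the cluster
            config.custom_content.append(("request_memory", "2000"))
            return config
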
diff --git a/law/job/base.py b/law/job/base.py
index ed2fa3c4..aeeb57b0 100644
--- a/law/job/base.py
+++ b/law/job/base.py
@@ -111,41 +111,63 @@ class BaseJobManager(six.with_metaclass(ABCMeta, object)):
         A dictionary that defines to coloring styles per job status that is used in
         :py:meth:`status_line`.

-    .. py:classattribute:: job_grouping
+    .. py:classattribute:: job_grouping_submit

         type: bool

-        Whether this manager implementation groups jobs into single interactions for submission and
-        status queries. In general, this means that the submission of a single job file can result
-        in multiple jobs on the remote batch system.
+        Whether this manager implementation groups jobs into single interactions for submission. In
+        general, this means that the submission of a single job file can result in multiple jobs on
+        the remote batch system.
+
+    .. py:classattribute:: job_grouping_cancel
+
+        type: bool
+
+        Whether this manager implementation groups jobs into single interactions for cancelling
+        jobs.
+
+    .. py:classattribute:: job_grouping_cleanup
+
+        type: bool
+
+        Whether this manager implementation groups jobs into single interactions for cleaning up
+        jobs.
+
+    .. py:classattribute:: job_grouping_query
+
+        type: bool
+
+        Whether this manager implementation groups jobs into single interactions for querying job
+        statuses.

     .. py:classattribute:: chunk_size_submit

         type: int

-        The default chunk size value when no value is given in :py:meth:`submit_batch`. When the
-        value evaluates to *False*, no chunking is allowed.
+        The default chunk size value when no value is given in :py:meth:`submit_batch`. If the value
+        evaluates to *False*, no chunking is allowed.
+
     .. py:classattribute:: chunk_size_cancel

         type: int

-        The default chunk size value when no value is given in :py:meth:`cancel_batch`. When the
-        value evaluates to *False*, no chunking is allowed.
+        The default chunk size value when no value is given in :py:meth:`cancel_batch`. If the value
+        evaluates to *False*, no chunking is allowed.

     .. py:classattribute:: chunk_size_cleanup

         type: int

-        The default chunk size value when no value is given in :py:meth:`cleanup_batch`. When the
+        The default chunk size value when no value is given in :py:meth:`cleanup_batch`. If the
         value evaluates to *False*, no chunking is allowed.

     .. py:classattribute:: chunk_size_query

         type: int

-        The default chunk size value when no value is given in :py:meth:`query_batch`. When the
-        value evaluates to *False*, no chunking is allowed.
+        The default chunk size value when no value is given in :py:meth:`query_batch`. If the value
+        evaluates to *False*, no chunking is allowed.
     """

     PENDING = "pending"
@@ -165,8 +187,11 @@ class BaseJobManager(six.with_metaclass(ABCMeta, object)):
         FAILED: ({}, {}, {"color": "red", "style": "bright"}),
     }

-    # job grouping settings
-    job_grouping = False
+    # job grouping settings per method
+    job_grouping_submit = False
+    job_grouping_cancel = False
+    job_grouping_cleanup = False
+    job_grouping_query = False

     # chunking settings for unbatched methods
     # disabled by default
@@ -238,10 +263,10 @@ def query(self):
     def group_job_ids(self, job_ids):
         """
         Hook that needs to be implemented if the job mananger supports grouping of jobs, i.e., when
-        :py:attr:`job_grouping` is *True*, and potentially used during status queries, job
-        cancellation and removal. If so, it should take a sequence of *job_ids* and return a
-        dictionary mapping ids of group jobs (used for queries etc) to the corresponding lists of
-        original job ids, with an arbitrary grouping mechanism.
+        :py:attr:`job_grouping_submit`, :py:attr:`job_grouping_query`, etc. is *True*, and
+        potentially used during status queries, job cancellation and removal. If so, it should take
+        a sequence of *job_ids* and return a dictionary mapping ids of group jobs (used for queries
+        etc) to the corresponding lists of original job ids, with an arbitrary grouping mechanism.
         """
         raise NotImplementedError(
             "internal error, {}.group_job_ids not implemented".format(self.__class__.__name__),
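
A sketch of an implementation fulfilling this contract, assuming ids of the "<cluster>.<process>" form produced by the grouped htcondor submission above (the class name is made up):

    class GroupedJobManager(HTCondorJobManager):

        def group_job_ids(self, job_ids):
            # map each cluster id to the list of its original "<cluster>.<process>" job ids
            groups = {}
            for job_id in job_ids:
                groups.setdefault(str(job_id).split(".", 1)[0], []).append(job_id)
            return groups
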
If so, it should take + a sequence of *job_ids* and return a dictionary mapping ids of group jobs (used for queries + etc) to the corresponding lists of original job ids, with an arbitrary grouping mechanism. """ raise NotImplementedError( "internal error, {}.group_job_ids not implemented".format(self.__class__.__name__), diff --git a/law/job/law_job.sh b/law/job/law_job.sh index f3a35e84..16238f1d 100755 --- a/law/job/law_job.sh +++ b/law/job/law_job.sh @@ -602,10 +602,10 @@ start_law_job() { law_job "$@" elif command -v tee &> /dev/null; then set -o pipefail - echo -e "" > "${log_file}" + echo "---" >> "${log_file}" law_job "$@" 2>&1 | tee -a "${log_file}" else - echo -e "" > "${log_file}" + echo "---" >> "${log_file}" law_job "$@" &>> "${log_file}" fi fi diff --git a/law/workflow/remote.py b/law/workflow/remote.py index 7450415d..d32ef8b4 100644 --- a/law/workflow/remote.py +++ b/law/workflow/remote.py @@ -277,11 +277,11 @@ def create_job_file_factory(self, **kwargs): def create_job_file(self, *args, **kwargs): """ Creates a job file using the :py:attr:`job_file_factory`. The expected arguments depend on - wether the job manager supports job grouping (:py:attr:`BaseJobManager.job_grouping`). If it - does, two arguments containing the job number (*job_num*) and the list of branch numbers - (*branches*) covered by the job. If job grouping is supported, a single dictionary mapping - job numbers to covered branch values must be passed. In any case, the path(s) of job files - are returned. + whether the job manager supports job grouping during submission + (:py:attr:`BaseJobManager.job_grouping_submit`). If it does, two arguments containing the + job number (*job_num*) and the list of branch numbers (*branches*) covered by the job. If + job grouping is supported, a single dictionary mapping job numbers to covered branch values + must be passed. In any case, the path(s) of job files are returned. This method must be implemented by inheriting classes. """ @@ -307,10 +307,11 @@ def _destination_info_postfix(self): dst_info = ", {}".format(dst_info) return dst_info - def get_extra_submission_data(self, job_file, config, log=None): + def get_extra_submission_data(self, job_file, job_id, config, log=None): """ - Hook that is called after job submission with the *job_file*, the submission *config* and - an optional *log* file to return extra data that is saved in the central job data. + Hook that is called after job submission with the *job_file*, the returned *job_id*, the + submission *config* and an optional *log* file to return extra data that is saved in the + central job data. 
""" extra = {} if log: @@ -419,9 +420,7 @@ def _status_error_pairs(self, job_num, job_data): ("code", job_data["code"]), ("error", job_data.get("error")), ("job script error", self.job_error_messages.get(job_data["code"], no_value)), - # ("log", job_data["extra"].get("log", no_value)), - # backwards compatibility for some limited time - ("log", job_data.get("extra", {}).get("log", no_value)), + ("log", job_data["extra"].get("log", no_value)), ]) def _print_status_errors(self, failed_jobs): @@ -675,7 +674,7 @@ def cancel(self): # cancel jobs task.publish_message("going to cancel {} jobs".format(len(job_ids))) - if self.job_manager.job_grouping: + if self.job_manager.job_grouping_cancel: errors = self.job_manager.cancel_group(job_ids, **cancel_kwargs) else: errors = self.job_manager.cancel_batch(job_ids, **cancel_kwargs) @@ -720,7 +719,7 @@ def cleanup(self): # cleanup jobs task.publish_message("going to cleanup {} jobs".format(len(job_ids))) - if self.job_manager.job_grouping: + if self.job_manager.job_grouping_cleanup: errors = self.job_manager.cleanup_group(job_ids, **cleanup_kwargs) else: errors = self.job_manager.cleanup_batch(job_ids, **cleanup_kwargs) @@ -813,7 +812,7 @@ def submit(self, retry_jobs=None): len(submit_jobs), self.workflow_type, dst_info)) # job file preparation and submission - if self.job_manager.job_grouping: + if self.job_manager.job_grouping_submit: job_ids, submission_data = self._submit_group(submit_jobs) else: job_ids, submission_data = self._submit_batch(submit_jobs) @@ -829,7 +828,8 @@ def submit(self, retry_jobs=None): # set the job id in the job data job_data = self.job_data.jobs[job_num] job_data["job_id"] = job_id - extra = self.get_extra_submission_data(data["job"], data["config"], log=data.get("log")) + extra = self.get_extra_submission_data(data["job"], job_id, data["config"], + log=data.get("log")) job_data["extra"].update(extra) new_submission_data[job_num] = copy.deepcopy(job_data) @@ -978,7 +978,7 @@ def poll(self): if i > 0: time.sleep(task.poll_interval * 60) - # handle scheduler messages, which could change task some parameters + # handle scheduler messages, which could change some task parameters task._handle_scheduler_messages() # walltime exceeded? @@ -1018,7 +1018,7 @@ def poll(self): # query job states job_ids = [self.job_data.jobs[job_num]["job_id"] for job_num in active_jobs] - if self.job_manager.job_grouping: + if self.job_manager.job_grouping_query: query_data = self.job_manager.query_group(job_ids, **query_kwargs) else: query_data = self.job_manager.query_batch(job_ids, **query_kwargs) @@ -1251,11 +1251,11 @@ class BaseRemoteWorkflow(BaseWorkflow): """ Opinionated base class for remote workflows that works in 2 phases: - 1. Create and submit *m* jobs that process *n* tasks. Submission information (mostly job ids) - is stored in the so-called *jobs* file, which is an output target of this workflow. + 1. Create and submit *m* jobs that process *n* tasks. Submission information (mostly job ids) + is stored in the so-called *jobs* file, which is an output target of this workflow. - 2. Use the job data and start status polling. When done, status data is stored alongside the - submission information in the same *jobs* file. + 2. Use the job data and start status polling. When done, status data is stored alongside the + submission information in the same *jobs* file. .. py:classattribute:: check_unreachable_acceptance