From df34f227af0d13363f5c125cbafb581af9426376 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 16 Dec 2024 09:23:44 +0100 Subject: [PATCH] Port latest changes from master. --- law/cli/completion.sh | 21 ++- law/contrib/arc/workflow.py | 63 ++++++- law/contrib/cms/workflow.py | 64 ++++++-- law/contrib/gfal/target.py | 2 - law/contrib/glite/workflow.py | 65 +++++++- law/contrib/htcondor/htcondor_wrapper.sh | 7 + law/contrib/htcondor/job.py | 65 ++++++-- law/contrib/htcondor/workflow.py | 121 +++++++++++--- law/contrib/lsf/workflow.py | 98 ++++++++--- law/contrib/slurm/workflow.py | 119 +++++++++++--- law/job/base.py | 3 +- law/job/law_job.sh | 8 +- law/patches.py | 11 +- law/target/collection.py | 201 ++++++++++++++--------- law/workflow/remote.py | 200 +++++++++++----------- 15 files changed, 754 insertions(+), 294 deletions(-) diff --git a/law/cli/completion.sh b/law/cli/completion.sh index 0b93f453..f9217969 100755 --- a/law/cli/completion.sh +++ b/law/cli/completion.sh @@ -215,12 +215,17 @@ _law_complete() { fi } -# run bashcompinit in zsh, export the completion function in bash -if [ ! -z "${ZSH_VERSION}" ]; then - autoload -Uz +X compinit && compinit - autoload -Uz +X bashcompinit && bashcompinit -else - export -f _law_complete -fi +# export the completion function in bash +[ ! -z "${BASH_VERSION}" ] && export -f _law_complete + +# enable the completion if not explicitly disabled +if [ "${LAW_CLI_SKIP_COMPLETION}" != "1" ]; then + # in zsh, run bashcompinit in zsh, export the completion function in bash + if [ ! -z "${ZSH_VERSION}" ]; then + autoload -Uz +X compinit && compinit + autoload -Uz +X bashcompinit && bashcompinit + fi -complete -o bashdefault -o default -F _law_complete law + # add to all known executables + complete -o bashdefault -o default -F _law_complete law +fi diff --git a/law/contrib/arc/workflow.py b/law/contrib/arc/workflow.py index 12d573cd..64e31e6f 100644 --- a/law/contrib/arc/workflow.py +++ b/law/contrib/arc/workflow.py @@ -9,19 +9,20 @@ __all__ = ["ARCWorkflow"] import os -import pathlib import abc +import contextlib +import pathlib from law.config import Config -from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy +from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand from law.target.file import get_path from law.target.local import LocalFileTarget from law.parameter import CSVParameter -from law.util import law_src_path, merge_dicts, DotDict, InsertableDict +from law.util import no_value, law_src_path, merge_dicts, DotDict, InsertableDict from law.logger import get_logger -from law._types import Type +from law._types import Type, Generator from law.contrib.wlcg import WLCGDirectoryTarget from law.contrib.arc.job import ARCJobManager, ARCJobFileFactory @@ -125,10 +126,10 @@ def create_job_file( if dashboard_file: c.input_files["dashboard_file"] = dashboard_file - # log files - c.log = None - c.stdout = None - c.stderr = None + # initialize logs with empty values and defer to defaults later + c.log = no_value + c.stdout = no_value + c.stderr = no_value if task.transfer_logs: log_file = "stdall.txt" c.stdout = log_file @@ -145,6 +146,12 @@ def create_job_file( # build the job file and get the sanitized config job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__) # type: ignore[misc] + # logging defaults + c.log = c.log or None + c.stdout = c.stdout or None + c.stderr = c.stderr or None + 
c.custom_log_file = c.custom_log_file or None + # determine the custom log file uri if set abs_log_file = None if c.custom_log_file: @@ -193,6 +200,15 @@ class ARCWorkflow(BaseRemoteWorkflow): def arc_output_directory(self) -> WLCGDirectoryTarget: ... + @contextlib.contextmanager + def arc_workflow_run_context(self) -> Generator[None, None, None]: + """ + Hook to provide a context manager in which the workflow run implementation is placed. This + can be helpful in situations where resurces should be acquired before and released after + running a workflow. + """ + yield + def arc_workflow_requires(self) -> DotDict: return DotDict() @@ -214,6 +230,13 @@ def arc_output_postfix(self) -> str: def arc_output_uri(self) -> str: return self.arc_output_directory().uri(return_all=False) # type: ignore[return-value] + def arc_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]: + """ + Hook to define resources for a specific job with number *job_num*, processing *branches*. + This method should return a dictionary. + """ + return {} + def arc_job_manager_cls(self) -> Type[ARCJobManager]: return ARCJobManager @@ -254,12 +277,36 @@ def arc_job_config( ) -> ARCJobFileFactory.Config: return config + def arc_dump_intermediate_job_data(self) -> bool: + """ + Whether to dump intermediate job data to the job submission file while jobs are being + submitted. + """ + return True + + def arc_post_submit_delay(self) -> int | float: + """ + Configurable delay in seconds to wait after submitting jobs and before starting the status + polling. + """ + return self.poll_interval * 60 + def arc_check_job_completeness(self) -> bool: return False def arc_check_job_completeness_delay(self) -> float | int: return 0.0 + def arc_poll_callback(self, poll_data: PollData) -> None: + """ + Configurable callback that is called after each job status query and before potential + resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) + that can be changed within this method. + If *False* is returned, the polling loop is gracefully terminated. Returning any other value + does not have any effect. 
+ """ + return + def arc_use_local_scheduler(self) -> bool: return True diff --git a/law/contrib/cms/workflow.py b/law/contrib/cms/workflow.py index 466dcba0..bf0b33b3 100644 --- a/law/contrib/cms/workflow.py +++ b/law/contrib/cms/workflow.py @@ -9,19 +9,20 @@ __all__ = ["CrabWorkflow"] -import pathlib import uuid import abc +import contextlib +import pathlib from law.config import Config -from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, JobData +from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, JobData, PollData from law.job.base import JobArguments, JobInputFile from law.target.file import get_path, get_scheme, remove_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget, LocalFileTarget from law.task.proxy import ProxyCommand from law.util import no_value, law_src_path, merge_dicts, human_duration, DotDict, InsertableDict from law.logger import get_logger -from law._types import Any, Type +from law._types import Any, Type, Generator from law.contrib.wlcg import check_vomsproxy_validity, get_myproxy_info from law.contrib.cms.job import CrabJobManager, CrabJobFileFactory @@ -165,8 +166,7 @@ def create_job_file_group( # log file if task.transfer_logs: - log_file = "stdall.txt" - c.custom_log_file = log_file + c.custom_log_file = "stdall.txt" # task hook c = task.crab_job_config(c, list(submit_jobs.keys()), list(submit_jobs.values())) # type: ignore[call-arg, arg-type] # noqa @@ -262,6 +262,22 @@ def crab_work_area(self) -> str | LocalDirectoryTarget: # relative to the job file directory return "" + @contextlib.contextmanager + def crab_workflow_run_context(self) -> Generator[None, None, None]: + """ + Hook to provide a context manager in which the workflow run implementation is placed. This + can be helpful in situations where resurces should be acquired before and released after + running a workflow. + """ + yield + + def crab_workflow_requires(self) -> DotDict: + """ + Hook to define requirements for the workflow itself and that need to be resolved before any + submission can happen. + """ + return DotDict() + def crab_job_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile: """ Hook to return the location of the job file that is executed on job nodes. @@ -284,13 +300,6 @@ def crab_stageout_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputF """ return None - def crab_workflow_requires(self) -> DotDict: - """ - Hook to define requirements for the workflow itself and that need to be resolved before any - submission can happen. - """ - return DotDict() - def crab_output_postfix(self) -> str: """ Hook to define the postfix of outputs, for instance such that workflows with different @@ -304,6 +313,13 @@ def crab_output_uri(self) -> str: """ return self.crab_output_directory().uri(return_all=False) # type: ignore[return-value] + def crab_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]: + """ + Hook to define resources for a specific job with number *job_num*, processing *branches*. + This method should return a dictionary. + """ + return {} + def crab_job_manager_cls(self) -> Type[CrabJobManager]: """ Hook to define a custom job managet class to use. @@ -359,6 +375,20 @@ def crab_job_config( """ return config + def crab_dump_intermediate_job_data(self) -> bool: + """ + Whether to dump intermediate job data to the job submission file while jobs are being + submitted. 
+ """ + return True + + def crab_post_submit_delay(self) -> float | int: + """ + Configurable delay in seconds to wait after submitting jobs and before starting the status + polling. + """ + return self.poll_interval * 60 + def crab_check_job_completeness(self) -> bool: """ Hook to define whether after job report successful completion, the job manager should check @@ -374,6 +404,16 @@ def crab_check_job_completeness_delay(self) -> float | int: """ return 0.0 + def crab_poll_callback(self, poll_data: PollData) -> None: + """ + Configurable callback that is called after each job status query and before potential + resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) + that can be changed within this method. + If *False* is returned, the polling loop is gracefully terminated. Returning any other value + does not have any effect. + """ + return + def crab_cmdline_args(self) -> dict[str, str]: """ Hook to add additional cli parameters to "law run" commands executed on job nodes. diff --git a/law/contrib/gfal/target.py b/law/contrib/gfal/target.py index 2b6bb5e3..53179310 100644 --- a/law/contrib/gfal/target.py +++ b/law/contrib/gfal/target.py @@ -10,7 +10,6 @@ import os import sys -import gc import pathlib import contextlib import stat as _stat @@ -121,7 +120,6 @@ def context(self) -> Iterator[gfal2.Gfal2Context]: finally: if self.atomic_contexts and pid in self._contexts: del self._contexts[pid] - gc.collect() @contextlib.contextmanager def transfer_parameters(self, ctx: gfal2.Gfal2Context) -> Iterator[gfal2.TransferParameters]: diff --git a/law/contrib/glite/workflow.py b/law/contrib/glite/workflow.py index 82375fe9..a511b449 100644 --- a/law/contrib/glite/workflow.py +++ b/law/contrib/glite/workflow.py @@ -12,19 +12,20 @@ import os import sys import abc +import contextlib import pathlib import law from law.config import Config -from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy +from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand from law.target.file import get_path from law.target.local import LocalFileTarget from law.parameter import CSVParameter -from law.util import law_src_path, merge_dicts, DotDict, InsertableDict +from law.util import no_value, law_src_path, merge_dicts, DotDict, InsertableDict from law.logger import get_logger -from law._types import Type, Any +from law._types import Type, Any, Generator from law.contrib.wlcg import WLCGDirectoryTarget, delegate_vomsproxy_glite from law.contrib.glite.job import GLiteJobManager, GLiteJobFileFactory @@ -144,9 +145,9 @@ def create_job_file( if dashboard_file: c.input_files["dashboard_file"] = dashboard_file - # log file - c.stdout = None - c.stderr = None + # initialize logs with empty values and defer to defaults later + c.stdout = no_value + c.stderr = no_value if task.transfer_logs: log_file = "stdall.txt" c.stdout = log_file @@ -162,6 +163,11 @@ def create_job_file( # build the job file and get the sanitized config job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__) # type: ignore[misc] + # logging defaults + c.stdout = c.stdout or None + c.stderr = c.stderr or None + c.custom_log_file = c.custom_log_file or None + # determine the custom log file uri if set abs_log_file = None if c.custom_log_file: @@ -224,6 +230,15 @@ def glite_job_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile: def glite_stageout_file(self) -> 
str | pathlib.Path | LocalFileTarget | JobInputFile | None: return None + @contextlib.contextmanager + def glite_workflow_run_context(self) -> Generator[None, None, None]: + """ + Hook to provide a context manager in which the workflow run implementation is placed. This + can be helpful in situations where resurces should be acquired before and released after + running a workflow. + """ + yield + def glite_workflow_requires(self) -> DotDict: return DotDict() @@ -233,6 +248,13 @@ def glite_output_postfix(self) -> str: def glite_output_uri(self) -> str: return self.glite_output_directory().uri(return_all=False) # type: ignore[return-value] + def glite_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]: + """ + Hook to define resources for a specific job with number *job_num*, processing *branches*. + This method should return a dictionary. + """ + return {} + def glite_delegate_proxy(self, endpoint: str) -> str: return delegate_vomsproxy_glite( # type: ignore[attr-defined] endpoint, @@ -273,15 +295,44 @@ def glite_create_job_file_factory(self, **kwargs) -> GLiteJobFileFactory: return factory_cls(**kwargs) - def glite_job_config(self, config, job_num, branches): + def glite_job_config( + self, + config: GLiteJobFileFactory.Config, + job_num: int, + branches: list[int], + ) -> GLiteJobFileFactory.Config: return config + def glite_dump_intermediate_job_data(self) -> bool: + """ + Whether to dump intermediate job data to the job submission file while jobs are being + submitted. + """ + return True + + def glite_post_submit_delay(self) -> int | float: + """ + Configurable delay in seconds to wait after submitting jobs and before starting the status + polling. + """ + return self.poll_interval * 60 + def glite_check_job_completeness(self) -> bool: return False def glite_check_job_completeness_delay(self) -> float | int: return 0.0 + def glite_poll_callback(self, poll_data: PollData) -> None: + """ + Configurable callback that is called after each job status query and before potential + resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) + that can be changed within this method. + If *False* is returned, the polling loop is gracefully terminated. Returning any other value + does not have any effect. + """ + return + def glite_use_local_scheduler(self) -> bool: return True diff --git a/law/contrib/htcondor/htcondor_wrapper.sh b/law/contrib/htcondor/htcondor_wrapper.sh index 6b9af908..a9b065bf 100644 --- a/law/contrib/htcondor/htcondor_wrapper.sh +++ b/law/contrib/htcondor/htcondor_wrapper.sh @@ -142,6 +142,13 @@ action() { local file_postfix="$1" local log_file="$2" + # create log directory + if [ ! -z "${log_file}" ]; then + local log_dir="$( dirname "${log_file}" )" + [ ! 
-d "${log_dir}" ] && mkdir -p "${log_dir}" + fi + + # run the wrapper function if [ -z "${log_file}" ]; then htcondor_wrapper "$@" elif command -v tee &> /dev/null; then diff --git a/law/contrib/htcondor/job.py b/law/contrib/htcondor/job.py index 98b22ed1..2b366eb9 100644 --- a/law/contrib/htcondor/job.py +++ b/law/contrib/htcondor/job.py @@ -538,7 +538,7 @@ def __init__( executable: str | None = None, arguments: str | Sequence[str] | None = None, input_files: dict[str, str | pathlib.Path | JobInputFile] | None = None, - output_files: Sequence[str] | None = None, + output_files: dict[str | pathlib.Path, str | pathlib.Path] | None = None, log: str = "log.txt", stdout: str = "stdout.txt", stderr: str = "stderr.txt", @@ -575,7 +575,7 @@ def __init__( self.executable = executable self.arguments = arguments self.input_files = input_files or {} - self.output_files = output_files or [] + self.output_files = output_files or {} self.log = log self.stdout = stdout self.stderr = stderr @@ -611,23 +611,36 @@ def create( if not c.universe: raise ValueError("universe must not be empty") + # ensure that output_files is a dict mapping remote paths on the job node + # to local paths on the submission node + # (relative local paths will be resolved relative to the initial dir) + c.output_files = { + str(k): str(v) + for k, v in ( + c.output_files.items() + if isinstance(c.output_files, dict) + else zip(c.output_files, c.output_files) + ) + } + # ensure that the custom log file is an output file - if c.custom_log_file and c.custom_log_file not in c.output_files: - c.output_files.append(c.custom_log_file) + if c.custom_log_file: + c.custom_log_file = str(c.custom_log_file) + custom_log_file_base = os.path.basename(c.custom_log_file) + if custom_log_file_base not in c.output_files: + c.output_files[custom_log_file_base] = c.custom_log_file + c.custom_log_file = custom_log_file_base # postfix certain output files postfix = "$(law_job_postfix)" if grouped_submission else c.postfix - c.output_files = list(map(str, c.output_files)) if c.postfix_output_files: skip_postfix_cre = re.compile(r"^(/dev/).*$") skip_postfix = lambda s: bool(skip_postfix_cre.match(str(s))) - c.output_files = [ - path if skip_postfix(path) else self.postfix_output_file(path, postfix) - for path in c.output_files - ] + add_postfix = lambda s: s if skip_postfix(s) else self.postfix_output_file(s, postfix) + c.output_files = {add_postfix(k): add_postfix(v) for k, v in c.output_files.items()} for attr in ["log", "stdout", "stderr", "custom_log_file"]: - if c[attr] and not skip_postfix(c[attr]): - c[attr] = self.postfix_output_file(c[attr], postfix) + if c[attr]: + c[attr] = add_postfix(c[attr]) # ensure that all input files are JobInputFile objects c.input_files = { @@ -762,9 +775,17 @@ def encode_list(items: Any, sep: str = " ", quote: bool = True) -> str: s = "\"{s}\"" return s + # helper to encode dicts + def encode_dict(d: dict, sep: str = " ; ", quote: bool = True) -> str: + s = sep.join(f"{k} = {v}" for k, v in d.items()) + if quote: + s = f"\"{s}\"" # noqa: Q003 + return s + # job file content content: list[str | tuple[str, Any]] = [] content.append(("universe", c.universe)) + output_remaps = {} if c.command: cmd = quote_cmd(c.command) if isinstance(c.command, (list, tuple)) else c.command content.append(("executable", cmd)) @@ -773,9 +794,17 @@ def encode_list(items: Any, sep: str = " ", quote: bool = True) -> str: if c.log: content.append(("log", c.log)) if c.stdout: - content.append(("output", c.stdout)) + c.stdout = str(c.stdout) + 
stdout_base = os.path.basename(c.stdout) + content.append(("output", stdout_base)) + if stdout_base != c.stdout: + output_remaps[stdout_base] = c.stdout if c.stderr: - content.append(("error", c.stderr)) + c.stderr = str(c.stderr) + stderr_base = os.path.basename(c.stderr) + content.append(("error", stderr_base)) + if stderr_base != c.stderr: + output_remaps[stderr_base] = c.stderr if c.input_files or c.output_files: content.append(("should_transfer_files", "YES")) if c.input_files: @@ -790,11 +819,19 @@ def encode_list(items: Any, sep: str = " ", quote: bool = True) -> str: ))) if c.output_files: content.append(("transfer_output_files", encode_list( - make_unique(c.output_files), + c.output_files.keys(), sep=",", quote=False, ))) + # add mapping to local paths when different + output_remaps.update({ + remote_path: local_path + for remote_path, local_path in c.output_files.items() + if remote_path != local_path + }) content.append(("when_to_transfer_output", "ON_EXIT")) + if output_remaps: + content.append(("transfer_output_remaps", encode_dict(output_remaps))) if c.notification: content.append(("notification", c.notification)) diff --git a/law/contrib/htcondor/workflow.py b/law/contrib/htcondor/workflow.py index 253e5a66..b2855a19 100644 --- a/law/contrib/htcondor/workflow.py +++ b/law/contrib/htcondor/workflow.py @@ -10,20 +10,21 @@ import os import abc +import contextlib import pathlib import luigi # type: ignore[import-untyped] from law.config import Config -from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy +from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget, LocalFileTarget from law.parameter import NO_STR -from law.util import law_src_path, rel_path, merge_dicts, DotDict, InsertableDict +from law.util import no_value, law_src_path, rel_path, merge_dicts, DotDict, InsertableDict from law.logger import get_logger -from law._types import Type, Any +from law._types import Type, Any, Generator from law.contrib.htcondor.job import HTCondorJobManager, HTCondorJobFileFactory @@ -56,7 +57,7 @@ def create_job_file( # create the config c = self.job_file_factory.get_config() # type: ignore[union-attr] c.input_files = {} - c.output_files = [] + c.output_files = {} c.render_variables = {} c.custom_content = [] @@ -140,26 +141,39 @@ def get_job_args(job_num, branches): if dashboard_file: c.input_files["dashboard_file"] = dashboard_file - # logging - # we do not use htcondor's logging mechanism since it might require that the submission - # directory is present when it retrieves logs, and therefore we use a custom log file - c.log = None - c.stdout = None - c.stderr = None + # initialize logs with empty values and defer to defaults later + c.log = no_value + c.stdout = no_value + c.stderr = no_value if task.transfer_logs: c.custom_log_file = "stdall.txt" + # helper to cast directory paths to local directory targets if possible + def cast_dir( + output_dir: FileSystemDirectoryTarget | str | pathlib.Path, + touch: bool = True, + ) -> FileSystemDirectoryTarget | str: + if not isinstance(output_dir, FileSystemDirectoryTarget): + path = get_path(output_dir) + if get_scheme(path) not in (None, "file"): + return str(output_dir) + output_dir = LocalDirectoryTarget(path) + if touch: + output_dir.touch() + return output_dir + # when 
the output dir is local, we can run within this directory for easier output file # handling and use absolute paths for input files - output_dir = task.htcondor_output_directory() - if not isinstance(output_dir, FileSystemDirectoryTarget): - output_dir = get_path(output_dir) - if get_scheme(output_dir) in (None, "file"): - output_dir = LocalDirectoryTarget(output_dir) + output_dir = cast_dir(task.htcondor_output_directory()) output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget) if output_dir_is_local: c.absolute_paths = True - c.custom_content.append(("initialdir", output_dir.abspath)) + c.custom_content.append(("initialdir", output_dir.abspath)) # type: ignore[union-attr] # noqa + + # prepare the log dir + log_dir_orig = task.htcondor_log_directory() + log_dir = cast_dir(log_dir_orig) if log_dir_orig else output_dir + log_dir_is_local = isinstance(log_dir, LocalDirectoryTarget) # task hook if grouped_submission: @@ -167,17 +181,35 @@ def get_job_args(job_num, branches): else: c = task.htcondor_job_config(c, job_num, branches) + # logging defaults + # we do not use htcondor's logging mechanism since it might require that the submission + # directory is present when it retrieves logs, and therefore we use a custom log file + # also, stderr and stdout can be remapped (moved) by htcondor, so use a different behavior + def log_path(path: str | pathlib.Path) -> str | None: + if not path or not log_dir_is_local: + return None + log_target = log_dir.child(path, type="f") # type: ignore[union-attr] + if log_target.parent != log_dir: + log_target.parent.touch() # type: ignore[union-attr,call-arg] + return log_target.abspath + + c.log = c.log or None + c.stdout = log_path(c.stdout) + c.stderr = log_path(c.stderr) + c.custom_log_file = log_path(c.custom_log_file) + # when the output dir is not local, direct output files are not possible - if not output_dir_is_local: - del c.output_files[:] + if not output_dir_is_local and c.output_files: + c.output_files.clear() # build the job file and get the sanitized config job_file, c = self.job_file_factory(grouped_submission=grouped_submission, **c.__dict__) # type: ignore[misc] # noqa - # get the location of the custom local log file if any + # get the finale, absolute location of the custom log file + # (note that c.custom_log_file is always just a basename after the factory hook) abs_log_file = None - if output_dir_is_local and c.custom_log_file: - abs_log_file = os.path.join(output_dir.abspath, c.custom_log_file) + if log_dir_is_local and c.custom_log_file: + abs_log_file = os.path.join(log_dir.abspath, c.custom_log_file) # type: ignore[union-attr] # noqa # return job and log files return {"job": job_file, "config": c, "log": abs_log_file} @@ -187,6 +219,9 @@ def _submit_group(self, *args, **kwargs) -> tuple[list[Any], dict[int, dict]]: # when a log file is present, replace certain htcondor variables for i, (job_id, (job_num, data)) in enumerate(zip(job_ids, submission_data.items())): + # skip exceptions + if isinstance(job_id, Exception): + continue log = data.get("log") if not log: continue @@ -254,9 +289,27 @@ class HTCondorWorkflow(BaseRemoteWorkflow): exclude_index = True @abc.abstractmethod - def htcondor_output_directory(self) -> FileSystemDirectoryTarget: + def htcondor_output_directory(self) -> str | pathlib.Path | FileSystemDirectoryTarget: ... + def htcondor_log_directory(self) -> str | pathlib.Path | FileSystemDirectoryTarget | None: + """ + Hook to define the location of log files if any are written. 
When set, it has precedence + over :py:meth:`htcondor_output_directory` for log files. + This method should return a :py:class:`FileSystemDirectoryTarget` or a value that evaluates + to *False* in case no custom log directory is desired. + """ + return None + + @contextlib.contextmanager + def htcondor_workflow_run_context(self) -> Generator[None, None, None]: + """ + Hook to provide a context manager in which the workflow run implementation is placed. This + can be helpful in situations where resurces should be acquired before and released after + running a workflow. + """ + yield + def htcondor_workflow_requires(self) -> DotDict: return DotDict() @@ -335,12 +388,36 @@ def htcondor_job_config( ) -> HTCondorJobFileFactory.Config: return config + def htcondor_dump_intermediate_job_data(self) -> bool: + """ + Whether to dump intermediate job data to the job submission file while jobs are being + submitted. + """ + return True + + def htcondor_post_submit_delay(self) -> int | float: + """ + Configurable delay in seconds to wait after submitting jobs and before starting the status + polling. + """ + return self.poll_interval * 60 + def htcondor_check_job_completeness(self) -> bool: return False def htcondor_check_job_completeness_delay(self) -> float | int: return 0.0 + def htcondor_poll_callback(self, poll_data: PollData) -> None: + """ + Configurable callback that is called after each job status query and before potential + resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) + that can be changed within this method. + If *False* is returned, the polling loop is gracefully terminated. Returning any other value + does not have any effect. + """ + return + def htcondor_use_local_scheduler(self) -> bool: return False diff --git a/law/contrib/lsf/workflow.py b/law/contrib/lsf/workflow.py index c91da11c..c17583dc 100644 --- a/law/contrib/lsf/workflow.py +++ b/law/contrib/lsf/workflow.py @@ -8,22 +8,22 @@ __all__ = ["LSFWorkflow"] -import os import abc +import contextlib import pathlib import luigi # type: ignore[import-untyped] from law.config import Config -from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy +from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget, LocalFileTarget from law.parameter import NO_STR -from law.util import law_src_path, merge_dicts, DotDict, InsertableDict +from law.util import no_value, law_src_path, merge_dicts, DotDict, InsertableDict from law.logger import get_logger -from law._types import Type +from law._types import Type, Generator from law.contrib.lsf.job import LSFJobManager, LSFJobFileFactory @@ -119,25 +119,33 @@ def create_job_file( if dashboard_file: c.input_files["dashboard_file"] = dashboard_file - # logging - # we do not use lsf's logging mechanism since it might require that the submission - # directory is present when it retrieves logs, and therefore we use a custom log file - c.stdout = None - c.stderr = None + # initialize logs with empty values and defer to defaults later + c.stdout = no_value + c.stderr = no_value if task.transfer_logs: c.custom_log_file = "stdall.txt" - # we can use lsf's file stageout only when the output directory is local - # otherwise, one should use the stageout_file and stageout manually - output_dir = 
task.lsf_output_directory() - if not isinstance(output_dir, FileSystemDirectoryTarget): - output_dir = get_path(output_dir) - if get_scheme(output_dir) in (None, "file"): - output_dir = LocalDirectoryTarget(output_dir) + # helper to cast directory paths to local directory targets if possible + def cast_dir( + output_dir: FileSystemDirectoryTarget | str | pathlib.Path, + touch: bool = True, + ) -> FileSystemDirectoryTarget | str: + if not isinstance(output_dir, FileSystemDirectoryTarget): + path = get_path(output_dir) + if get_scheme(path) not in (None, "file"): + return str(output_dir) + output_dir = LocalDirectoryTarget(path) + if touch: + output_dir.touch() + return output_dir + + # when the output dir is local, we can run within this directory for easier output file + # handling and use absolute paths for input files + output_dir = cast_dir(task.lsf_output_directory()) output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget) if output_dir_is_local: c.absolute_paths = True - c.cwd = output_dir.abspath + c.cwd = output_dir.abspath # type: ignore[union-attr] # job name c.job_name = f"{task.live_task_id}{postfix}" @@ -152,10 +160,17 @@ def create_job_file( # build the job file and get the sanitized config job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__) # type: ignore[misc] + # logging defaults + # we do not use lsf's logging mechanism since it might require that the submission + # directory is present when it retrieves logs, and therefore we use a custom log file + c.stdout = c.stdout or None + c.stderr = c.stderr or None + c.custom_log_file = c.custom_log_file or None + # get the location of the custom local log file if any abs_log_file = None if output_dir_is_local and c.custom_log_file: - abs_log_file = os.path.join(output_dir.abspath, c.custom_log_file) + abs_log_file = output_dir.child(c.custom_log_file, type="f").abspath # type: ignore[union-attr] # noqa # return job and log files return {"job": job_file, "config": c, "log": abs_log_file} @@ -198,9 +213,23 @@ class LSFWorkflow(BaseRemoteWorkflow): exclude_index = True @abc.abstractmethod - def lsf_output_directory(self) -> FileSystemDirectoryTarget: + def lsf_output_directory(self) -> str | pathlib.Path | FileSystemDirectoryTarget: + """ + Hook to define the location of submission output files, such as the json files containing + job data, and optional log files. + This method should return a :py:class:`FileSystemDirectoryTarget`. + """ ... + @contextlib.contextmanager + def lsf_workflow_run_context(self) -> Generator[None, None, None]: + """ + Hook to provide a context manager in which the workflow run implementation is placed. This + can be helpful in situations where resurces should be acquired before and released after + running a workflow. + """ + yield + def lsf_workflow_requires(self) -> DotDict: return DotDict() @@ -219,6 +248,13 @@ def lsf_stageout_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFi def lsf_output_postfix(self) -> str: return "" + def lsf_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]: + """ + Hook to define resources for a specific job with number *job_num*, processing *branches*. + This method should return a dictionary. 
+ """ + return {} + def lsf_job_manager_cls(self) -> Type[LSFJobManager]: return LSFJobManager @@ -259,12 +295,36 @@ def lsf_job_config( ) -> LSFJobFileFactory.Config: return config + def lsf_dump_intermediate_job_data(self) -> bool: + """ + Whether to dump intermediate job data to the job submission file while jobs are being + submitted. + """ + return True + + def lsf_post_submit_delay(self) -> float | int: + """ + Configurable delay in seconds to wait after submitting jobs and before starting the status + polling. + """ + return self.poll_interval * 60 + def lsf_check_job_completeness(self) -> bool: return False def lsf_check_job_completeness_delay(self) -> float | int: return 0.0 + def lsf_poll_callback(self, poll_data: PollData) -> None: + """ + Configurable callback that is called after each job status query and before potential + resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) + that can be changed within this method. + If *False* is returned, the polling loop is gracefully terminated. Returning any other value + does not have any effect. + """ + return + def lsf_use_local_scheduler(self) -> bool: return True diff --git a/law/contrib/slurm/workflow.py b/law/contrib/slurm/workflow.py index 15f36bc0..dc89241f 100644 --- a/law/contrib/slurm/workflow.py +++ b/law/contrib/slurm/workflow.py @@ -10,20 +10,21 @@ import os import abc +import contextlib import pathlib import luigi # type: ignore[import-untyped] from law.config import Config -from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy +from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget, LocalFileTarget from law.parameter import NO_STR -from law.util import law_src_path, merge_dicts, DotDict, InsertableDict +from law.util import no_value, law_src_path, merge_dicts, DotDict, InsertableDict from law.logger import get_logger -from law._types import Type +from law._types import Type, Generator from law.contrib.slurm.job import SlurmJobManager, SlurmJobFileFactory @@ -118,25 +119,37 @@ def create_job_file( if dashboard_file: c.input_files["dashboard_file"] = dashboard_file - # logging - # we do not use slurm's logging mechanism since it might require that the submission - # directory is present when it retrieves logs, and therefore we use a custom log file - c.stdout = "/dev/null" - c.stderr = None + # initialize logs with empty values and defer to defaults later + c.stdout = no_value + c.stderr = no_value if task.transfer_logs: c.custom_log_file = "stdall.txt" + def cast_dir( + output_dir: FileSystemDirectoryTarget | str | pathlib.Path, + touch: bool = True, + ) -> FileSystemDirectoryTarget | str: + if not isinstance(output_dir, FileSystemDirectoryTarget): + path = get_path(output_dir) + if get_scheme(path) not in (None, "file"): + return str(output_dir) + output_dir = LocalDirectoryTarget(path) + if touch: + output_dir.touch() + return output_dir + # when the output dir is local, we can run within this directory for easier output file # handling and use absolute paths for input files - output_dir = task.slurm_output_directory() - if not isinstance(output_dir, FileSystemDirectoryTarget): - output_dir = get_path(output_dir) - if get_scheme(output_dir) in (None, "file"): - output_dir = LocalDirectoryTarget(output_dir) + 
output_dir = cast_dir(task.slurm_output_directory()) output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget) if output_dir_is_local: c.absolute_paths = True - c.custom_content.append(("chdir", output_dir.abspath)) + c.custom_content.append(("chdir", output_dir.abspath)) # type: ignore[union-attr] + + # prepare the log dir + log_dir_orig = task.htcondor_log_directory() + log_dir = cast_dir(log_dir_orig) if log_dir_orig else output_dir + log_dir_is_local = isinstance(log_dir, LocalDirectoryTarget) # job name c.job_name = f"{task.live_task_id}{postfix}" @@ -150,18 +163,32 @@ def create_job_file( # python's default multiprocessing puts socket files into that tmp directory which comes # with the restriction of less then 80 characters that would be violated, and potentially # would also overwhelm the submission directory - c.render_variables["law_job_tmp"] = "/tmp/law_$( basename \"$LAW_JOB_HOME\" )" + if not c.render_variables.get("law_job_tmp"): + c.render_variables["law_job_tmp"] = "/tmp/law_$( basename \"$LAW_JOB_HOME\" )" # task hook c = task.slurm_job_config(c, job_num, branches) + # logging defaults + def log_path(path): + if not path or path.startswith("/dev/"): + return path or None + log_target = log_dir.child(path, type="f") + if log_target.parent != log_dir: + log_target.parent.touch() + return log_target.abspath + + c.stdout = log_path(c.stdout) + c.stderr = log_path(c.stderr) + c.custom_log_file = log_path(c.custom_log_file) + # build the job file and get the sanitized config job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__) # type: ignore[misc] - # get the location of the custom local log file if any + # get the finale, absolute location of the custom log file abs_log_file = None - if output_dir_is_local and c.custom_log_file: - abs_log_file = os.path.join(output_dir.abspath, c.custom_log_file) + if log_dir_is_local and c.custom_log_file: + abs_log_file = os.path.join(log_dir.abspath, c.custom_log_file) # type: ignore[union-attr] # noqa # return job and log files return {"job": job_file, "config": c, "log": abs_log_file} @@ -200,12 +227,42 @@ class SlurmWorkflow(BaseRemoteWorkflow): exclude_index = True @abc.abstractmethod - def slurm_output_directory(self) -> FileSystemDirectoryTarget: + def slurm_output_directory(self) -> str | pathlib.Path | FileSystemDirectoryTarget: + """ + Hook to define the location of submission output files, such as the json files containing + job data, and optional log files. + This method should return a :py:class:`FileSystemDirectoryTarget`. + """ ... + def slurm_log_directory(self) -> str | pathlib.Path | FileSystemDirectoryTarget | None: + """ + Hook to define the location of log files if any are written. When set, it has precedence + over :py:meth:`slurm_output_directory` for log files. + This method should return a :py:class:`FileSystemDirectoryTarget` or a value that evaluates + to *False* in case no custom log directory is desired. + """ + return None + + @contextlib.contextmanager + def slurm_workflow_run_context(self) -> Generator[None, None, None]: + """ + Hook to provide a context manager in which the workflow run implementation is placed. This + can be helpful in situations where resurces should be acquired before and released after + running a workflow. + """ + yield + def slurm_workflow_requires(self) -> DotDict: return DotDict() + def slurm_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]: + """ + Hook to define resources for a specific job with number *job_num*, processing *branches*. 
+ This method should return a dictionary. + """ + return {} + def slurm_bootstrap_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile | None: return None @@ -261,12 +318,36 @@ def slurm_job_config( ) -> SlurmJobFileFactory.Config: return config + def slurm_dump_intermediate_job_data(self) -> bool: + """ + Whether to dump intermediate job data to the job submission file while jobs are being + submitted. + """ + return True + + def slurm_post_submit_delay(self) -> int | float: + """ + Configurable delay in seconds to wait after submitting jobs and before starting the status + polling. + """ + return self.poll_interval * 60 + def slurm_check_job_completeness(self) -> bool: return False def slurm_check_job_completeness_delay(self) -> float | int: return 0.0 + def slurm_poll_callback(self, poll_data: PollData) -> None: + """ + Configurable callback that is called after each job status query and before potential + resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) + that can be changed within this method. + If *False* is returned, the polling loop is gracefully terminated. Returning any other value + does not have any effect. + """ + return + def slurm_use_local_scheduler(self) -> bool: return False diff --git a/law/job/base.py b/law/job/base.py index b6515cc8..d675afac 100644 --- a/law/job/base.py +++ b/law/job/base.py @@ -1488,7 +1488,6 @@ def maybe_set(current: bool | None, default: bool) -> bool: render_local = maybe_set(render_local, False) # store attributes, apply residual defaults - # TODO: move to job rendering by default self.path = os.path.abspath(os.path.expandvars(os.path.expanduser(get_path(path)))) self.copy = True if copy is None else bool(copy) self.share = False if share is None else bool(share) @@ -1520,7 +1519,7 @@ def maybe_set(current: bool | None, default: bool) -> bool: "directory, but rendering is enabled which has no effect", ) if self.share and self.render_local: - logger.warning( + logger.error( f"input file at {self.path} is configured to be shared across jobs but local " "rendering is active, potentially resulting in wrong file content", ) diff --git a/law/job/law_job.sh b/law/job/law_job.sh index c043d1f1..ba8ba92a 100755 --- a/law/job/law_job.sh +++ b/law/job/law_job.sh @@ -598,7 +598,13 @@ start_law_job() { local this_file="$( ${shell_is_zsh} && echo "${(%):-%x}" || echo "${BASH_SOURCE[0]}" )" LAW_JOB_WRAPPED_BASH="1" bash "${this_file}" $@ else - # already wrapped, start and optionally log + # create log directory + if [ ! -z "${log_file}" ]; then + local log_dir="$( dirname "${log_file}" )" + [ ! -d "${log_dir}" ] && mkdir -p "${log_dir}" + fi + + # start the job and optionally log if [ -z "${log_file}" ]; then law_job "$@" elif command -v tee &> /dev/null; then diff --git a/law/patches.py b/law/patches.py index f9cdfc33..3e4ddf1d 100644 --- a/law/patches.py +++ b/law/patches.py @@ -466,6 +466,8 @@ def patch_parameter_copy() -> None: functionality will eventually be moved to luigi, but the patch might be kept for versions of luigi where it was not addded yet. 
""" + default_cre = re.compile(r"(.+)(;|,)\s*((empty|no|without) default|default: [^\;]+)\s*$") + def _copy(self, add_default_to_description=False, **kwargs): # copy the instance inst = copy.copy(self) @@ -482,8 +484,13 @@ def _copy(self, add_default_to_description=False, **kwargs): # amend the description if add_default_to_description: - prefix = "; " if inst.description else "" - inst.description += f"{prefix}default: {inst._default}" + # remove default from description + if inst.description: + m = default_cre.match(inst.description) + if m: + inst.description = m.group(1) + inst.description += "; " + inst.description += "default: {}".format(inst._default) return inst diff --git a/law/target/collection.py b/law/target/collection.py index a68cafa0..2ebce17d 100644 --- a/law/target/collection.py +++ b/law/target/collection.py @@ -12,8 +12,9 @@ import random import pathlib -from abc import abstractmethod +import functools import contextlib +from collections import deque, defaultdict from law.config import Config from law.target.base import Target @@ -93,22 +94,24 @@ def _iter_state( optional_existing: bool | None = None, keys: bool = False, unpack: bool = True, + exists_func: Callable[[Target], bool] | None = None, ) -> Iterator[tuple[Any, Any] | Any]: existing = bool(existing) if optional_existing is not None: optional_existing = bool(optional_existing) # helper to check for existence - def exists(t: Target) -> bool: - if optional_existing is not None and t.optional: - return optional_existing - if isinstance(t, TargetCollection): - return t.exists(optional_existing=optional_existing) - return t.exists() + if exists_func is None: + def exists_func(t: Target) -> bool: + if optional_existing is not None and t.optional: + return optional_existing + if isinstance(t, TargetCollection): + return t.exists(optional_existing=optional_existing) + return t.exists() # loop and yield for key, targets in self._iter_flat(): - state = all(exists(t) for t in targets) + state = all(map(exists_func, targets)) if state is existing: if unpack: targets = self.targets[key] @@ -169,7 +172,7 @@ def complete(self, **kwargs) -> bool: return self.optional or self.exists(**kwargs) def _exists_fwd(self, **kwargs) -> bool: - fwd = ["optional_existing"] + fwd = ["optional_existing", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs}) def exists(self, **kwargs) -> bool: @@ -200,7 +203,7 @@ def count(self, **kwargs) -> int | tuple[int, list[Any]]: target_keys = [key for key, _ in self._iter_state(**kwargs)] n = len(target_keys) - return n if not keys else (n, target_keys) + return (n, target_keys) if keys else n def map(self, func: Callable[[Target], Target]) -> TargetCollection: """ @@ -293,6 +296,44 @@ class SiblingFileCollectionBase(FileCollection): Base class for file collections whose elements are located in the same directory (siblings). 
""" + @classmethod + def _exists_in_basenames( + cls, + target: Target, + basenames: set[str] | dict[str, set[str]] | None, + optional_existing: bool | None, + target_dirs: dict[Target, str] | None, + ) -> bool: + if optional_existing is not None and target.optional: + return optional_existing + if isinstance(target, SiblingFileCollectionBase): + return target._exists_fwd( + basenames=basenames, + optional_existing=optional_existing, + ) + if isinstance(target, TargetCollection): + return target.exists(exists_func=functools.partial( + cls._exists_in_basenames, + basenames=basenames, + optional_existing=optional_existing, + target_dirs=target_dirs, + )) + if isinstance(basenames, dict): + if target_dirs and target in target_dirs: + basenames = basenames[target_dirs[target]] + else: + # need to find find the collection manually, that could possibly contain the target, + # then use its basenames + for col_absdir, _basenames in basenames.items(): + if _target_path_in_dir(target, col_absdir): + basenames = _basenames + break + else: + return False + if not basenames: + return False + return target.basename in basenames + def remove(self, *, silent: bool = True, **kwargs) -> bool: removed_any = False for targets in self.iter_existing(unpack=False): @@ -300,10 +341,6 @@ def remove(self, *, silent: bool = True, **kwargs) -> bool: removed_any |= t.remove(silent=silent) return removed_any - @abstractmethod - def _exists_fwd(self, **kwargs) -> bool: - ... - class SiblingFileCollection(SiblingFileCollectionBase): """ @@ -347,21 +384,9 @@ def __init__(self, *args, **kwargs) -> None: # check that targets are in fact located in the same directory for t in flatten_collections(self._flat_target_list): - if not self._exists_in_dir(t): # type: ignore[arg-type] + if not _target_path_in_dir(t, self.dir): raise Exception(f"{t} is not located in common directory {self.dir}") - def _exists_in_dir(self, target: FileSystemTarget) -> bool: - # comparisons of dirnames are transparently possible for most target classes since their - # paths are consistent, but implement a custom check for mirrored targets - sub_target = target.remote_target if isinstance(target, MirroredTarget) else target - dir_target = ( - self.dir.remote_target - if isinstance(self.dir, MirroredDirectoryTarget) - else self.dir - ) - # do the check - return sub_target.absdirname == dir_target.abspath - def _repr_pairs(self) -> list[tuple[str, Any]]: expand = Config.instance().get_expanded_bool("target", "expand_path_repr") dir_path = self.dir.path if expand else self.dir.unexpanded_path @@ -372,45 +397,45 @@ def _iter_state( *, existing: bool = True, optional_existing: bool | None = None, - basenames: Sequence[str] | None = None, + basenames: Sequence[str] | set[str] | None = None, keys: bool = False, unpack: bool = True, + exists_func: Callable[[Target], bool] | None = None, ) -> Iterator[tuple[Any, Any] | Any]: - existing = bool(existing) - if optional_existing is not None: - optional_existing = bool(optional_existing) - # the directory must exist if not self.dir.exists(): return - # get the basenames of all elements of the directory + existing = bool(existing) + if optional_existing is not None: + optional_existing = bool(optional_existing) + + # get all basenames if basenames is None: basenames = self.dir.listdir() + basenames = self.dir.listdir() if self.dir.exists() else [] + # convert to set for faster lookup + basenames = set(basenames) if basenames else set() # helper to check for existence - def exists(t) -> bool: - if optional_existing is 
not None and t.optional: - return optional_existing - if isinstance(t, SiblingFileCollectionBase): - return t._exists_fwd( - basenames=basenames, - optional_existing=optional_existing, - ) - if isinstance(t, TargetCollection): - return all(exists(_t) for _t in flatten_collections(t)) - return t.basename in basenames + if exists_func is None: + exists_func = functools.partial( + self._exists_in_basenames, + basenames=basenames, + optional_existing=optional_existing, + target_dirs=None, + ) # loop and yield for key, targets in self._iter_flat(): - state = all(exists(t) for t in targets) + state = all(map(exists_func, targets)) if state is existing: if unpack: targets = self.targets[key] yield (key, targets) if keys else targets def _exists_fwd(self, **kwargs) -> bool: - fwd = ["basenames", "optional_existing"] + fwd = ["optional_existing", "basenames", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs}) @@ -433,78 +458,100 @@ def __init__(self, *args, **kwargs) -> None: # _flat_target_list attributes, but store them again in sibling file collections to speed up # some methods by grouping them into targets in the same physical directory self.collections: list[SiblingFileCollection] = [] - self._flat_target_collections = {} - grouped_targets: dict[str, list[Target]] = {} + self._flat_target_dirs = {} + grouped_targets = defaultdict(list) for t in flatten_collections(self._flat_target_list): - grouped_targets.setdefault(t.parent.uri(), []).append(t) # type: ignore[attr-defined] + grouped_targets[t.parent.uri()].append(t) for targets in grouped_targets.values(): # create and store the collection collection = SiblingFileCollection(targets) self.collections.append(collection) - # remember the collection per target + # remember the absolute collection dir per target for faster loopups later for t in targets: - self._flat_target_collections[t] = collection + self._flat_target_dirs[t] = collection.dir.abspath def _repr_pairs(self) -> list[tuple[str, Any]]: return super()._repr_pairs() + [("collections", len(self.collections))] - def _get_basenames(self): - return { - collection: (collection.dir.listdir() if collection.dir.exists() else []) - for collection in self.collections - } - def _iter_state( self, *, existing: bool = True, optional_existing: bool | None = None, - basenames: Sequence[str] | None = None, + basenames: dict[str, Sequence[str] | set[str]] | None = None, keys: bool = False, unpack: bool = True, + exists_func: Callable[[Target], bool] | None = None, ) -> Iterator[tuple[Any, Any] | Any]: existing = bool(existing) if optional_existing is not None: optional_existing = bool(optional_existing) - # get the dict of all basenames + # get all basenames if basenames is None: - basenames = self._get_basenames() + basenames = { + col.dir.abspath: (col.dir.listdir() if col.dir.exists() else []) + for col in self.collections + } + # convert to sets for faster lookups + basenames = {k: (set(v) if v else set()) for k, v in basenames.items()} # helper to check for existence - def exists(t, _basenames) -> bool: - if optional_existing is not None and t.optional: - return optional_existing - if isinstance(t, SiblingFileCollectionBase): - return t._exists_fwd( - basenames=_basenames, - optional_existing=optional_existing, - ) - if isinstance(t, TargetCollection): - return all(_t.exists() for _t in flatten_collections(t)) - return t.basename in _basenames + if exists_func is None: + exists_func = functools.partial( + self._exists_in_basenames, + basenames=basenames, # 
type: ignore[arg-type] + optional_existing=optional_existing, + target_dirs=self._flat_target_dirs, + ) # loop and yield for key, targets in self._iter_flat(): - state = all(exists(t, basenames[self._flat_target_collections[t]]) for t in targets) # type: ignore[call-overload] # noqa + state = all(map(exists_func, targets)) if state is existing: if unpack: targets = self.targets[key] yield (key, targets) if keys else targets def _exists_fwd(self, **kwargs) -> bool: - fwd = [("basenames", "basenames_dict"), ("optional_existing", "optional_existing")] - return self.exists(**{dst: kwargs[src] for dst, src in fwd if src in kwargs}) + fwd = ["optional_existing", "basenames", "exists_func"] + return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs}) + + +def _target_path_in_dir( + target: FileSystemTarget | str | pathlib.Path, + directory: FileSystemDirectoryTarget | str | pathlib.Path, +) -> bool: + # comparisons of dirnames are transparently possible for most target classes since their + # paths are consistent, but implement a custom check for mirrored targets + if not isinstance(target, FileSystemTarget): + target_absdir = str(target) + else: + target_absdir = ( + target.remote_target + if isinstance(target, MirroredTarget) + else target + ).absdirname + if not isinstance(directory, FileSystemDirectoryTarget): + dir_abspath = str(directory) + else: + dir_abspath = ( + directory.remote_target + if isinstance(directory, MirroredDirectoryTarget) + else directory + ).abspath + # do the comparison + return target_absdir == dir_abspath def flatten_collections(*targets) -> list[Target]: - lookup = flatten(targets) + lookup = deque(flatten(targets)) _targets: list[Target] = [] while lookup: - t = lookup.pop(0) + t = lookup.popleft() if isinstance(t, TargetCollection): - lookup[:0] = t._flat_target_list + lookup.extendleft(t._flat_target_list) else: _targets.append(t) diff --git a/law/workflow/remote.py b/law/workflow/remote.py index 28cdc274..6a145aa2 100644 --- a/law/workflow/remote.py +++ b/law/workflow/remote.py @@ -8,6 +8,7 @@ __all__ = ["JobData", "BaseRemoteWorkflowProxy", "BaseRemoteWorkflow"] +import os import sys import time import re @@ -15,7 +16,6 @@ import random import threading import pathlib -import contextlib from collections import defaultdict from abc import abstractmethod @@ -24,13 +24,14 @@ from law.workflow.base import BaseWorkflow, BaseWorkflowProxy from law.job.base import BaseJobManager, BaseJobFileFactory from law.job.dashboard import NoJobDashboard, BaseJobDashboard +from law.target.collection import TargetCollection from law.parameter import NO_FLOAT, NO_INT, get_param, DurationParameter from law.util import ( NoValue, no_value, is_number, colored, iter_chunks, merge_dicts, human_duration, DotDict, ShorthandDict, InsertableDict, ) from law.logger import get_logger -from law._types import Any, Type, Iterator, AbstractContextManager +from law._types import Any, Type, AbstractContextManager logger = get_logger(__name__) @@ -227,19 +228,19 @@ def __init__(self, *args, **kwargs) -> None: self._job_manager_setup_kwargs: dict[str, Any] | NoValue = no_value # boolean per job num denoting if a job should be / was skipped - self.skip_jobs: dict[int, bool] = {} + self._skip_jobs: dict[int, bool] = {} # retry counts per job num - self.job_retries: dict[int, int] = defaultdict(int) + self._job_retries: dict[int, int] = defaultdict(int) - # cached output() return value, set in run() - self._outputs: dict | None = None + # cached output() return value + self._cached_output: 
dict | None = None # flag that denotes whether a submission was done befire, set in run() self._submitted = False - # initially existing keys of the "collection" output (= complete branch tasks), set in run() - self._initially_existing_branches: set[int] = set() + # set of existing branches that is kept track of during processing + self._existing_branches: set[int] | None = None # flag denoting if jobs were cancelled or cleaned up (i.e. controlled) self._controlled_jobs = False @@ -393,27 +394,48 @@ def _cleanup_jobs(self) -> bool: task: BaseRemoteWorkflow = self.task # type: ignore[assignment] return isinstance(getattr(task, "cleanup_jobs", None), bool) and task.cleanup_jobs # type: ignore[return-value] # noqa + def _get_cached_output(self) -> dict: + if self._cached_output is None: + self._cached_output = self.output() + return self._cached_output + + def _get_existing_branches( + self, + sync: bool = False, + collection: TargetCollection | None = None, + ) -> set[int]: + if self._existing_branches is None: + sync = True + + if sync: + # initialize with set + self._existing_branches = set() + # add initial branches existing in output collection + if collection is None: + collection = self._get_cached_output().get("collection") + if collection is not None: + keys = collection.count(existing=True, keys=True)[1] # type: ignore[index] + self._existing_branches |= set(keys) + + return self._existing_branches # type: ignore[return-value] + def _can_skip_job(self, job_num: int, branches: list[int]) -> bool: """ Returns *True* when a job can be potentially skipped, which is the case when all branch tasks given by *branches* are complete. """ - task: BaseRemoteWorkflow = self.task # type: ignore[assignment] - - if job_num not in self.skip_jobs: - self.skip_jobs[job_num] = all( - (b in self._initially_existing_branches) or task.as_branch(b).complete() - for b in branches - ) + if job_num not in self._skip_jobs: + existing_branches = self._get_existing_branches() + self._skip_jobs[job_num] = all((b in existing_branches) for b in branches) # when the job is skipped, ensure that a job data entry exists and set the status - if self.skip_jobs[job_num]: + if self._skip_jobs[job_num]: if job_num not in self.job_data.jobs: self.job_data.jobs[job_num] = self.job_data_cls.job_data(branches=branches) if not self.job_data.jobs[job_num]["status"]: self.job_data.jobs[job_num]["status"] = self.job_manager.FINISHED - return self.skip_jobs[job_num] + return self._skip_jobs[job_num] def _get_job_kwargs(self, name: str) -> dict[str, Any]: attr = f"{self.workflow_type}_job_kwargs_{name}" @@ -590,17 +612,18 @@ def _maximum_resources( def process_resources(self, force: bool = False) -> dict[str, int]: task: BaseRemoteWorkflow = self.task # type: ignore[assignment] - if self._initial_process_resources is None or force: - job_resources = {} + # collect resources over all branches if not just controlling running jobs + if ( + not task.is_controlling_remote_jobs() and + (self._initial_process_resources is None or force) + ): get_job_resources = self._get_task_attribute("job_resources") - branch_chunks = iter_chunks(task.branch_map.keys(), task.tasks_per_job) # type: ignore[arg-type] # noqa - for job_num, branches in enumerate(branch_chunks, 1): - if self._can_skip_job(job_num, branches): - continue - job_resources[job_num] = get_job_resources(job_num, branches) - - self._initial_process_resources = job_resources + self._initial_process_resources = { + job_num: get_job_resources(job_num, branches) + for job_num, branches in 
enumerate(branch_chunks, 1) + if not self._can_skip_job(job_num, branches) + } if not self._initial_process_resources: return {} @@ -668,14 +691,14 @@ def dump_job_data(self) -> None: self.job_data["dashboard_config"] = self.dashboard.get_persistent_config() # write the job data to the output file - if self._outputs is not None: + output = self._get_cached_output() + if output is not None: with self._dump_lock: - self._outputs["jobs"].dump(self.job_data, formatter="json", indent=4) - - logger.debug("job data dumped") + output["jobs"].dump(self.job_data, formatter="json", indent=4) + logger.debug("job data dumped") def get_run_context(self) -> AbstractContextManager: - return self._get_task_attribute("workflow_run_context", fallback=True)() + return self._get_task_attribute("workflow_run_context")() def run(self) -> None: with self.get_run_context(): @@ -689,18 +712,18 @@ def _run_impl(self) -> None: """ task: BaseRemoteWorkflow = self.task # type: ignore[assignment] - self._outputs = self.output() - if not isinstance(self._outputs, dict): - raise TypeError(f"workflow output must be a dict, got '{self._outputs}'") + output = self._get_cached_output() + if not isinstance(output, dict): + raise TypeError(f"workflow output must be a dict, got '{output}'") # create the job dashboard interface self.dashboard = task.create_job_dashboard() or NoJobDashboard() # read job data and reset some values - self._submitted = not task.ignore_submission and self._outputs["jobs"].exists() + self._submitted = not task.ignore_submission and output["jobs"].exists() if self._submitted: # load job data and cast job ids - self.job_data.update(self._outputs["jobs"].load(formatter="json")) + self.job_data.update(output["jobs"].load(formatter="json")) for job_data in self.job_data.jobs.values(): job_data["job_id"] = self.job_manager.cast_job_id(job_data["job_id"]) @@ -708,12 +731,11 @@ def _run_impl(self) -> None: task.tasks_per_job = self.job_data.tasks_per_job self.dashboard.apply_config(self.job_data.dashboard_config) - # store the initially complete branches + # store initially complete branches outputs_existing = False - if "collection" in self._outputs: - collection = self._outputs["collection"] - count, keys = collection.count(keys=True) - self._initially_existing_branches = keys + if "collection" in output: + collection = output["collection"] + count = len(self._get_existing_branches(collection=collection)) outputs_existing = count >= collection._abs_threshold() # cancel jobs? 
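The resource collection above resolves a per-workflow-type hook via _get_task_attribute("job_resources") and only includes jobs that cannot be skipped. For orientation only (not part of the patch), a minimal sketch of how a task could implement such a hook, assuming the hook name follows the f"{workflow_type}_job_resources" convention implied by the f"{workflow_type}_job_kwargs_{name}" lookup above, and using an HTCondor-based workflow as the example; MyWorkflow, its outputs and the "slots" resource are made up for illustration:

from __future__ import annotations

import law

law.contrib.load("htcondor")


class MyWorkflow(law.htcondor.HTCondorWorkflow, law.LocalWorkflow):
    """Hypothetical example task, not part of law or of this patch."""

    def create_branch_map(self) -> dict[int, int]:
        # ten made-up branches
        return {i: i for i in range(10)}

    def output(self):
        return law.LocalFileTarget(f"/tmp/data_{self.branch}.json")

    def run(self):
        self.output().dump({"branch": self.branch}, formatter="json")

    def htcondor_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]:
        # assumed resource name; the proxy sums these claims over all jobs that are not skipped
        return {"slots": len(branches)}

Only jobs whose branches are not already complete contribute to the initial resource estimate, mirroring the `if not self._can_skip_job(job_num, branches)` condition in the comprehension above.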
@@ -741,7 +763,7 @@ def _run_impl(self) -> None: # ensure the output directory exists if not self._submitted: - self._outputs["jobs"].parent.touch() + output["jobs"].parent.touch() try: # instantiate the configured job file factory @@ -760,7 +782,7 @@ def _run_impl(self) -> None: # sleep once to give the job interface time to register the jobs if not self._submitted and not task.no_poll: - post_submit_delay = self._get_task_attribute("post_submit_delay", fallback=True)() + post_submit_delay = self._get_task_attribute("post_submit_delay")() if post_submit_delay > 0: logger.debug(f"sleep for {post_submit_delay} second(s) due to post_submit_delay") time.sleep(post_submit_delay) @@ -1007,7 +1029,7 @@ def _submit_batch( job_files = [str(f["job"]) for f in all_job_files.values()] # prepare objects for dumping intermediate job data - dump_freq = self._get_task_attribute("dump_intermediate_job_data", fallback=True)() + dump_freq = self._get_task_attribute("dump_intermediate_job_data")() if dump_freq and not is_number(dump_freq): dump_freq = 50 @@ -1086,14 +1108,14 @@ def poll(self) -> None: Initiates the job status polling loop. """ task: BaseRemoteWorkflow = self.task # type: ignore[assignment] - dump_intermediate_job_data = task.dump_intermediate_job_data() + dump_intermediate_job_data = self._get_task_attribute("dump_intermediate_job_data")() # total job count n_jobs = len(self.job_data) # track finished and failed jobs in dicts holding status data - finished_jobs = [] - failed_jobs = [] + finished_jobs = set() + failed_jobs = set() # the resources of yet unfinished jobs as claimed initially and reported to the scheduler # and the maximum amount resources potentially claimed by the jobs @@ -1152,7 +1174,7 @@ def poll(self) -> None: if self._can_skip_job(job_num, data["branches"]): data["status"] = self.job_manager.FINISHED data["code"] = 0 - finished_jobs.append(job_num) + finished_jobs.add(job_num) continue # mark as active or unknown @@ -1235,7 +1257,7 @@ def poll(self) -> None: # when the task picked up an existing submission file, then in the first polling # iteration it might happen that a job is finished, but outputs of its tasks are - # not existing, e.g. when they were removed externaly and the job id is still known + # not existing, e.g. 
when they were removed externally and the job id is still known # to the batch system; in this case, mark it as unknown and to be retried if self._submitted and i == 0: is_finished = data["status"] == self.job_manager.FINISHED @@ -1255,15 +1277,15 @@ def poll(self) -> None: time.sleep(check_completeness_delay) # store jobs per status and take further actions depending on the status - pending_jobs = [] - running_jobs = [] - newly_failed_jobs = [] - retry_jobs = [] + pending_jobs = set() + running_jobs = set() + retry_jobs = set() + newly_failed_jobs = [] # need to preserve order for job_num in active_jobs: data = self.job_data.jobs[job_num] if data["status"] == self.job_manager.PENDING: - pending_jobs.append(job_num) + pending_jobs.add(job_num) task.forward_dashboard_event( self.dashboard, data.copy(), @@ -1273,7 +1295,7 @@ def poll(self) -> None: continue if data["status"] == self.job_manager.RUNNING: - running_jobs.append(job_num) + running_jobs.add(job_num) task.forward_dashboard_event( self.dashboard, data.copy(), @@ -1288,7 +1310,8 @@ def poll(self) -> None: task.as_branch(b).complete() for b in data["branches"] ): - finished_jobs.append(job_num) + finished_jobs.add(job_num) + self._existing_branches |= set(data["branches"]) # type: ignore[operator] self.poll_data.n_active -= 1 data["job_id"] = self.job_data.dummy_job_id task.forward_dashboard_event( @@ -1297,6 +1320,14 @@ def poll(self) -> None: "status.finished", job_num, ) + # potentially clear logs + if task.clear_logs: + log_file = data["extra"].get("log") + if isinstance(log_file, str) and os.path.exists(log_file): + try: + os.remove(log_file) + except FileNotFoundError: + pass continue # the job is marked as finished but not all branches are complete @@ -1308,12 +1339,12 @@ def poll(self) -> None: self.poll_data.n_active -= 1 # retry or ultimately failed? 
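The log removal added above is deliberately defensive: only a string path stored under the job's "extra" data is considered, the file is checked for existence first, and a FileNotFoundError is still caught in case the file disappears in between. A usage sketch (illustration only, not part of the patch), reusing the hypothetical MyWorkflow class from the earlier sketch:

# Python API; both parameters default to False
task = MyWorkflow(transfer_logs=True, clear_logs=True)

# rough CLI equivalent, assuming luigi's default parameter-to-flag mapping:
#   law run MyWorkflow --transfer-logs --clear-logs

With both flags set, job logs are transferred to the output directory and the logs of jobs whose branch tasks all completed are removed again, so long-running submissions do not accumulate logs of successful jobs.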
- if self.job_retries[job_num] < retries: - self.job_retries[job_num] += 1 + if self._job_retries[job_num] < retries: + self._job_retries[job_num] += 1 self.job_data.attempts.setdefault(job_num, 0) self.job_data.attempts[job_num] += 1 data["status"] = self.job_manager.RETRY - retry_jobs.append(job_num) + retry_jobs.add(job_num) task.forward_dashboard_event( self.dashboard, data.copy(), @@ -1321,7 +1352,7 @@ def poll(self) -> None: job_num, ) else: - failed_jobs.append(job_num) + failed_jobs.add(job_num) task.forward_dashboard_event( self.dashboard, data.copy(), @@ -1357,7 +1388,7 @@ def poll(self) -> None: self.last_status_counts = counts # remove resources of finished and failed jobs - for job_num in finished_jobs + failed_jobs: + for job_num in finished_jobs | failed_jobs: job_resources.pop(job_num, None) # check if the maximum possible resources decreased and report to the scheduler new_max_resources = self._maximum_resources(job_resources, self.poll_data.n_parallel) @@ -1397,8 +1428,7 @@ def poll(self) -> None: # complain when failed if failed: - failed_nums = (job_num for job_num in failed_jobs if job_num not in retry_jobs) - failed_nums_str = ",".join(map(str, failed_nums)) + failed_nums_str = ",".join(map(str, sorted(failed_jobs - retry_jobs))) raise Exception(f"tolerance exceeded for job(s) {failed_nums_str}") # stop early if unreachable @@ -1411,7 +1441,7 @@ def poll(self) -> None: raise Exception(msg) # invoke the poll callback - poll_callback_res = self._get_task_attribute("poll_callback", fallback=True)(self.poll_data) # noqa + poll_callback_res = self._get_task_attribute("poll_callback")(self.poll_data) if poll_callback_res is False: logger.debug( "job polling loop gracefully stopped due to False returned by poll_callback", @@ -1635,6 +1665,11 @@ class BaseRemoteWorkflow(BaseWorkflow): significant=False, description="transfer job logs to the output directory; default: False", ) + clear_logs = luigi.BoolParameter( + default=False, + significant=False, + description="when --transfer-logs is set, remove logs of successful jobs; default: False", + ) check_unreachable_acceptance = False align_polling_status_line = False @@ -1665,22 +1700,6 @@ def process_resources(self) -> dict[str, int]: resources.update(self.resources) return resources - def job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]: - """ - Hook to define resources for a specific job with number *job_num*, processing *branches*. - This method should return a dictionary. - """ - return {} - - @contextlib.contextmanager - def workflow_run_context(self) -> Iterator[None]: - """ - Hook to provide a context manager in which the workflow run implementation is placed. This - can be helpful in situations where resurces should be acquired before and released after - running a workflow. - """ - yield - def is_controlling_remote_jobs(self) -> bool: """ Returns *True* if the remote workflow is only controlling remote jobs instead of handling @@ -1694,27 +1713,6 @@ def control_output_postfix(self) -> str: """ return self.get_branches_repr() - def poll_callback(self, poll_data: PollData) -> bool | Any: - """ - Configurable callback that is called after each job status query and before potential - resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`) - that can be changed within this method. - - If *False* is returned, the polling loop is gracefully terminated. Returning any other value - does not have any effect. 
- """ - return True - - def post_submit_delay(self) -> int | float: - """ - Configurable delay in seconds to wait after submitting jobs and before starting the status - polling. - """ - return self.poll_interval * 60 # type: ignore[operator] - - def dump_intermediate_job_data(self) -> bool: - return True - def create_job_dashboard(self) -> BaseJobDashboard | None: """ Hook method to return a configured :py:class:`law.job.BaseJobDashboard` instance that will