Port latest changes from master.

riga committed Dec 16, 2024
1 parent 0f0dd12 commit df34f22

Showing 15 changed files with 754 additions and 294 deletions.
21 changes: 13 additions & 8 deletions law/cli/completion.sh
@@ -215,12 +215,17 @@ _law_complete() {
     fi
 }
 
-# run bashcompinit in zsh, export the completion function in bash
-if [ ! -z "${ZSH_VERSION}" ]; then
-    autoload -Uz +X compinit && compinit
-    autoload -Uz +X bashcompinit && bashcompinit
-else
-    export -f _law_complete
-fi
+# export the completion function in bash
+[ ! -z "${BASH_VERSION}" ] && export -f _law_complete
+
+# enable the completion if not explicitly disabled
+if [ "${LAW_CLI_SKIP_COMPLETION}" != "1" ]; then
+    # in zsh, run compinit and bashcompinit
+    if [ ! -z "${ZSH_VERSION}" ]; then
+        autoload -Uz +X compinit && compinit
+        autoload -Uz +X bashcompinit && bashcompinit
+    fi
 
-complete -o bashdefault -o default -F _law_complete law
+    # add to all known executables
+    complete -o bashdefault -o default -F _law_complete law
+fi
63 changes: 55 additions & 8 deletions law/contrib/arc/workflow.py
@@ -9,19 +9,20 @@
 __all__ = ["ARCWorkflow"]
 
 import os
-import pathlib
 import abc
+import contextlib
+import pathlib
 
 from law.config import Config
-from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
+from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData
 from law.job.base import JobArguments, JobInputFile
 from law.task.proxy import ProxyCommand
 from law.target.file import get_path
 from law.target.local import LocalFileTarget
 from law.parameter import CSVParameter
-from law.util import law_src_path, merge_dicts, DotDict, InsertableDict
+from law.util import no_value, law_src_path, merge_dicts, DotDict, InsertableDict
 from law.logger import get_logger
-from law._types import Type
+from law._types import Type, Generator
 
 from law.contrib.wlcg import WLCGDirectoryTarget
 from law.contrib.arc.job import ARCJobManager, ARCJobFileFactory
@@ -125,10 +126,10 @@ def create_job_file(
         if dashboard_file:
             c.input_files["dashboard_file"] = dashboard_file
 
-        # log files
-        c.log = None
-        c.stdout = None
-        c.stderr = None
+        # initialize logs with empty values and defer to defaults later
+        c.log = no_value
+        c.stdout = no_value
+        c.stderr = no_value
         if task.transfer_logs:
             log_file = "stdall.txt"
             c.stdout = log_file
@@ -145,6 +146,12 @@
         # build the job file and get the sanitized config
         job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__)  # type: ignore[misc]
 
+        # logging defaults
+        c.log = c.log or None
+        c.stdout = c.stdout or None
+        c.stderr = c.stderr or None
+        c.custom_log_file = c.custom_log_file or None
+
         # determine the custom log file uri if set
         abs_log_file = None
         if c.custom_log_file:
@@ -193,6 +200,15 @@ class ARCWorkflow(BaseRemoteWorkflow):
     def arc_output_directory(self) -> WLCGDirectoryTarget:
         ...
 
+    @contextlib.contextmanager
+    def arc_workflow_run_context(self) -> Generator[None, None, None]:
+        """
+        Hook to provide a context manager in which the workflow run implementation is placed. This
+        can be helpful in situations where resources should be acquired before and released after
+        running a workflow.
+        """
+        yield
+
     def arc_workflow_requires(self) -> DotDict:
         return DotDict()
 
@@ -214,6 +230,13 @@ def arc_output_postfix(self) -> str:
     def arc_output_uri(self) -> str:
         return self.arc_output_directory().uri(return_all=False)  # type: ignore[return-value]
 
+    def arc_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]:
+        """
+        Hook to define resources for a specific job with number *job_num*, processing *branches*.
+        This method should return a dictionary.
+        """
+        return {}
+
     def arc_job_manager_cls(self) -> Type[ARCJobManager]:
         return ARCJobManager
 
@@ -254,12 +277,36 @@ def arc_job_config(
     ) -> ARCJobFileFactory.Config:
         return config
 
+    def arc_dump_intermediate_job_data(self) -> bool:
+        """
+        Whether to dump intermediate job data to the job submission file while jobs are being
+        submitted.
+        """
+        return True
+
+    def arc_post_submit_delay(self) -> int | float:
+        """
+        Configurable delay in seconds to wait after submitting jobs and before starting the status
+        polling.
+        """
+        return self.poll_interval * 60
+
     def arc_check_job_completeness(self) -> bool:
         return False
 
     def arc_check_job_completeness_delay(self) -> float | int:
         return 0.0
 
+    def arc_poll_callback(self, poll_data: PollData) -> bool | None:
+        """
+        Configurable callback that is called after each job status query and before potential
+        resubmission. It receives the variable polling attributes *poll_data*
+        (:py:class:`PollData`), which can be changed within this method.
+
+        If *False* is returned, the polling loop is gracefully terminated. Returning any other
+        value does not have any effect.
+        """
+        return None
+
     def arc_use_local_scheduler(self) -> bool:
         return True
 
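The hooks introduced above are meant to be overridden in user workflows. A minimal sketch, assuming a hypothetical MyARCTask class and resource name, with required hooks such as arc_output_directory() omitted for brevity:

import contextlib
import threading

import law

law.contrib.load("arc")

# a process-wide lock, standing in for any resource held while the workflow runs
run_lock = threading.Lock()


class MyARCTask(law.arc.ARCWorkflow):

    @contextlib.contextmanager
    def arc_workflow_run_context(self):
        # acquire the resource before the workflow runs, release it afterwards
        with run_lock:
            yield

    def arc_job_resources(self, job_num, branches):
        # claim one resource unit per branch processed by this job
        return {"slots": len(branches)}

    def arc_poll_callback(self, poll_data):
        # inspect or adjust polling attributes between status queries; returning
        # False would gracefully terminate the polling loop
        return None
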
64 changes: 52 additions & 12 deletions law/contrib/cms/workflow.py
@@ -9,19 +9,20 @@
 
 __all__ = ["CrabWorkflow"]
 
-import pathlib
 import uuid
 import abc
+import contextlib
+import pathlib
 
 from law.config import Config
-from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, JobData
+from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, JobData, PollData
 from law.job.base import JobArguments, JobInputFile
 from law.target.file import get_path, get_scheme, remove_scheme, FileSystemDirectoryTarget
 from law.target.local import LocalDirectoryTarget, LocalFileTarget
 from law.task.proxy import ProxyCommand
 from law.util import no_value, law_src_path, merge_dicts, human_duration, DotDict, InsertableDict
 from law.logger import get_logger
-from law._types import Any, Type
+from law._types import Any, Type, Generator
 
 from law.contrib.wlcg import check_vomsproxy_validity, get_myproxy_info
 from law.contrib.cms.job import CrabJobManager, CrabJobFileFactory
@@ -165,8 +166,7 @@ def create_job_file_group(
 
         # log file
         if task.transfer_logs:
-            log_file = "stdall.txt"
-            c.custom_log_file = log_file
+            c.custom_log_file = "stdall.txt"
 
         # task hook
         c = task.crab_job_config(c, list(submit_jobs.keys()), list(submit_jobs.values()))  # type: ignore[call-arg, arg-type]  # noqa
@@ -262,6 +262,22 @@ def crab_work_area(self) -> str | LocalDirectoryTarget:
         # relative to the job file directory
         return ""
 
+    @contextlib.contextmanager
+    def crab_workflow_run_context(self) -> Generator[None, None, None]:
+        """
+        Hook to provide a context manager in which the workflow run implementation is placed. This
+        can be helpful in situations where resources should be acquired before and released after
+        running a workflow.
+        """
+        yield
+
+    def crab_workflow_requires(self) -> DotDict:
+        """
+        Hook to define requirements for the workflow itself that need to be resolved before any
+        submission can happen.
+        """
+        return DotDict()
+
     def crab_job_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile:
         """
         Hook to return the location of the job file that is executed on job nodes.
@@ -284,13 +300,6 @@ def crab_stageout_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile:
         """
         return None
 
-    def crab_workflow_requires(self) -> DotDict:
-        """
-        Hook to define requirements for the workflow itself that need to be resolved before any
-        submission can happen.
-        """
-        return DotDict()
-
     def crab_output_postfix(self) -> str:
         """
         Hook to define the postfix of outputs, for instance such that workflows with different
@@ -304,6 +313,13 @@ def crab_output_uri(self) -> str:
         """
         return self.crab_output_directory().uri(return_all=False)  # type: ignore[return-value]
 
+    def crab_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]:
+        """
+        Hook to define resources for a specific job with number *job_num*, processing *branches*.
+        This method should return a dictionary.
+        """
+        return {}
+
     def crab_job_manager_cls(self) -> Type[CrabJobManager]:
         """
         Hook to define a custom job manager class to use.
@@ -359,6 +375,20 @@ def crab_job_config(
         """
         return config
 
+    def crab_dump_intermediate_job_data(self) -> bool:
+        """
+        Whether to dump intermediate job data to the job submission file while jobs are being
+        submitted.
+        """
+        return True
+
+    def crab_post_submit_delay(self) -> float | int:
+        """
+        Configurable delay in seconds to wait after submitting jobs and before starting the status
+        polling.
+        """
+        return self.poll_interval * 60
+
     def crab_check_job_completeness(self) -> bool:
         """
         Hook to define whether, after a job reports successful completion, the job manager should check
@@ -374,6 +404,16 @@ def crab_check_job_completeness_delay(self) -> float | int:
         """
         return 0.0
 
+    def crab_poll_callback(self, poll_data: PollData) -> bool | None:
+        """
+        Configurable callback that is called after each job status query and before potential
+        resubmission. It receives the variable polling attributes *poll_data*
+        (:py:class:`PollData`), which can be changed within this method.
+
+        If *False* is returned, the polling loop is gracefully terminated. Returning any other
+        value does not have any effect.
+        """
+        return None
+
     def crab_cmdline_args(self) -> dict[str, str]:
         """
         Hook to add additional cli parameters to "law run" commands executed on job nodes.
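The CRAB hooks mirror the ARC ones. A minimal sketch, assuming hypothetical MyCrabTask and SomeUpstreamTask classes and again omitting required hooks such as crab_output_directory():

import law

law.contrib.load("cms")


class MyCrabTask(law.cms.CrabWorkflow):

    def crab_workflow_requires(self):
        # requirements resolved before any submission happens
        reqs = super().crab_workflow_requires()
        reqs["data"] = SomeUpstreamTask.req(self)  # hypothetical upstream task
        return reqs

    def crab_post_submit_delay(self):
        # wait only half of the default delay before the first status poll
        return 0.5 * self.poll_interval * 60

    def crab_dump_intermediate_job_data(self):
        # skip rewriting the submission file while jobs are still being submitted
        return False
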
2 changes: 0 additions & 2 deletions law/contrib/gfal/target.py
@@ -10,7 +10,6 @@
 
 import os
 import sys
-import gc
 import pathlib
 import contextlib
 import stat as _stat
@@ -121,7 +120,6 @@ def context(self) -> Iterator[gfal2.Gfal2Context]:
         finally:
             if self.atomic_contexts and pid in self._contexts:
                 del self._contexts[pid]
-                gc.collect()
 
     @contextlib.contextmanager
     def transfer_parameters(self, ctx: gfal2.Gfal2Context) -> Iterator[gfal2.TransferParameters]:
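For context, the two context managers touched by this hunk are typically nested when performing transfers. A rough usage sketch, assuming fi is an already configured gfal file interface instance (its construction is not part of this diff) and that the gfal2 context exposes filecopy():

def copy_example(fi, src_url, dst_url):
    # obtain the per-process gfal2 context; after the change above, dropping an
    # atomic context no longer triggers a manual gc.collect()
    with fi.context() as ctx:
        # apply the configured transfer parameters to a single copy operation
        with fi.transfer_parameters(ctx) as params:
            ctx.filecopy(params, src_url, dst_url)
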