Skip to content

Commit

Permalink
Consistent usage of destination_info.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Aug 23, 2023
1 parent fd754a3 commit 02d504c
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 19 deletions.
8 changes: 6 additions & 2 deletions law/contrib/arc/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,13 @@ def create_job_file(self, job_num, branches):
return {"job": job_file, "log": abs_log_file}

def destination_info(self):
info = ["ce: {}".format(",".join(self.task.arc_ce))]
info = super(ARCWorkflowProxy, self).destination_info()

info["ce"] = "ce: {}".format(",".join(self.task.arc_ce))

info = self.task.arc_destination_info(info)
return ", ".join(map(str, info))

return info


class ARCWorkflow(BaseRemoteWorkflow):
Expand Down
6 changes: 3 additions & 3 deletions law/contrib/cms/scripts/setup_cmssw.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,18 @@ setup_cmssw() {
# conditions from multiple processes, guard the setup with a pending_flag_file and
# sleep for a random amount of seconds between 0 and 5 to further reduce the chance of
# simultaneously starting processes reaching this point at the same time
sleep "$( python3 -c 'import random;print(random.random() * 5)')"
sleep "$( python3 -c 'import random;print(random.random() * 5)' )"
# if an existing flag file is older than 25 minutes, consider it a dangling leftover from a
# previously failed installation attempt and delete it
if [ -f "${pending_flag_file}" ]; then
local flag_file_age="$(( $( date +%s ) - $( date +%s -r "${pending_flag_file}" )))"
local flag_file_age="$(( $( date +%s ) - $( date +%s -r "${pending_flag_file}" ) ))"
[ "${flag_file_age}" -ge "1500" ] && rm -f "${pending_flag_file}"
fi
# start the sleep loop
local sleep_counter="0"
while [ -f "${pending_flag_file}" ]; do
# wait at most 20 minutes
sleep_counter="$(( $sleep_counter + 1 ))"
sleep_counter="$(( ${sleep_counter} + 1 ))"
if [ "${sleep_counter}" -ge 120 ]; then
>&2 echo "installation of ${custom_cmssw_dir} is done in different process, but number of sleeps exceeded"
return "6"
Expand Down
20 changes: 20 additions & 0 deletions law/contrib/cms/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ def _status_error_pairs(self, job_num, job_data):

return pairs

def destination_info(self):
info = super(CrabWorkflowProxy, self).destination_info()

info = self.task.crab_destination_info(info)

return info


class CrabWorkflow(BaseRemoteWorkflow):

Expand Down Expand Up @@ -242,6 +249,9 @@ def crab_work_area(self):
return ""

def crab_job_file(self):
"""
Hook to return the location of the job file that is executed on job nodes.
"""
return JobInputFile(law_src_path("job", "law_job.sh"))

def crab_bootstrap_file(self):
Expand Down Expand Up @@ -275,6 +285,9 @@ def crab_output_postfix(self):
return ""

def crab_output_uri(self):
"""
Hook to return the URI of the remote crab output directory.
"""
return self.crab_output_directory().uri()

def crab_job_manager_cls(self):
Expand Down Expand Up @@ -326,7 +339,14 @@ def crab_check_job_completeness_delay(self):
return 0.0

def crab_cmdline_args(self):
"""
Hook to add additional cli parameters to "law run" commands executed on job nodes.
"""
return {}

def crab_destination_info(self, info):
"""
Hook to add additional information behind each job status query line by extending an *info*
dictionary whose values will be shown separated by comma.
"""
return info
8 changes: 6 additions & 2 deletions law/contrib/glite/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,13 @@ def create_job_file(self, job_num, branches):
return {"job": job_file, "log": abs_log_file}

def destination_info(self):
info = ["ce: {}".format(",".join(self.task.glite_ce))]
info = super(GLiteWorkflowProxy, self).destination_info()

info["ce"] = "ce: {}".format(",".join(self.task.glite_ce))

info = self.task.glite_destination_info(info)
return ", ".join(map(str, info))

return info


class GLiteWorkflow(BaseRemoteWorkflow):
Expand Down
12 changes: 8 additions & 4 deletions law/contrib/htcondor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,17 @@ def create_job_file(self, job_num, branches):
return {"job": job_file, "log": abs_log_file}

def destination_info(self):
info = []
info = super(HTCondorWorkflowProxy, self).destination_info()

if self.task.htcondor_pool and self.task.htcondor_pool != NO_STR:
info.append("pool: {}".format(self.task.htcondor_pool))
info["pool"] = "pool: {}".format(self.task.htcondor_pool)

if self.task.htcondor_scheduler and self.task.htcondor_scheduler != NO_STR:
info.append("scheduler: {}".format(self.task.htcondor_scheduler))
info["scheduler"] = "scheduler: {}".format(self.task.htcondor_scheduler)

info = self.task.htcondor_destination_info(info)
return ", ".join(map(str, info))

return info


class HTCondorWorkflow(BaseRemoteWorkflow):
Expand Down
9 changes: 6 additions & 3 deletions law/contrib/lsf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,14 @@ def create_job_file(self, job_num, branches):
return {"job": job_file, "log": abs_log_file}

def destination_info(self):
info = []
info = super(LSFWorkflowProxy, self).destination_info()

if self.task.lsf_queue != NO_STR:
info.append("queue: {}".format(self.task.lsf_queue))
info["queue"] = "queue: {}".format(self.task.lsf_queue)

info = self.task.lsf_destination_info(info)
return ", ".join(map(str, info))

return info


class LSFWorkflow(BaseRemoteWorkflow):
Expand Down
8 changes: 5 additions & 3 deletions law/contrib/slurm/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ def create_job_file(self, job_num, branches):
return {"job": job_file, "log": abs_log_file}

def destination_info(self):
info = []
info.extend(self.task.slurm_destination_info(info))
return ", ".join(map(str, info))
info = super(SlurmWorkflowProxy, self).destination_info()

info = self.task.slurm_destination_info(info)

return info


class SlurmWorkflow(BaseRemoteWorkflow):
Expand Down
2 changes: 1 addition & 1 deletion law/contrib/wlcg/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _vomsproxy_info(args=None, proxy_file=None, silent=False):
def get_vomsproxy_identity(proxy_file=None, silent=False):
"""
Returns the identity information of the voms proxy. When *proxy_file* is *None*, it defaults to
the result o :py:func:`get_vomsproxy_file`. Otherwise, when it evaluates to *False*,
the result of :py:func:`get_vomsproxy_file`. Otherwise, when it evaluates to *False*,
``voms-proxy-info`` is queried without a custom proxy file.
"""
code, out, _ = _vomsproxy_info(args=["--identity"], proxy_file=proxy_file, silent=silent)
Expand Down
4 changes: 3 additions & 1 deletion law/workflow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,15 @@ def destination_info(self):
Hook that can return a string containing information on the location that jobs are submitted
to. The string is appended to submission and status messages.
"""
return ""
return InsertableDict()

def _destination_info_postfix(self):
"""
Returns the destination info ready to be appended to a string.
"""
dst_info = self.destination_info()
if isinstance(dst_info, dict):
dst_info = list(dst_info.values())
if isinstance(dst_info, (list, tuple)):
dst_info = ", ".join(map(str, dst_info))
if dst_info != "":
Expand Down

0 comments on commit 02d504c

Please sign in to comment.