Skip to content

Commit

Permalink
Handle potential pathlib.Path's throughout law, close #167.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Oct 3, 2023
1 parent ce82c5f commit dca607b
Show file tree
Hide file tree
Showing 47 changed files with 287 additions and 200 deletions.
4 changes: 2 additions & 2 deletions law/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

def law_home_path(*paths):
home = os.getenv("LAW_HOME") or os.path.expandvars(os.path.expanduser("$HOME/.law"))
return os.path.normpath(os.path.join(home, *(str(path) for path in paths)))
return os.path.normpath(os.path.join(home, *map(str, paths)))


class Config(ConfigParser):
Expand Down Expand Up @@ -186,7 +186,7 @@ def __init__(self, config_file="", skip_defaults=False, skip_fallbacks=False,
if not skip_fallbacks:
config_files += self._config_files
for cf in config_files:
cf = os.path.expandvars(os.path.expanduser(cf))
cf = os.path.expandvars(os.path.expanduser(str(cf)))
cf = os.path.normpath(os.path.abspath(cf))
if os.path.isfile(cf):
self.config_file = cf
Expand Down
14 changes: 10 additions & 4 deletions law/contrib/arc/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles
from law.target.file import get_path
from law.util import interruptable_popen, make_list, make_unique, quote_cmd
from law.logger import get_logger

Expand Down Expand Up @@ -64,7 +65,7 @@ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, sil
# when this is the case, we have to make the assumption that their input files are all
# absolute, or they are relative but all in the same directory
chunking = isinstance(job_file, (list, tuple))
job_files = make_list(job_file)
job_files = list(map(str, make_list(job_file)))
job_file_dir = os.path.dirname(os.path.abspath(job_files[0]))
job_file_names = [os.path.basename(jf) for jf in job_files]

Expand Down Expand Up @@ -348,6 +349,7 @@ def create(self, postfix=None, **kwargs):
c.output_files.append(c[attr])

# postfix certain output files
c.output_files = list(map(str, c.output_files))
if c.postfix_output_files:
c.output_files = [self.postfix_output_file(path, postfix) for path in c.output_files]
for attr in ["log", "stdout", "stderr", "custom_log_file"]:
Expand All @@ -367,7 +369,11 @@ def create(self, postfix=None, **kwargs):

# ensure that the executable is an input file, remember the key to access it
if c.executable:
executable_keys = [k for k, v in c.input_files.items() if v == c.executable]
executable_keys = [
k
for k, v in c.input_files.items()
if get_path(v) == get_path(c.executable)
]
if executable_keys:
executable_key = executable_keys[0]
else:
Expand Down Expand Up @@ -454,7 +460,7 @@ def prepare_input(f):
render_variables = self.linearize_render_variables(c.render_variables)

# prepare the job file
job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name), postfix)
job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix)

# render copied, non-remote input files
for key, f in c.input_files.items():
Expand All @@ -475,7 +481,7 @@ def prepare_input(f):

# prepare the executable when given
if c.executable:
c.executable = c.input_files[executable_key].path_job_post_render
c.executable = get_path(c.input_files[executable_key].path_job_post_render)
# make the file executable for the user and group
path = os.path.join(c.dir, os.path.basename(c.executable))
if os.path.exists(path):
Expand Down
4 changes: 2 additions & 2 deletions law/contrib/arc/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _arcproxy_info(args=None, proxy_file=None, silent=False):
if proxy_file is None:
proxy_file = get_arcproxy_file()
if proxy_file:
proxy_file = os.path.expandvars(os.path.expanduser(proxy_file))
proxy_file = os.path.expandvars(os.path.expanduser(str(proxy_file)))
cmd.extend(["--proxy", proxy_file])

code, out, err = interruptable_popen(quote_cmd(cmd), shell=True, executable="/bin/bash",
Expand Down Expand Up @@ -138,7 +138,7 @@ def renew_arcproxy(password="", lifetime="8 days", proxy_file=None):

args = "--constraint=validityPeriod={}".format(lifetime_seconds)
if proxy_file:
proxy_file = os.path.expandvars(os.path.expanduser(proxy_file))
proxy_file = os.path.expandvars(os.path.expanduser(str(proxy_file)))
args += " --proxy={}".format(proxy_file)

with tmp_file() as (_, tmp):
Expand Down
2 changes: 1 addition & 1 deletion law/contrib/arc/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def create_job_file(self, job_num, branches):
# determine the custom log file uri if set
abs_log_file = None
if c.custom_log_file:
abs_log_file = os.path.join(c.output_uri, c.custom_log_file)
abs_log_file = os.path.join(str(c.output_uri), c.custom_log_file)

# return job and log files
return {"job": job_file, "config": c, "log": abs_log_file}
Expand Down
2 changes: 2 additions & 0 deletions law/contrib/awkward/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def load(cls, path, *args, **kwargs):
if path.endswith((".parquet", ".parq")):
import awkward as ak
return ak.from_parquet(path, *args, **kwargs)

if path.endswith(".json"):
import awkward as ak
return ak.from_json(path, *args, **kwargs)
Expand All @@ -44,6 +45,7 @@ def dump(cls, path, obj, *args, **kwargs):
if path.endswith((".parquet", ".parq")):
import awkward as ak
return ak.to_parquet(obj, path, *args, **kwargs)

if path.endswith(".json"):
import awkward as ak
return ak.to_json(obj, path, *args, **kwargs)
Expand Down
29 changes: 20 additions & 9 deletions law/contrib/cms/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from law.sandbox.base import Sandbox
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles
from law.job.dashboard import BaseJobDashboard
from law.target.file import get_path
from law.util import (
DotDict, interruptable_popen, make_list, make_unique, quote_cmd, no_value, rel_path,
)
Expand Down Expand Up @@ -143,7 +144,7 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us
instance = self.instance

# get the job file location as the submission command is run in the same directory
job_file_dir, job_file_name = os.path.split(os.path.abspath(job_file))
job_file_dir, job_file_name = os.path.split(os.path.abspath(str(job_file)))

# define the actual submission in a loop to simplify retries
while True:
Expand Down Expand Up @@ -226,7 +227,7 @@ def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_user
job_ids = self._job_ids_from_proj_dir(proj_dir)

# build the command
cmd = ["crab", "kill", "--dir", proj_dir]
cmd = ["crab", "kill", "--dir", str(proj_dir)]
if proxy:
cmd += ["--proxy", proxy]
if instance:
Expand Down Expand Up @@ -254,13 +255,15 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use
job_ids = self._job_ids_from_proj_dir(proj_dir)

# just delete the project directory
proj_dir = str(proj_dir)
if os.path.isdir(proj_dir):
shutil.rmtree(proj_dir)

return {job_id: None for job_id in job_ids}

def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
skip_transfers=None, silent=False):
proj_dir = str(proj_dir)
log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log"))
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data)
Expand Down Expand Up @@ -411,6 +414,7 @@ def _proj_dir_from_job_file(cls, job_file, cmssw_env):
work_area = None
request_name = None

job_file = str(job_file)
with open(job_file, "r") as f:
# fast approach: parse the job file
for line in f.readlines():
Expand Down Expand Up @@ -457,7 +461,7 @@ def _proj_dir_from_job_file(cls, job_file, cmssw_env):

@classmethod
def _parse_log_file(cls, log_file):
log_file = os.path.expandvars(os.path.expanduser(log_file))
log_file = os.path.expandvars(os.path.expanduser(str(log_file)))
if not os.path.exists(log_file):
return None

Expand All @@ -481,6 +485,7 @@ def _parse_log_file(cls, log_file):
@classmethod
def _job_ids_from_proj_dir(cls, proj_dir, log_data=None):
# read log data
proj_dir = str(proj_dir)
if not log_data:
log_data = cls._parse_log_file(os.path.join(proj_dir, "crab.log"))
if not log_data or "n_jobs" not in log_data or "task_name" not in log_data:
Expand Down Expand Up @@ -634,6 +639,7 @@ def create(self, **kwargs):
for attr in ["custom_log_file"]:
if c[attr] and c[attr] not in c.output_files:
c.output_files.append(c[attr])
c.output_files = list(map(str, c.output_files))

# ensure that all input files are JobInputFile's
c.input_files = {
Expand All @@ -643,7 +649,11 @@ def create(self, **kwargs):

# ensure that the executable is an input file, remember the key to access it
if c.executable:
executable_keys = [k for k, v in c.input_files.items() if v == c.executable]
executable_keys = [
k
for k, v in c.input_files.items()
if get_path(v) == get_path(c.executable)
]
if executable_keys:
executable_key = executable_keys[0]
else:
Expand Down Expand Up @@ -720,7 +730,7 @@ def prepare_input(f):
render_variables = self.linearize_render_variables(c.render_variables)

# prepare the job file
job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name))
job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)))

# render copied input files
for key, f in c.input_files.items():
Expand All @@ -734,17 +744,18 @@ def prepare_input(f):

# prepare the executable when given
if c.executable:
c.executable = c.input_files[executable_key].path_job_post_render
c.executable = get_path(c.input_files[executable_key].path_job_post_render)
# make the file executable for the user and group
path = os.path.join(c.dir, os.path.basename(c.executable))
if os.path.exists(path):
os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP)

# resolve work_area relative to self.dir
if not c.work_area:
c.work_area = self.dir
if c.work_area:
work_area = os.path.expandvars(os.path.expanduser(str(c.work_area)))
c.work_area = os.path.join(self.dir, work_area)
else:
c.work_area = os.path.join(self.dir, os.path.expandvars(os.path.expanduser(c.work_area)))
c.work_area = self.dir

# General
c.crab.General.requestName = c.request_name
Expand Down
9 changes: 5 additions & 4 deletions law/contrib/cms/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,14 @@ def load_env(path):

# use the cache path if set
if self.env_cache_path:
env_cache_path = str(self.env_cache_path)
# write it if it does not exist yet
if not os.path.exists(self.env_cache_path):
makedirs(os.path.dirname(self.env_cache_path))
write_env(self.env_cache_path)
if not os.path.exists(env_cache_path):
makedirs(os.path.dirname(env_cache_path))
write_env(env_cache_path)

# load it
env = load_env(self.env_cache_path)
env = load_env(env_cache_path)

else:
# use a temp file
Expand Down
9 changes: 6 additions & 3 deletions law/contrib/cms/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ def checksum(self):
return self.custom_checksum

if self._checksum is None:
cmd = [rel_path(__file__, "scripts", "cmssw_checksum.sh"), self.get_cmssw_path()]
cmd = [
rel_path(__file__, "scripts", "cmssw_checksum.sh"),
get_path(self.get_cmssw_path()),
]
if self.exclude != NO_STR:
cmd += [self.exclude]
cmd = quote_cmd(cmd)
Expand All @@ -72,7 +75,7 @@ def checksum(self):
return self._checksum

def output(self):
base = os.path.basename(self.get_cmssw_path())
base = os.path.basename(get_path(self.get_cmssw_path()))
if self.checksum:
base += "{}.".format(self.checksum)
base = os.path.abspath(os.path.expandvars(os.path.expanduser(base)))
Expand All @@ -87,7 +90,7 @@ def run(self):
def bundle(self, dst_path):
cmd = [
rel_path(__file__, "scripts", "bundle_cmssw.sh"),
self.get_cmssw_path(),
get_path(self.get_cmssw_path()),
get_path(dst_path),
]
if self.exclude != NO_STR:
Expand Down
13 changes: 9 additions & 4 deletions law/contrib/cms/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
from abc import abstractmethod
from collections import OrderedDict

import six

import law
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile
from law.target.file import get_path
from law.target.file import get_path, get_scheme, remove_scheme, FileSystemDirectoryTarget
from law.target.local import LocalDirectoryTarget
from law.task.proxy import ProxyCommand
from law.util import no_value, law_src_path, merge_dicts, DotDict, human_duration
Expand Down Expand Up @@ -242,8 +240,15 @@ def crab_work_area(self):
# when job files are cleaned, try to use the output directory when local
if self.workflow_proxy.job_file_factory and self.workflow_proxy.job_file_factory.cleanup:
out_dir = self.crab_output_directory()
if isinstance(out_dir, six.string_types) or isinstance(out_dir, LocalDirectoryTarget):
# when local, return the directory
if isinstance(out_dir, LocalDirectoryTarget):
return out_dir
# when not a target and no remote scheme, return the directory
if (
not isinstance(out_dir, FileSystemDirectoryTarget) and
get_scheme(out_dir) in (None, "file")
):
return remove_scheme(out_dir)

# relative to the job file directory
return ""
Expand Down
3 changes: 1 addition & 2 deletions law/contrib/coffea/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,4 @@ def load(cls, path, *args, **kwargs):
def dump(cls, path, out, *args, **kwargs):
from coffea.util import save

path = get_path(path)
save(out, path, *args, **kwargs)
save(out, get_path(path), *args, **kwargs)
8 changes: 5 additions & 3 deletions law/contrib/docker/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ def load_env(target):
)

# load the env when the cache file is configured and existing
if self.env_cache_path and self.env_cache_path:
return load_env(LocalFileTarget(self.env_cache_path))
if self.env_cache_path:
env_cache_target = LocalFileTarget(self.env_cache_path)
if env_cache_target.exists():
return load_env(env_cache_target)

tmp = LocalFileTarget(is_tmp=".env")
tmp.touch()
Expand Down Expand Up @@ -93,7 +95,7 @@ def load_env(target):

# copy to the cache path when configured
if self.env_cache_path:
tmp.copy_to_local(self.env_cache_path)
tmp.copy_to_local(env_cache_target)

# load the env
env = load_env(tmp)
Expand Down
11 changes: 6 additions & 5 deletions law/contrib/gfal/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def remove(self, path, base=None, silent=True, **kwargs):

# the directory is not empty, so there is no other way than deleting contents recursively
# first, and then removing the directory itself
path = str(path)
for elem in self.listdir(path, base=base, retries=0):
self.remove(os.path.join(path, elem), base=base, silent=silent, retries=0)

Expand Down Expand Up @@ -363,7 +364,7 @@ class GFALOperationError(RetryException):

def __init__(self, uri, exc=None):
# store uri and scheme
self.uri = uri
self.uri = str(uri)
self.scheme = get_scheme(uri)

# get the original error objects and find the error reason
Expand Down Expand Up @@ -561,8 +562,8 @@ class GFALError_filecopy(GFALOperationError):

def __init__(self, src_uri, dst_uri, exc=None):
# store uri and scheme
self.src_uri = src_uri
self.dst_uri = dst_uri
self.src_uri = str(src_uri)
self.dst_uri = str(dst_uri)
self.src_scheme = get_scheme(src_uri)
self.dst_scheme = get_scheme(dst_uri)

Expand Down Expand Up @@ -691,8 +692,8 @@ def _get_reason(cls, msg, src_uri, dst_uri, src_scheme, dst_scheme):
elif (src_scheme, dst_scheme) in (("dav", "root"), ("davs", "root")):
# it appears that there is a bug in gfal when copying via davix to xrootd in that
# the full dst path is repeated, e.g. "root://url.tld:1090/pnfs/.../root://url..."
# which causes weird behavior, and until this issue persists, there should be no error
# parsing in law
# which causes weird behavior, and as long as this issue persists, there should be no
# error parsing in law
pass

elif (src_scheme, dst_scheme) in (("dav", "srm"), ("davs", "srm")):
Expand Down
Loading

0 comments on commit dca607b

Please sign in to comment.