From dca607be325ab8cd623ab65aeb1b4e39bb6ef025 Mon Sep 17 00:00:00 2001 From: Marcel R Date: Tue, 3 Oct 2023 10:21:40 +0200 Subject: [PATCH] Handle potential pathlib.Path's throughout law, close #167. --- law/config.py | 4 +-- law/contrib/arc/job.py | 14 +++++++--- law/contrib/arc/util.py | 4 +-- law/contrib/arc/workflow.py | 2 +- law/contrib/awkward/formatter.py | 2 ++ law/contrib/cms/job.py | 29 +++++++++++++------- law/contrib/cms/sandbox.py | 9 ++++--- law/contrib/cms/tasks.py | 9 ++++--- law/contrib/cms/workflow.py | 13 ++++++--- law/contrib/coffea/formatter.py | 3 +-- law/contrib/docker/sandbox.py | 8 +++--- law/contrib/gfal/target.py | 11 ++++---- law/contrib/git/__init__.py | 6 ++--- law/contrib/glite/job.py | 14 +++++++--- law/contrib/glite/workflow.py | 2 +- law/contrib/htcondor/job.py | 16 +++++++---- law/contrib/htcondor/workflow.py | 9 ++++--- law/contrib/keras/formatter.py | 18 ++++++++----- law/contrib/lsf/job.py | 16 +++++++---- law/contrib/lsf/workflow.py | 9 ++++--- law/contrib/mercurial/__init__.py | 6 ++--- law/contrib/pyarrow/formatter.py | 9 +++---- law/contrib/pyarrow/util.py | 11 ++++---- law/contrib/root/util.py | 9 +++++-- law/contrib/singularity/sandbox.py | 8 +++--- law/contrib/slurm/job.py | 14 +++++++--- law/contrib/slurm/workflow.py | 9 ++++--- law/contrib/tasks/__init__.py | 4 +-- law/contrib/wlcg/util.py | 12 ++++++--- law/decorator.py | 2 ++ law/job/base.py | 20 ++++++++++---- law/sandbox/base.py | 10 +++---- law/sandbox/bash.py | 9 ++++--- law/sandbox/venv.py | 9 ++++--- law/target/collection.py | 8 +++--- law/target/file.py | 21 ++++++++++----- law/target/formatter.py | 24 +++++++---------- law/target/local.py | 10 ++++--- law/target/luigi_shims.py | 2 +- law/target/remote/base.py | 5 +++- law/target/remote/cache.py | 12 ++++++--- law/target/remote/interface.py | 5 ++-- law/task/base.py | 8 ------ law/task/interactive.py | 7 +++-- law/task/proxy.py | 10 ++----- law/util.py | 43 ++++++++++++++++-------------- law/workflow/remote.py | 2 +- 47 files changed, 287 insertions(+), 200 deletions(-) diff --git a/law/config.py b/law/config.py index 9abdc12c..1d664d24 100644 --- a/law/config.py +++ b/law/config.py @@ -30,7 +30,7 @@ def law_home_path(*paths): home = os.getenv("LAW_HOME") or os.path.expandvars(os.path.expanduser("$HOME/.law")) - return os.path.normpath(os.path.join(home, *(str(path) for path in paths))) + return os.path.normpath(os.path.join(home, *map(str, paths))) class Config(ConfigParser): @@ -186,7 +186,7 @@ def __init__(self, config_file="", skip_defaults=False, skip_fallbacks=False, if not skip_fallbacks: config_files += self._config_files for cf in config_files: - cf = os.path.expandvars(os.path.expanduser(cf)) + cf = os.path.expandvars(os.path.expanduser(str(cf))) cf = os.path.normpath(os.path.abspath(cf)) if os.path.isfile(cf): self.config_file = cf diff --git a/law/contrib/arc/job.py b/law/contrib/arc/job.py index 59e46925..f3ca5d20 100644 --- a/law/contrib/arc/job.py +++ b/law/contrib/arc/job.py @@ -17,6 +17,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles +from law.target.file import get_path from law.util import interruptable_popen, make_list, make_unique, quote_cmd from law.logger import get_logger @@ -64,7 +65,7 @@ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, sil # when this is the case, we have to make the assumption that their input files are all # absolute, or they are relative but all in the same directory chunking = isinstance(job_file, (list, tuple)) - job_files = make_list(job_file) + job_files = list(map(str, make_list(job_file))) job_file_dir = os.path.dirname(os.path.abspath(job_files[0])) job_file_names = [os.path.basename(jf) for jf in job_files] @@ -348,6 +349,7 @@ def create(self, postfix=None, **kwargs): c.output_files.append(c[attr]) # postfix certain output files + c.output_files = list(map(str, c.output_files)) if c.postfix_output_files: c.output_files = [self.postfix_output_file(path, postfix) for path in c.output_files] for attr in ["log", "stdout", "stderr", "custom_log_file"]: @@ -367,7 +369,11 @@ def create(self, postfix=None, **kwargs): # ensure that the executable is an input file, remember the key to access it if c.executable: - executable_keys = [k for k, v in c.input_files.items() if v == c.executable] + executable_keys = [ + k + for k, v in c.input_files.items() + if get_path(v) == get_path(c.executable) + ] if executable_keys: executable_key = executable_keys[0] else: @@ -454,7 +460,7 @@ def prepare_input(f): render_variables = self.linearize_render_variables(c.render_variables) # prepare the job file - job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name), postfix) + job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix) # render copied, non-remote input files for key, f in c.input_files.items(): @@ -475,7 +481,7 @@ def prepare_input(f): # prepare the executable when given if c.executable: - c.executable = c.input_files[executable_key].path_job_post_render + c.executable = get_path(c.input_files[executable_key].path_job_post_render) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): diff --git a/law/contrib/arc/util.py b/law/contrib/arc/util.py index 2b0e1b87..518afc99 100644 --- a/law/contrib/arc/util.py +++ b/law/contrib/arc/util.py @@ -48,7 +48,7 @@ def _arcproxy_info(args=None, proxy_file=None, silent=False): if proxy_file is None: proxy_file = get_arcproxy_file() if proxy_file: - proxy_file = os.path.expandvars(os.path.expanduser(proxy_file)) + proxy_file = os.path.expandvars(os.path.expanduser(str(proxy_file))) cmd.extend(["--proxy", proxy_file]) code, out, err = interruptable_popen(quote_cmd(cmd), shell=True, executable="/bin/bash", @@ -138,7 +138,7 @@ def renew_arcproxy(password="", lifetime="8 days", proxy_file=None): args = "--constraint=validityPeriod={}".format(lifetime_seconds) if proxy_file: - proxy_file = os.path.expandvars(os.path.expanduser(proxy_file)) + proxy_file = os.path.expandvars(os.path.expanduser(str(proxy_file))) args += " --proxy={}".format(proxy_file) with tmp_file() as (_, tmp): diff --git a/law/contrib/arc/workflow.py b/law/contrib/arc/workflow.py index 776876e8..b571c3d2 100644 --- a/law/contrib/arc/workflow.py +++ b/law/contrib/arc/workflow.py @@ -133,7 +133,7 @@ def create_job_file(self, job_num, branches): # determine the custom log file uri if set abs_log_file = None if c.custom_log_file: - abs_log_file = os.path.join(c.output_uri, c.custom_log_file) + abs_log_file = os.path.join(str(c.output_uri), c.custom_log_file) # return job and log files return {"job": job_file, "config": c, "log": abs_log_file} diff --git a/law/contrib/awkward/formatter.py b/law/contrib/awkward/formatter.py index dbada2e2..8f4ed33d 100644 --- a/law/contrib/awkward/formatter.py +++ b/law/contrib/awkward/formatter.py @@ -30,6 +30,7 @@ def load(cls, path, *args, **kwargs): if path.endswith((".parquet", ".parq")): import awkward as ak return ak.from_parquet(path, *args, **kwargs) + if path.endswith(".json"): import awkward as ak return ak.from_json(path, *args, **kwargs) @@ -44,6 +45,7 @@ def dump(cls, path, obj, *args, **kwargs): if path.endswith((".parquet", ".parq")): import awkward as ak return ak.to_parquet(obj, path, *args, **kwargs) + if path.endswith(".json"): import awkward as ak return ak.to_json(obj, path, *args, **kwargs) diff --git a/law/contrib/cms/job.py b/law/contrib/cms/job.py index a3b2a523..b0280693 100644 --- a/law/contrib/cms/job.py +++ b/law/contrib/cms/job.py @@ -25,6 +25,7 @@ from law.sandbox.base import Sandbox from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles from law.job.dashboard import BaseJobDashboard +from law.target.file import get_path from law.util import ( DotDict, interruptable_popen, make_list, make_unique, quote_cmd, no_value, rel_path, ) @@ -143,7 +144,7 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us instance = self.instance # get the job file location as the submission command is run it the same directory - job_file_dir, job_file_name = os.path.split(os.path.abspath(job_file)) + job_file_dir, job_file_name = os.path.split(os.path.abspath(str(job_file))) # define the actual submission in a loop to simplify retries while True: @@ -226,7 +227,7 @@ def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_user job_ids = self._job_ids_from_proj_dir(proj_dir) # build the command - cmd = ["crab", "kill", "--dir", proj_dir] + cmd = ["crab", "kill", "--dir", str(proj_dir)] if proxy: cmd += ["--proxy", proxy] if instance: @@ -254,6 +255,7 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use job_ids = self._job_ids_from_proj_dir(proj_dir) # just delete the project directory + proj_dir = str(proj_dir) if os.path.isdir(proj_dir): shutil.rmtree(proj_dir) @@ -261,6 +263,7 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None, skip_transfers=None, silent=False): + proj_dir = str(proj_dir) log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log")) if job_ids is None: job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data) @@ -411,6 +414,7 @@ def _proj_dir_from_job_file(cls, job_file, cmssw_env): work_area = None request_name = None + job_file = str(job_file) with open(job_file, "r") as f: # fast approach: parse the job file for line in f.readlines(): @@ -457,7 +461,7 @@ def _proj_dir_from_job_file(cls, job_file, cmssw_env): @classmethod def _parse_log_file(cls, log_file): - log_file = os.path.expandvars(os.path.expanduser(log_file)) + log_file = os.path.expandvars(os.path.expanduser(str(log_file))) if not os.path.exists(log_file): return None @@ -481,6 +485,7 @@ def _parse_log_file(cls, log_file): @classmethod def _job_ids_from_proj_dir(cls, proj_dir, log_data=None): # read log data + proj_dir = str(proj_dir) if not log_data: log_data = cls._parse_log_file(os.path.join(proj_dir, "crab.log")) if not log_data or "n_jobs" not in log_data or "task_name" not in log_data: @@ -634,6 +639,7 @@ def create(self, **kwargs): for attr in ["custom_log_file"]: if c[attr] and c[attr] not in c.output_files: c.output_files.append(c[attr]) + c.output_files = list(map(str, c.output_files)) # ensure that all input files are JobInputFile's c.input_files = { @@ -643,7 +649,11 @@ def create(self, **kwargs): # ensure that the executable is an input file, remember the key to access it if c.executable: - executable_keys = [k for k, v in c.input_files.items() if v == c.executable] + executable_keys = [ + k + for k, v in c.input_files.items() + if get_path(v) == get_path(c.executable) + ] if executable_keys: executable_key = executable_keys[0] else: @@ -720,7 +730,7 @@ def prepare_input(f): render_variables = self.linearize_render_variables(c.render_variables) # prepare the job file - job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name)) + job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name))) # render copied input files for key, f in c.input_files.items(): @@ -734,17 +744,18 @@ def prepare_input(f): # prepare the executable when given if c.executable: - c.executable = c.input_files[executable_key].path_job_post_render + c.executable = get_path(c.input_files[executable_key].path_job_post_render) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP) # resolve work_area relative to self.dir - if not c.work_area: - c.work_area = self.dir + if c.work_area: + work_area = os.path.expandvars(os.path.expanduser(str(c.work_area))) + c.work_area = os.path.join(self.dir, work_area) else: - c.work_area = os.path.join(self.dir, os.path.expandvars(os.path.expanduser(c.work_area))) + c.work_area = self.dir # General c.crab.General.requestName = c.request_name diff --git a/law/contrib/cms/sandbox.py b/law/contrib/cms/sandbox.py index 9e048103..de57d700 100644 --- a/law/contrib/cms/sandbox.py +++ b/law/contrib/cms/sandbox.py @@ -151,13 +151,14 @@ def load_env(path): # use the cache path if set if self.env_cache_path: + env_cache_path = str(self.env_cache_path) # write it if it does not exist yet - if not os.path.exists(self.env_cache_path): - makedirs(os.path.dirname(self.env_cache_path)) - write_env(self.env_cache_path) + if not os.path.exists(env_cache_path): + makedirs(os.path.dirname(env_cache_path)) + write_env(env_cache_path) # load it - env = load_env(self.env_cache_path) + env = load_env(env_cache_path) else: # use a temp file diff --git a/law/contrib/cms/tasks.py b/law/contrib/cms/tasks.py index 4a9afd14..b2857a3a 100644 --- a/law/contrib/cms/tasks.py +++ b/law/contrib/cms/tasks.py @@ -57,7 +57,10 @@ def checksum(self): return self.custom_checksum if self._checksum is None: - cmd = [rel_path(__file__, "scripts", "cmssw_checksum.sh"), self.get_cmssw_path()] + cmd = [ + rel_path(__file__, "scripts", "cmssw_checksum.sh"), + get_path(self.get_cmssw_path()), + ] if self.exclude != NO_STR: cmd += [self.exclude] cmd = quote_cmd(cmd) @@ -72,7 +75,7 @@ def checksum(self): return self._checksum def output(self): - base = os.path.basename(self.get_cmssw_path()) + base = os.path.basename(get_path(self.get_cmssw_path())) if self.checksum: base += "{}.".format(self.checksum) base = os.path.abspath(os.path.expandvars(os.path.expanduser(base))) @@ -87,7 +90,7 @@ def run(self): def bundle(self, dst_path): cmd = [ rel_path(__file__, "scripts", "bundle_cmssw.sh"), - self.get_cmssw_path(), + get_path(self.get_cmssw_path()), get_path(dst_path), ] if self.exclude != NO_STR: diff --git a/law/contrib/cms/workflow.py b/law/contrib/cms/workflow.py index 547da547..a17837a5 100644 --- a/law/contrib/cms/workflow.py +++ b/law/contrib/cms/workflow.py @@ -12,13 +12,11 @@ from abc import abstractmethod from collections import OrderedDict -import six - import law from law.config import Config from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy from law.job.base import JobArguments, JobInputFile -from law.target.file import get_path +from law.target.file import get_path, get_scheme, remove_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget from law.task.proxy import ProxyCommand from law.util import no_value, law_src_path, merge_dicts, DotDict, human_duration @@ -242,8 +240,15 @@ def crab_work_area(self): # when job files are cleaned, try to use the output directory when local if self.workflow_proxy.job_file_factory and self.workflow_proxy.job_file_factory.cleanup: out_dir = self.crab_output_directory() - if isinstance(out_dir, six.string_types) or isinstance(out_dir, LocalDirectoryTarget): + # when local, return the directory + if isinstance(out_dir, LocalDirectoryTarget): return out_dir + # when not a target and no remote scheme, return the directory + if ( + not isinstance(out_dir, FileSystemDirectoryTarget) and + get_scheme(out_dir) in (None, "file") + ): + return remove_scheme(out_dir) # relative to the job file directory return "" diff --git a/law/contrib/coffea/formatter.py b/law/contrib/coffea/formatter.py index 254ed708..7f4ffd2e 100644 --- a/law/contrib/coffea/formatter.py +++ b/law/contrib/coffea/formatter.py @@ -43,5 +43,4 @@ def load(cls, path, *args, **kwargs): def dump(cls, path, out, *args, **kwargs): from coffea.util import save - path = get_path(path) - save(out, path, *args, **kwargs) + save(out, get_path(path), *args, **kwargs) diff --git a/law/contrib/docker/sandbox.py b/law/contrib/docker/sandbox.py index 21c2c6c7..c67ed848 100644 --- a/law/contrib/docker/sandbox.py +++ b/law/contrib/docker/sandbox.py @@ -56,8 +56,10 @@ def load_env(target): ) # load the env when the cache file is configured and existing - if self.env_cache_path and self.env_cache_path: - return load_env(LocalFileTarget(self.env_cache_path)) + if self.env_cache_path: + env_cache_target = LocalFileTarget(self.env_cache_path) + if env_cache_target.exists(): + return load_env(env_cache_target) tmp = LocalFileTarget(is_tmp=".env") tmp.touch() @@ -93,7 +95,7 @@ def load_env(target): # copy to the cache path when configured if self.env_cache_path: - tmp.copy_to_local(self.env_cache_path) + tmp.copy_to_local(env_cache_target) # load the env env = load_env(tmp) diff --git a/law/contrib/gfal/target.py b/law/contrib/gfal/target.py index 7f37f6e2..70fb1423 100644 --- a/law/contrib/gfal/target.py +++ b/law/contrib/gfal/target.py @@ -270,6 +270,7 @@ def remove(self, path, base=None, silent=True, **kwargs): # the directory is not empty, so there is no other way than deleting contents recursively # first, and then removing the directory itself + path = str(path) for elem in self.listdir(path, base=base, retries=0): self.remove(os.path.join(path, elem), base=base, silent=silent, retries=0) @@ -363,7 +364,7 @@ class GFALOperationError(RetryException): def __init__(self, uri, exc=None): # store uri and scheme - self.uri = uri + self.uri = str(uri) self.scheme = get_scheme(uri) # get the original error objects and find the error reason @@ -561,8 +562,8 @@ class GFALError_filecopy(GFALOperationError): def __init__(self, src_uri, dst_uri, exc=None): # store uri and scheme - self.src_uri = src_uri - self.dst_uri = dst_uri + self.src_uri = str(src_uri) + self.dst_uri = str(dst_uri) self.src_scheme = get_scheme(src_uri) self.dst_scheme = get_scheme(dst_uri) @@ -691,8 +692,8 @@ def _get_reason(cls, msg, src_uri, dst_uri, src_scheme, dst_scheme): elif (src_scheme, dst_scheme) in (("dav", "root"), ("davs", "root")): # it appears that there is a bug in gfal when copying via davix to xrootd in that # the full dst path is repeated, e.g. "root://url.tld:1090/pnfs/.../root://url..." - # which causes weird behavior, and until this issue persists, there should be no error - # parsing in law + # which causes weird behavior, and as long as this issue persists, there should be no + # error parsing in law pass elif (src_scheme, dst_scheme) in (("dav", "srm"), ("davs", "srm")): diff --git a/law/contrib/git/__init__.py b/law/contrib/git/__init__.py index 539095ea..ac77dd7c 100644 --- a/law/contrib/git/__init__.py +++ b/law/contrib/git/__init__.py @@ -59,7 +59,7 @@ def checksum(self): if self._checksum is None: cmd = quote_cmd([ rel_path(__file__, "scripts", "repository_checksum.sh"), - self.get_repo_path(), + get_path(self.get_repo_path()), ]) code, out, _ = interruptable_popen( @@ -76,7 +76,7 @@ def checksum(self): return self._checksum def output(self): - repo_base = os.path.basename(self.get_repo_path()) + repo_base = os.path.basename(get_path(self.get_repo_path())) repo_base = os.path.abspath(os.path.expandvars(os.path.expanduser(repo_base))) return LocalFileTarget("{}.{}.tgz".format(repo_base, self.checksum)) @@ -89,7 +89,7 @@ def run(self): def bundle(self, dst_path): cmd = [ rel_path(__file__, "scripts", "bundle_repository.sh"), - self.get_repo_path(), + get_path(self.get_repo_path()), get_path(dst_path), " ".join(self.exclude_files), " ".join(self.include_files), diff --git a/law/contrib/glite/job.py b/law/contrib/glite/job.py index 14def639..262fdc90 100644 --- a/law/contrib/glite/job.py +++ b/law/contrib/glite/job.py @@ -16,6 +16,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles +from law.target.file import get_path from law.util import interruptable_popen, make_list, make_unique, quote_cmd from law.logger import get_logger @@ -63,7 +64,7 @@ def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3 len(ce), len(delegation_id))) # get the job file location as the submission command is run it the same directory - job_file_dir, job_file_name = os.path.split(os.path.abspath(job_file)) + job_file_dir, job_file_name = os.path.split(os.path.abspath(str(job_file))) # define the actual submission in a loop to simplify retries while True: @@ -310,6 +311,7 @@ def create(self, postfix=None, render_variables=None, **kwargs): c.output_files.append(c[attr]) # postfix certain output files + c.output_files = list(map(str, c.output_files)) if c.postfix_output_files: c.output_files = [ self.postfix_output_file(path, postfix) @@ -327,7 +329,11 @@ def create(self, postfix=None, render_variables=None, **kwargs): # ensure that the executable is an input file, remember the key to access it if c.executable: - executable_keys = [k for k, v in c.input_files.items() if v == c.executable] + executable_keys = [ + k + for k, v in c.input_files.items() + if get_path(v) == get_path(c.executable) + ] if executable_keys: executable_key = executable_keys[0] else: @@ -410,7 +416,7 @@ def prepare_input(f): render_variables = self.linearize_render_variables(c.render_variables) # prepare the job file - job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name), postfix) + job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix) # render copied input files for key, f in c.input_files.items(): @@ -425,7 +431,7 @@ def prepare_input(f): # prepare the executable when given if c.executable: - c.executable = c.input_files[executable_key].path_job_post_render + c.executable = get_path(c.input_files[executable_key].path_job_post_render) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): diff --git a/law/contrib/glite/workflow.py b/law/contrib/glite/workflow.py index 0844350f..d38963ae 100644 --- a/law/contrib/glite/workflow.py +++ b/law/contrib/glite/workflow.py @@ -149,7 +149,7 @@ def create_job_file(self, job_num, branches): # determine the custom log file uri if set abs_log_file = None if c.custom_log_file: - abs_log_file = os.path.join(c.output_uri, c.custom_log_file) + abs_log_file = os.path.join(str(c.output_uri), c.custom_log_file) # return job and log files return {"job": job_file, "config": c, "log": abs_log_file} diff --git a/law/contrib/htcondor/job.py b/law/contrib/htcondor/job.py index ab324cd7..92f6b4a1 100644 --- a/law/contrib/htcondor/job.py +++ b/law/contrib/htcondor/job.py @@ -16,6 +16,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles +from law.target.file import get_path from law.util import interruptable_popen, make_list, make_unique, quote_cmd from law.logger import get_logger @@ -83,7 +84,7 @@ def has_initialdir(job_file): return False chunking = isinstance(job_file, (list, tuple)) - job_files = make_list(job_file) + job_files = list(map(str, make_list(job_file))) job_file_dir = None for i, job_file in enumerate(job_files): dirname, basename = os.path.split(job_file) @@ -392,9 +393,10 @@ def create(self, postfix=None, **kwargs): c.output_files.append(c.custom_log_file) # postfix certain output files + c.output_files = list(map(str, c.output_files)) if c.postfix_output_files: skip_postfix_cre = re.compile(r"^(/dev/).*$") - skip_postfix = lambda s: bool(skip_postfix_cre.match(s)) + skip_postfix = lambda s: bool(skip_postfix_cre.match(str(s))) c.output_files = [ path if skip_postfix(path) else self.postfix_output_file(path, postfix) for path in c.output_files @@ -411,7 +413,11 @@ def create(self, postfix=None, **kwargs): # ensure that the executable is an input file, remember the key to access it if c.executable: - executable_keys = [k for k, v in c.input_files.items() if v == c.executable] + executable_keys = [ + k + for k, v in c.input_files.items() + if get_path(v) == get_path(c.executable) + ] if executable_keys: executable_key = executable_keys[0] else: @@ -492,7 +498,7 @@ def prepare_input(f): render_variables = self.linearize_render_variables(c.render_variables) # prepare the job description file - job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name), postfix) + job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix) # render copied, non-forwarded input files for key, f in c.input_files.items(): @@ -507,7 +513,7 @@ def prepare_input(f): # prepare the executable when given if c.executable: - c.executable = c.input_files[executable_key].path_job_post_render + c.executable = get_path(c.input_files[executable_key].path_job_post_render) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): diff --git a/law/contrib/htcondor/workflow.py b/law/contrib/htcondor/workflow.py index 0ef298fe..b1ea51b4 100644 --- a/law/contrib/htcondor/workflow.py +++ b/law/contrib/htcondor/workflow.py @@ -12,12 +12,11 @@ from collections import OrderedDict import luigi -import six from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles from law.task.proxy import ProxyCommand -from law.target.file import get_path, get_scheme +from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget from law.parameter import NO_STR from law.util import law_src_path, merge_dicts, DotDict @@ -119,8 +118,10 @@ def create_job_file(self, job_num, branches): # when the output dir is local, we can run within this directory for easier output file # handling and use absolute paths for input files output_dir = task.htcondor_output_directory() - if isinstance(output_dir, six.string_types) and get_scheme(output_dir) in (None, "file"): - output_dir = LocalDirectoryTarget(output_dir) + if not isinstance(output_dir, FileSystemDirectoryTarget): + output_dir = get_path(output_dir) + if get_scheme(output_dir) in (None, "file"): + output_dir = LocalDirectoryTarget(output_dir) output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget) if output_dir_is_local: c.absolute_paths = True diff --git a/law/contrib/keras/formatter.py b/law/contrib/keras/formatter.py index d4d3dea9..277e78a6 100644 --- a/law/contrib/keras/formatter.py +++ b/law/contrib/keras/formatter.py @@ -31,11 +31,15 @@ def dump(cls, path, model, *args, **kwargs): if path.endswith(".json"): with open(path, "w") as f: f.write(model.to_json(*args, **kwargs)) - elif path.endswith((".yml", ".yaml")): + return + + if path.endswith((".yml", ".yaml")): with open(path, "w") as f: f.write(model.to_yaml(*args, **kwargs)) - else: # .hdf5, .h5, bundle - return model.save(path, *args, **kwargs) + return + + # .hdf5, .h5, bundle + return model.save(path, *args, **kwargs) @classmethod def load(cls, path, *args, **kwargs): @@ -47,11 +51,13 @@ def load(cls, path, *args, **kwargs): if path.endswith(".json"): with open(path, "r") as f: return keras.models.model_from_json(f.read(), *args, **kwargs) - elif path.endswith((".yml", ".yaml")): + + if path.endswith((".yml", ".yaml")): with open(path, "r") as f: return keras.models.model_from_yaml(f.read(), *args, **kwargs) - else: # .hdf5, .h5, bundle - return keras.models.load_model(path, *args, **kwargs) + + # .hdf5, .h5, bundle + return keras.models.load_model(path, *args, **kwargs) class KerasWeightsFormatter(Formatter): diff --git a/law/contrib/lsf/job.py b/law/contrib/lsf/job.py index eb3bca80..eceec96f 100644 --- a/law/contrib/lsf/job.py +++ b/law/contrib/lsf/job.py @@ -17,6 +17,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles +from law.target.file import get_path from law.util import interruptable_popen, make_list, make_unique, quote_cmd from law.logger import get_logger @@ -64,7 +65,7 @@ def submit(self, job_file, queue=None, emails=None, retries=0, retry_delay=3, si emails = self.emails # get the job file location as the submission command is run it the same directory - job_file_dir, job_file_name = os.path.split(os.path.abspath(job_file)) + job_file_dir, job_file_name = os.path.split(os.path.abspath(str(job_file))) # build the command cmd = "LSB_JOB_REPORT_MAIL={} bsub".format("Y" if emails else "N") @@ -282,9 +283,10 @@ def create(self, postfix=None, **kwargs): c.output_files.append(c.custom_log_file) # postfix certain output files + c.output_files = list(map(str, c.output_files)) if c.postfix_output_files: skip_postfix_cre = re.compile(r"^(/dev/).*$") - skip_postfix = lambda s: bool(skip_postfix_cre.match(s)) + skip_postfix = lambda s: bool(skip_postfix_cre.match(str(s))) c.output_files = [ path if skip_postfix(path) else self.postfix_output_file(path, postfix) for path in c.output_files @@ -301,7 +303,11 @@ def create(self, postfix=None, **kwargs): # ensure that the executable is an input file, remember the key to access it if c.executable: - executable_keys = [k for k, v in c.input_files.items() if v == c.executable] + executable_keys = [ + k + for k, v in c.input_files.items() + if get_path(v) == get_path(c.executable) + ] if executable_keys: executable_key = executable_keys[0] else: @@ -382,7 +388,7 @@ def prepare_input(f): render_variables = self.linearize_render_variables(c.render_variables) # prepare the job description file - job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name), postfix) + job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix) # render copied, non-forwarded input files for key, f in c.input_files.items(): @@ -397,7 +403,7 @@ def prepare_input(f): # prepare the executable when given if c.executable: - c.executable = c.input_files[executable_key].path_job_post_render + c.executable = get_path(c.input_files[executable_key].path_job_post_render) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): diff --git a/law/contrib/lsf/workflow.py b/law/contrib/lsf/workflow.py index 2713261c..58a15811 100644 --- a/law/contrib/lsf/workflow.py +++ b/law/contrib/lsf/workflow.py @@ -12,12 +12,11 @@ from collections import OrderedDict import luigi -import six from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles from law.task.proxy import ProxyCommand -from law.target.file import get_path, get_scheme +from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget from law.parameter import NO_STR from law.util import law_src_path, merge_dicts, DotDict @@ -118,8 +117,10 @@ def create_job_file(self, job_num, branches): # we can use lsf's file stageout only when the output directory is local # otherwise, one should use the stageout_file and stageout manually output_dir = task.lsf_output_directory() - if isinstance(output_dir, six.string_types) and get_scheme(output_dir) in (None, "file"): - output_dir = LocalDirectoryTarget(output_dir) + if not isinstance(output_dir, FileSystemDirectoryTarget): + output_dir = get_path(output_dir) + if get_scheme(output_dir) in (None, "file"): + output_dir = LocalDirectoryTarget(output_dir) output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget) if output_dir_is_local: c.absolute_paths = True diff --git a/law/contrib/mercurial/__init__.py b/law/contrib/mercurial/__init__.py index 2baa1455..e9674ccd 100644 --- a/law/contrib/mercurial/__init__.py +++ b/law/contrib/mercurial/__init__.py @@ -59,7 +59,7 @@ def checksum(self): if self._checksum is None: cmd = quote_cmd([ rel_path(__file__, "scripts", "repository_checksum.sh"), - self.get_repo_path(), + get_path(self.get_repo_path()), ]) code, out, _ = interruptable_popen( @@ -76,7 +76,7 @@ def checksum(self): return self._checksum def output(self): - repo_base = os.path.basename(self.get_repo_path()) + repo_base = os.path.basename(get_path(self.get_repo_path())) repo_base = os.path.abspath(os.path.expandvars(os.path.expanduser(repo_base))) return LocalFileTarget("{}_{}.tgz".format(repo_base, self.checksum)) @@ -87,7 +87,7 @@ def run(self): def bundle(self, dst_path): bundle_script = rel_path(__file__, "scripts", "bundle_repository.sh") - cmd = [bundle_script, self.get_repo_path(), get_path(dst_path)] + cmd = [bundle_script, get_path(self.get_repo_path()), get_path(dst_path)] cmd += [" ".join(self.exclude_files)] cmd += [" ".join(self.include_files)] diff --git a/law/contrib/pyarrow/formatter.py b/law/contrib/pyarrow/formatter.py index bc2f87ad..01def647 100644 --- a/law/contrib/pyarrow/formatter.py +++ b/law/contrib/pyarrow/formatter.py @@ -27,8 +27,7 @@ def accepts(cls, path, mode): def load(cls, path, *args, **kwargs): import pyarrow.parquet as pq - path = get_path(path) - return pq.ParquetFile(path, *args, **kwargs) + return pq.ParquetFile(get_path(path), *args, **kwargs) class ParquetTableFormatter(Formatter): @@ -43,12 +42,10 @@ def accepts(cls, path, mode): def load(cls, path, *args, **kwargs): import pyarrow.parquet as pq - path = get_path(path) - return pq.read_table(path, *args, **kwargs) + return pq.read_table(get_path(path), *args, **kwargs) @classmethod def dump(cls, path, obj, *args, **kwargs): import pyarrow.parquet as pq - path = get_path(path) - return pq.write_table(obj, path, *args, **kwargs) + return pq.write_table(obj, get_path(path), *args, **kwargs) diff --git a/law/contrib/pyarrow/util.py b/law/contrib/pyarrow/util.py index 6d2d7d51..13547f1a 100644 --- a/law/contrib/pyarrow/util.py +++ b/law/contrib/pyarrow/util.py @@ -9,8 +9,7 @@ import os import shutil -import six - +from law.target.file import FileSystemFileTarget from law.target.local import LocalFileTarget, LocalDirectoryTarget from law.util import map_verbose, human_bytes @@ -39,7 +38,7 @@ def merge_parquet_files(src_paths, dst_path, force=True, callback=None, writer_o callback = lambda i: None # prepare paths - abspath = lambda p: os.path.abspath(os.path.expandvars(os.path.expanduser(p))) + abspath = lambda p: os.path.abspath(os.path.expandvars(os.path.expanduser(str(p)))) src_paths = list(map(abspath, src_paths)) dst_path = abspath(dst_path) @@ -93,16 +92,16 @@ def merge_parquet_task(task, inputs, output, local=False, cwd=None, force=True, *writer_opts* and *copy_single* are forwarded to :py:func:`merge_parquet_files` which is used internally for the actual merging. """ - abspath = lambda path: os.path.abspath(os.path.expandvars(os.path.expanduser(path))) + abspath = lambda path: os.path.abspath(os.path.expandvars(os.path.expanduser(str(path)))) # ensure inputs are targets inputs = [ - LocalFileTarget(abspath(inp)) if isinstance(inp, six.string_types) else inp + inp if isinstance(inp, FileSystemFileTarget) else LocalFileTarget(abspath(inp)) for inp in inputs ] # ensure output is a target - if isinstance(output, six.string_types): + if not isinstance(output, FileSystemFileTarget): output = LocalFileTarget(abspath(output)) def merge(inputs, output): diff --git a/law/contrib/root/util.py b/law/contrib/root/util.py index 0832c2f3..b0ebf5ab 100644 --- a/law/contrib/root/util.py +++ b/law/contrib/root/util.py @@ -11,6 +11,7 @@ import six +from law.target.file import FileSystemFileTarget from law.target.local import LocalFileTarget, LocalDirectoryTarget from law.util import map_verbose, make_list, interruptable_popen, human_bytes, quote_cmd @@ -57,16 +58,20 @@ def hadd_task(task, inputs, output, cwd=None, local=False, force=True, hadd_args localized. When *force* is *True*, any existing output file is overwritten. *hadd_args* can be a sequence of additional arguments that are added to the hadd command. """ - abspath = lambda path: os.path.abspath(os.path.expandvars(os.path.expanduser(path))) + abspath = lambda path: os.path.abspath(os.path.expandvars(os.path.expanduser(str(path)))) # ensure inputs are targets + inputs = [ + inp if isinstance(inp, FileSystemFileTarget) else LocalFileTarget(abspath(inp)) + for inp in inputs + ] inputs = [ LocalFileTarget(abspath(inp)) if isinstance(inp, six.string_types) else inp for inp in inputs ] # ensure output is a target - if isinstance(output, six.string_types): + if not isinstance(output, FileSystemFileTarget): output = LocalFileTarget(abspath(output)) # default cwd diff --git a/law/contrib/singularity/sandbox.py b/law/contrib/singularity/sandbox.py index de63e37e..f3ad823b 100644 --- a/law/contrib/singularity/sandbox.py +++ b/law/contrib/singularity/sandbox.py @@ -50,8 +50,10 @@ def load_env(target): ) # load the env when the cache file is configured and existing - if self.env_cache_path and self.env_cache_path: - return load_env(LocalFileTarget(self.env_cache_path)) + if self.env_cache_path: + env_cache_target = LocalFileTarget(self.env_cache_path) + if env_cache_target.exists(): + return load_env(env_cache_target) # create tmp dir and file tmp_dir = LocalDirectoryTarget(is_tmp=True) @@ -101,7 +103,7 @@ def load_env(target): # copy to the cache path when configured if self.env_cache_path: - tmp.copy_to_local(self.env_cache_path) + tmp.copy_to_local(env_cache_target) # load the env env = load_env(tmp) diff --git a/law/contrib/slurm/job.py b/law/contrib/slurm/job.py index e2ddc290..e45fdc40 100644 --- a/law/contrib/slurm/job.py +++ b/law/contrib/slurm/job.py @@ -15,6 +15,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile +from law.target.file import get_path from law.util import interruptable_popen, make_list, quote_cmd from law.logger import get_logger @@ -57,7 +58,7 @@ def submit(self, job_file, partition=None, retries=0, retry_delay=3, silent=Fals partition = self.partition # get the job file location as the submission command is run it the same directory - job_file_dir, job_file_name = os.path.split(os.path.abspath(job_file)) + job_file_dir, job_file_name = os.path.split(os.path.abspath(str(job_file))) # build the command cmd = ["sbatch"] @@ -336,6 +337,7 @@ def create(self, postfix=None, **kwargs): raise ValueError("shell must not be empty") # postfix certain output files + c.output_files = list(map(str, c.output_files)) if c.postfix_output_files: skip_postfix_cre = re.compile(r"^(/dev/).*$") skip_postfix = lambda s: bool(skip_postfix_cre.match(s)) @@ -351,7 +353,11 @@ def create(self, postfix=None, **kwargs): # ensure that the executable is an input file, remember the key to access it if c.executable: - executable_keys = [k for k, v in c.input_files.items() if v == c.executable] + executable_keys = [ + k + for k, v in c.input_files.items() + if get_path(v) == get_path(c.executable) + ] if executable_keys: executable_key = executable_keys[0] else: @@ -432,7 +438,7 @@ def prepare_input(f): render_variables = self.linearize_render_variables(c.render_variables) # prepare the job description file - job_file = self.postfix_input_file(os.path.join(c.dir, c.file_name), postfix) + job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix) # render copied input files for key, f in c.input_files.items(): @@ -447,7 +453,7 @@ def prepare_input(f): # prepare the executable when given if c.executable: - c.executable = c.input_files[executable_key].path_sub_rel + c.executable = get_path(c.input_files[executable_key].path_sub_rel) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): diff --git a/law/contrib/slurm/workflow.py b/law/contrib/slurm/workflow.py index c4b51bec..20dc4c8e 100644 --- a/law/contrib/slurm/workflow.py +++ b/law/contrib/slurm/workflow.py @@ -12,12 +12,11 @@ from collections import OrderedDict import luigi -import six from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy from law.job.base import JobArguments, JobInputFile from law.task.proxy import ProxyCommand -from law.target.file import get_path, get_scheme +from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget from law.target.local import LocalDirectoryTarget from law.parameter import NO_STR from law.util import law_src_path, merge_dicts, DotDict @@ -117,8 +116,10 @@ def create_job_file(self, job_num, branches): # when the output dir is local, we can run within this directory for easier output file # handling and use absolute paths for input files output_dir = task.slurm_output_directory() - if isinstance(output_dir, six.string_types) and get_scheme(output_dir) in (None, "file"): - output_dir = LocalDirectoryTarget(output_dir) + if not isinstance(output_dir, FileSystemDirectoryTarget): + output_dir = get_path(output_dir) + if get_scheme(output_dir) in (None, "file"): + output_dir = LocalDirectoryTarget(output_dir) output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget) if output_dir_is_local: c.absolute_paths = True diff --git a/law/contrib/tasks/__init__.py b/law/contrib/tasks/__init__.py index 3071414f..c4156b2d 100644 --- a/law/contrib/tasks/__init__.py +++ b/law/contrib/tasks/__init__.py @@ -78,8 +78,8 @@ def get_source_target(self): # when self.source_path is set, return a target around it # otherwise assume self.requires() returns a task with a single local target if self.source_path not in (NO_STR, None): - source_path = os.path.abspath(os.path.expandvars(os.path.expanduser(self.source_path))) - return LocalFileTarget(source_path) + source_path = os.path.expandvars(os.path.expanduser(str(self.source_path))) + return LocalFileTarget(os.path.abspath(source_path)) return self.input() @abstractmethod diff --git a/law/contrib/wlcg/util.py b/law/contrib/wlcg/util.py index 99e2b1f0..ecb0d6c5 100644 --- a/law/contrib/wlcg/util.py +++ b/law/contrib/wlcg/util.py @@ -59,6 +59,7 @@ def get_usercert_subject(usercert=None): # get the user certificate file if usercert is None: usercert = get_usercert() + usercert = str(usercert) if not os.path.exists(usercert): raise Exception("usercert does not exist at '{}'".format(usercert)) @@ -90,7 +91,7 @@ def _vomsproxy_info(args=None, proxy_file=None, silent=False): if proxy_file is None: proxy_file = get_vomsproxy_file() if proxy_file: - proxy_file = os.path.expandvars(os.path.expanduser(proxy_file)) + proxy_file = os.path.expandvars(os.path.expanduser(str(proxy_file))) cmd.extend(["--file", proxy_file]) code, out, err = interruptable_popen(quote_cmd(cmd), shell=True, executable="/bin/bash", @@ -201,6 +202,7 @@ def renew_vomsproxy( # when proxy_file is None, get the default if proxy_file is None: proxy_file = get_vomsproxy_file() + proxy_file = str(proxy_file) # build the command cmd = [ @@ -216,7 +218,7 @@ def renew_vomsproxy( # run it, depending on whether a password file is given silent_pipe = subprocess.PIPE if silent else None if password_file: - password_file = os.path.expandvars(os.path.expanduser(password_file)) + password_file = os.path.expandvars(os.path.expanduser(str(password_file))) cmd = "cat \"{}\" | {}".format(password_file, quote_cmd(cmd)) code = interruptable_popen(cmd, shell=True, executable="/bin/bash", stdout=silent_pipe, stderr=silent_pipe)[0] @@ -249,7 +251,7 @@ def delegate_vomsproxy_glite(endpoint, proxy_file=None, stdout=None, stderr=None # get the proxy file if not proxy_file: proxy_file = get_vomsproxy_file() - proxy_file = os.path.expandvars(os.path.expanduser(proxy_file)) + proxy_file = os.path.expandvars(os.path.expanduser(str(proxy_file))) if not os.path.exists(proxy_file): raise Exception("proxy file '{}' does not exist".format(proxy_file)) @@ -347,8 +349,10 @@ def delegate_myproxy( # prepare arguments if not userkey: userkey = get_userkey() + userkey = str(userkey) if not usercert: usercert = get_usercert() + usercert = str(usercert) if not username: username = get_vomsproxy_identity(silent=True) or get_usercert_subject() if encode_username: @@ -378,7 +382,7 @@ def delegate_myproxy( # run it, depending on whether a password file is given silent_pipe = subprocess.PIPE if silent else None if password_file: - password_file = os.path.expandvars(os.path.expanduser(password_file)) + password_file = os.path.expandvars(os.path.expanduser(str(password_file))) cmd = "{}cat \"{}\" | {} -S".format(rfc_export, password_file, quote_cmd(cmd)) code = interruptable_popen(cmd, shell=True, executable="/bin/bash", stdout=silent_pipe, stderr=silent_pipe)[0] diff --git a/law/decorator.py b/law/decorator.py index 80734aa7..ed9b28b9 100644 --- a/law/decorator.py +++ b/law/decorator.py @@ -265,6 +265,8 @@ def log(fn, opts, task, *args, **kwargs): """ _task = get_task(task) log = get_param(_task.log_file, _task.default_log_file) + if log and not isinstance(log, LocalFileTarget): + log = str(log) if log == "-" or not log: return fn(task, *args, **kwargs) diff --git a/law/job/base.py b/law/job/base.py index 85b662a4..ed2fa3c4 100644 --- a/law/job/base.py +++ b/law/job/base.py @@ -24,7 +24,8 @@ import six from law.config import Config -from law.target.file import get_scheme +from law.target.file import get_scheme, get_path +from law.target.remote.base import RemoteTarget from law.util import ( colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, empty_context, ) @@ -752,7 +753,7 @@ def __init__(self, dir=None, render_variables=None, custom_log_file=None, mkdtem mkdtemp = True # store the directory, default to the job.job_file_dir config - self.dir = dir or cfg.get_expanded("job", "job_file_dir") + self.dir = str(dir or cfg.get_expanded("job", "job_file_dir")) self.dir = os.path.expandvars(os.path.expanduser(self.dir)) # create the directory @@ -800,6 +801,7 @@ def postfix_file(cls, path, postfix=None, add_hash=False): pattern matches the base name of the file, the associated postfix is applied and the path is returned. You might want to use an ordered dictionary to control the first match. """ + path = str(path) dirname, basename = os.path.split(path) # get the actual postfix @@ -843,7 +845,7 @@ def render_string(cls, s, key, value): """ Renders a string *s* by replacing ``{{key}}`` with *value* and returns it. """ - return s.replace("{{" + key + "}}", value) + return s.replace("{{" + key + "}}", str(value)) @classmethod def linearize_render_variables(cls, render_variables): @@ -909,6 +911,7 @@ def render_file(cls, src, dst, render_variables, postfix=None, silent=True): In case the file content is not readable, the method returns unless *silent* is *False* in which case an exception is raised. """ + src, dst = str(src), str(dst) if not os.path.isfile(src): raise IOError("source file for rendering does not exist: {}".format(src)) @@ -946,6 +949,7 @@ def provide_input(self, src, postfix=None, dir=None, render_variables=None, *True*. """ # create the destination path + src, dir = str(src), dir and str(dir) postfixed_src = self.postfix_input_file(src, postfix=postfix) dst = os.path.join(os.path.realpath(dir or self.dir), os.path.basename(postfixed_src)) @@ -1222,6 +1226,11 @@ def __init__(self, path, copy=None, share=None, forward=None, postfix=None, rend render_job = path.render_job path = path.path + # path must not be a remote file target + if isinstance(path, RemoteTarget): + raise ValueError("{}.path should not point to a remote target: {}".format( + self.__class__.__name__, path)) + # convenience if render is not None and render_local is None and render_job is None: render_local = bool(render) @@ -1259,7 +1268,7 @@ def __init__(self, path, copy=None, share=None, forward=None, postfix=None, rend # store attributes, apply residual defaults # TODO: move to job rendering by default - self.path = str(path) + self.path = os.path.abspath(os.path.expandvars(os.path.expanduser(get_path(path)))) self.copy = True if copy is None else bool(copy) self.share = False if share is None else bool(share) self.forward = False if forward is None else bool(forward) @@ -1321,7 +1330,7 @@ def __eq__(self, other): # check equality via path comparison if isinstance(other, JobInputFile): return self.path == other.path - return self.path == other + return self.path == str(other) @property def is_remote(self): @@ -1357,6 +1366,7 @@ def __init__(self, *args, **kwargs): def _append(self, path): # generate a key by taking the basename of the path and strip the file extension + path = str(path) key = os.path.basename(path).split(".", 1)[0] while key in self: key += "_" diff --git a/law/sandbox/base.py b/law/sandbox/base.py index b9b44fc7..1048d257 100644 --- a/law/sandbox/base.py +++ b/law/sandbox/base.py @@ -152,10 +152,10 @@ def __init__(self, name, task=None, env_cache_path=None): if task and not isinstance(task, SandboxTask): raise TypeError("sandbox task must be a SandboxTask instance, got {}".format(task)) - self.name = name + self.name = str(name) self.task = task self.env_cache_path = ( - os.path.abspath(os.path.expandvars(os.path.expanduser(env_cache_path))) + os.path.abspath(os.path.expandvars(os.path.expanduser(str(env_cache_path)))) if env_cache_path else None ) @@ -289,9 +289,9 @@ def replace(vol, name, repl): return vol if bin_dir: - vol = replace(vol, "BIN", bin_dir) + vol = replace(vol, "BIN", str(bin_dir)) if python_dir: - vol = replace(vol, "PY", python_dir) + vol = replace(vol, "PY", str(python_dir)) return vol @@ -692,6 +692,6 @@ def map_collection(func, collection, **kwargs): def create_staged_target(stage_dir, target): if not isinstance(stage_dir, LocalDirectoryTarget): - stage_dir = LocalDirectoryTarget(stage_dir) + stage_dir = LocalDirectoryTarget(str(stage_dir)) return stage_dir.child(target.unique_basename, type=target.type, **target._copy_kwargs()) diff --git a/law/sandbox/bash.py b/law/sandbox/bash.py index e370fe0e..f0584c86 100644 --- a/law/sandbox/bash.py +++ b/law/sandbox/bash.py @@ -74,13 +74,14 @@ def load_env(path): # use the cache path if set if self.env_cache_path: + env_cache_path = str(self.env_cache_path) # write it if it does not exist yet - if not os.path.exists(self.env_cache_path): - makedirs(os.path.dirname(self.env_cache_path)) - write_env(self.env_cache_path) + if not os.path.exists(env_cache_path): + makedirs(os.path.dirname(env_cache_path)) + write_env(env_cache_path) # load it - env = load_env(self.env_cache_path) + env = load_env(env_cache_path) else: # use a temp file diff --git a/law/sandbox/venv.py b/law/sandbox/venv.py index 89e1928b..09f94cef 100644 --- a/law/sandbox/venv.py +++ b/law/sandbox/venv.py @@ -73,13 +73,14 @@ def load_env(path): # use the cache path if set if self.env_cache_path: + env_cache_path = str(self.env_cache_path) # write it if it does not exist yet - if not os.path.exists(self.env_cache_path): - makedirs(os.path.dirname(self.env_cache_path)) - write_env(self.env_cache_path) + if not os.path.exists(env_cache_path): + makedirs(os.path.dirname(env_cache_path)) + write_env(env_cache_path) # load it - env = load_env(self.env_cache_path) + env = load_env(env_cache_path) else: # use a temp file diff --git a/law/target/collection.py b/law/target/collection.py index 11be1b59..c1e1cd7f 100644 --- a/law/target/collection.py +++ b/law/target/collection.py @@ -252,6 +252,8 @@ def localize(self, *args, **kwargs): tmp_dir = kwargs.get("tmp_dir") if not tmp_dir: tmp_dir = LocalDirectoryTarget(is_tmp=True) + elif not isinstance(tmp_dir, LocalDirectoryTarget): + tmp_dir = LocalDirectoryTarget(str(tmp_dir)) kwargs["tmp_dir"] = tmp_dir # enter localize contexts of all targets @@ -287,10 +289,10 @@ class SiblingFileCollection(SiblingFileCollectionBase): def from_directory(cls, directory, **kwargs): # dir should be a FileSystemDirectoryTarget or a string, in which case it is interpreted as # a local path - if isinstance(directory, six.string_types): - d = LocalDirectoryTarget(directory) - elif isinstance(d, FileSystemDirectoryTarget): + if isinstance(directory, FileSystemDirectoryTarget): d = directory + elif directory: + d = LocalDirectoryTarget(str(directory)) else: raise TypeError("directory must either be a string or a FileSystemDirectoryTarget " "object, got '{}'".format(directory)) diff --git a/law/target/file.py b/law/target/file.py index 70a20892..a3e0b3c7 100644 --- a/law/target/file.py +++ b/law/target/file.py @@ -59,10 +59,10 @@ def __repr__(self): return "{}(name={}, {})".format(self.__class__.__name__, self.name, hex(id(self))) def dirname(self, path): - return os.path.dirname(path) if path != "/" else None + return os.path.dirname(str(path)) if path != "/" else None def basename(self, path): - return os.path.basename(path) if path != "/" else "/" + return os.path.basename(str(path)) if path != "/" else "/" def ext(self, path, n=1): # split the path @@ -483,13 +483,21 @@ def get_path(target): path = getattr(target, "abspath", no_value) if path != no_value: return path + + path = getattr(target, "path", no_value) + if path != no_value: + return path + + if target: + return str(target) + return target def get_scheme(uri): # ftp://path/to/file -> ftp # /path/to/file -> None - m = re.match(r"^(\w+)\:\/\/.*$", uri) + m = re.match(r"^(\w+)\:\/\/.*$", str(uri)) return m.group(1) if m else None @@ -499,13 +507,14 @@ def has_scheme(uri): def add_scheme(path, scheme): # adds a scheme to a path, if it does not already contain one + path = str(path) return "{}://{}".format(scheme.rstrip(":/"), path) if not has_scheme(path) else path def remove_scheme(uri): # ftp://path/to/file -> /path/to/file # /path/to/file -> /path/to/file - return re.sub(r"^(\w+\:\/\/)", "", uri) + return re.sub(r"^(\w+\:\/\/)", "", str(uri)) @contextmanager @@ -523,8 +532,8 @@ def enter(target): manager = target.localize(*args, **kwargs) managers.append(manager) return manager.__enter__() - else: - return target + + return target # localize all targets, maintain the structure localized_targets = map_struct(enter, struct) diff --git a/law/target/formatter.py b/law/target/formatter.py index 6b995f1a..4e46cd0f 100644 --- a/law/target/formatter.py +++ b/law/target/formatter.py @@ -35,7 +35,7 @@ def __new__(metacls, classname, bases, classdict): if cls.name in metacls.formatters: raise ValueError("duplicate formatter name '{}' for class {}".format(cls.name, cls)) - elif cls.name == AUTO_FORMATTER: + if cls.name == AUTO_FORMATTER: raise ValueError("formatter class {} must not be named '{}'".format( cls, AUTO_FORMATTER)) @@ -55,8 +55,7 @@ def get_formatter(name, silent=False): formatter = FormatterRegister.formatters.get(name) if formatter or silent: return formatter - else: - raise Exception("cannot find formatter '{}'".format(name)) + raise Exception("cannot find formatter '{}'".format(name)) def find_formatters(path, mode, silent=True): @@ -69,8 +68,7 @@ def find_formatters(path, mode, silent=True): formatters = [f for f in six.itervalues(FormatterRegister.formatters) if f.accepts(path, mode)] if formatters or silent: return formatters - else: - raise Exception("cannot find any '{}' formatter for {}".format(mode, path)) + raise Exception("cannot find any '{}' formatter for {}".format(mode, path)) def find_formatter(path, mode, name=AUTO_FORMATTER): @@ -81,8 +79,7 @@ def find_formatter(path, mode, name=AUTO_FORMATTER): """ if name == AUTO_FORMATTER: return find_formatters(path, mode, silent=False)[0] - else: - return get_formatter(name, silent=False) + return get_formatter(name, silent=False) class Formatter(six.with_metaclass(FormatterRegister, object)): @@ -290,12 +287,11 @@ def infer_compression(cls, path): path = get_path(path) if path.endswith((".tar.gz", ".tgz")): return "gz" - elif path.endswith((".tar.bz2", ".tbz2", ".bz2")): + if path.endswith((".tar.bz2", ".tbz2", ".bz2")): return "bz2" - elif path.endswith((".tar.xz", ".txz", ".lzma")): + if path.endswith((".tar.xz", ".txz", ".lzma")): return "xz" - else: - return None + return None @classmethod def accepts(cls, path, mode): @@ -360,13 +356,11 @@ class PythonFormatter(Formatter): @classmethod def accepts(cls, path, mode): - path = get_path(path) - return path.endswith(".py") + return get_path(path).endswith(".py") @classmethod def load(cls, path, *args, **kwargs): - path = get_path(path) - return import_file(path, *args, **kwargs) + return import_file(get_path(path), *args, **kwargs) # trailing imports diff --git a/law/target/local.py b/law/target/local.py index 80f4b145..781906a7 100644 --- a/law/target/local.py +++ b/law/target/local.py @@ -95,18 +95,19 @@ def __init__(self, section=None, base=None, **kwargs): ) # set the base - self.base = os.path.abspath(base) + self.base = os.path.abspath(str(base)) super(LocalFileSystem, self).__init__(**kwargs) def _unscheme(self, path): + path = str(path) return remove_scheme(path) if get_scheme(path) == "file" else path def abspath(self, path): path = os.path.expandvars(os.path.expanduser(self._unscheme(path))) # join with the base path - base = os.path.expandvars(os.path.expanduser(self.base)) + base = os.path.expandvars(os.path.expanduser(str(self.base))) path = os.path.join(base, path) return os.path.abspath(path) @@ -198,7 +199,7 @@ def listdir(self, path, pattern=None, type=None, **kwargs): def walk(self, path, max_depth=-1, **kwargs): # mimic os.walk with a max_depth and yield the current depth - search_dirs = [(path, 0)] + search_dirs = [(str(path), 0)] while search_dirs: (search_dir, depth) = search_dirs.pop(0) @@ -247,6 +248,7 @@ def _prepare_dst_dir(self, dst, src=None, perm=None, **kwargs): created when :py:attr:`create_file_dir` is *True*, using *perm* to set the directory permission. The absolute path to *dst* is returned. """ + dst, src = str(dst), src and str(src) if self.isdir(dst): full_dst = os.path.join(dst, os.path.basename(src)) if src else dst @@ -324,7 +326,7 @@ def __init__(self, path=None, fs=LocalFileSystem.default_instance, is_tmp=False, if not path: if not is_tmp: raise Exception("when no target path is defined, is_tmp must be set") - if fs.base != "/": + if str(fs.base) != "/": raise Exception( "when is_tmp is set, the base of the underlying file system must be '/', but " "found '{}'".format(fs.base), diff --git a/law/target/luigi_shims.py b/law/target/luigi_shims.py index aa889b89..17b7dc75 100644 --- a/law/target/luigi_shims.py +++ b/law/target/luigi_shims.py @@ -22,7 +22,7 @@ def __init__(self, **kwargs): class FileSystemTarget(luigi.target.FileSystemTarget, Target): def __init__(self, *args, **kwargs): - path = args[0] if args else kwargs["path"] + path = str(args[0] if args else kwargs["path"]) super(FileSystemTarget, self).__init__(path) diff --git a/law/target/remote/base.py b/law/target/remote/base.py index 43ec1333..3050d4dc 100644 --- a/law/target/remote/base.py +++ b/law/target/remote/base.py @@ -166,6 +166,7 @@ def is_local(self, path): def abspath(self, path): # due to the dynamic definition of remote bases, path is supposed to be already absolute, # so just handle leading and trailing slashes when there is no scheme scheme + path = str(path) return ("/" + path.strip("/")) if not get_scheme(path) else path def uri(self, path, **kwargs): @@ -250,6 +251,7 @@ def mkdir(self, path, perm=None, recursive=True, **kwargs): def listdir(self, path, pattern=None, type=None, **kwargs): # forward to local_fs + path = str(path) if self.is_local(path): return self.local_fs.listdir(path, pattern=pattern, type=type) @@ -300,6 +302,7 @@ def walk(self, path, max_depth=-1, **kwargs): def glob(self, pattern, cwd=None, **kwargs): # forward to local_fs + pattern = str(pattern) if self.is_local(pattern): return self.local_fs.glob(pattern, cwd=cwd) @@ -584,7 +587,7 @@ def path(self): @path.setter def path(self, path): - if os.path.normpath(path).startswith(".."): + if os.path.normpath(str(path)).startswith(".."): raise ValueError("path {} forbidden, surpasses file system root".format(path)) path = self.fs.abspath(path) diff --git a/law/target/remote/cache.py b/law/target/remote/cache.py index 30dc0b4a..bfe7e3e3 100644 --- a/law/target/remote/cache.py +++ b/law/target/remote/cache.py @@ -101,7 +101,7 @@ def __init__(self, fs, root=TMP, cleanup=False, max_size=0, mtime_patience=1.0, name = "{}_{}".format(fs.__class__.__name__, create_hash(fs.base[0])) # create the root dir, handle tmp - root = os.path.expandvars(os.path.expanduser(root)) or self.TMP + root = os.path.expandvars(os.path.expanduser(str(root))) or self.TMP if not os.path.exists(root) and root == self.TMP: cfg = Config.instance() tmp_dir = cfg.get_expanded("target", "tmp_dir") @@ -163,11 +163,12 @@ def _cleanup(self): logger.debug("cleanup RemoteCache at '{}'".format(self.base)) def cache_path(self, rpath): + rpath = str(rpath) basename = "{}_{}".format(create_hash(rpath), os.path.basename(rpath)) return os.path.join(self.base, basename) def _lock_path(self, cpath): - return cpath + self.lock_postfix + return str(cpath) + self.lock_postfix def is_locked_global(self): return os.path.exists(self._global_lock_path) @@ -208,6 +209,7 @@ def _await_global(self, delay=None, max_waits=None, silent=False): return True def _await(self, cpath, delay=None, max_waits=None, silent=False, global_lock=None): + cpath = str(cpath) delay = delay if delay is not None else self.wait_delay max_waits = max_waits if max_waits is not None else self.max_waits _max_waits = max_waits @@ -253,6 +255,7 @@ def _lock_global(self, **kwargs): @contextmanager def _lock(self, cpath, **kwargs): + cpath = str(cpath) lock_path = self._lock_path(cpath) self._await(cpath, **kwargs) @@ -329,6 +332,7 @@ def allocate(self, size): return False def _touch(self, cpath, times=None): + cpath = str(cpath) if os.path.exists(cpath): if user_owns_file(cpath): os.chmod(cpath, self.file_perm) @@ -338,7 +342,7 @@ def touch(self, rpath, times=None): self._touch(self.cache_path(rpath), times=times) def _mtime(self, cpath): - return os.stat(cpath).st_mtime + return os.stat(str(cpath)).st_mtime def mtime(self, rpath): return self._mtime(self.cache_path(rpath)) @@ -352,7 +356,7 @@ def check_mtime(self, rpath, rmtime): def _remove(self, cpath, lock=True): def remove(): try: - os.remove(cpath) + os.remove(str(cpath)) except OSError: pass diff --git a/law/target/remote/interface.py b/law/target/remote/interface.py index d6c095e5..6d19a5d9 100644 --- a/law/target/remote/interface.py +++ b/law/target/remote/interface.py @@ -158,8 +158,9 @@ def __init__(self, base=None, bases=None, retries=0, retry_delay=0, random_base= self.__class__.__name__)) # expand variables in base and bases - self.base = list(map(os.path.expandvars, base)) - self.bases = {k: list(map(os.path.expandvars, b)) for k, b in six.iteritems(bases)} + expand = lambda p: os.path.expandvars(str(p)) + self.base = list(map(expand, base)) + self.bases = {k: list(map(expand, b)) for k, b in six.iteritems(bases)} # store other attributes self.retries = retries diff --git a/law/task/base.py b/law/task/base.py index 8d28fd2a..24d64456 100644 --- a/law/task/base.py +++ b/law/task/base.py @@ -218,14 +218,6 @@ def req_params(cls, inst, _exclude=None, _prefer_cli=None, _skip_task_excludes=F return params - @classmethod - def _bind_super_cls_method(cls, super_meth): - return super_meth.__func__.__get__(cls) - - @classmethod - def _call_super_cls_method(cls, super_meth, *args, **kwargs): - return cls._bind_super_cls_method(super_meth)(*args, **kwargs) - def __init__(self, *args, **kwargs): super(BaseTask, self).__init__(*args, **kwargs) diff --git a/law/task/interactive.py b/law/task/interactive.py index 9fd5fb0a..dbc9fcd7 100644 --- a/law/task/interactive.py +++ b/law/task/interactive.py @@ -57,10 +57,9 @@ def _flatten_output(output, depth): if isinstance(output, (list, tuple, set)) or is_lazy_iterable(output): return [(outp, depth, "{}: ".format(i)) for i, outp in enumerate(output)] - elif isinstance(output, dict): + if isinstance(output, dict): return [(outp, depth, "{}: ".format(k)) for k, outp in six.iteritems(output)] - else: - return [(outp, depth, "") for outp in flatten(output)] + return [(outp, depth, "") for outp in flatten(output)] def _iter_output(output, offset, ind=" "): @@ -448,7 +447,7 @@ def fetch_task_output(task, max_depth=0, mode=None, target_dir=".", include_exte max_depth = int(max_depth) print("fetch task output with max_depth {}".format(max_depth)) - target_dir = os.path.normpath(os.path.abspath(target_dir)) + target_dir = os.path.normpath(os.path.abspath(str(target_dir))) print("target directory is {}".format(target_dir)) makedirs(target_dir) diff --git a/law/task/proxy.py b/law/task/proxy.py index 9ecf570b..62a6b93e 100644 --- a/law/task/proxy.py +++ b/law/task/proxy.py @@ -57,11 +57,7 @@ def __init__( exclude_task_args=exclude_task_args, exclude_global_args=exclude_global_args, ) - self.executable = list( - shlex.split(executable) - if isinstance(executable, str) - else executable, - ) + self.executable = shlex.split(str(executable)) if executable else None def load_args(self, exclude_task_args=None, exclude_global_args=None): args = [] @@ -90,9 +86,7 @@ def add_arg(self, key, value, overwrite=False): self.args.append((key, value)) def build_run_cmd(self, executable=None): - exe = executable or self.executable - exe = list(shlex.split(executable) if isinstance(executable, str) else exe) - + exe = shlex.split(str(executable)) if executable else self.executable return exe + ["run", "{}.{}".format(self.task.__module__, self.task.__class__.__name__)] def build(self, skip_run=False, executable=None): diff --git a/law/util.py b/law/util.py index 52e1f605..cee8dbe2 100644 --- a/law/util.py +++ b/law/util.py @@ -101,17 +101,17 @@ def rel_path(anchor, *paths): Returns a path made of framgment *paths* relativ to an *anchor* path. When *anchor* is a file, its absolute directory is used instead. """ - anchor = os.path.abspath(os.path.expandvars(os.path.expanduser(anchor))) + anchor = os.path.abspath(os.path.expandvars(os.path.expanduser(str(anchor)))) if os.path.exists(anchor) and os.path.isfile(anchor): anchor = os.path.dirname(anchor) - return os.path.normpath(os.path.join(anchor, *paths)) + return os.path.normpath(os.path.join(anchor, *map(str, paths))) def law_src_path(*paths): """ Returns the law installation directory, optionally joined with *paths*. """ - return rel_path(__file__, *paths) + return rel_path(__file__, *map(str, paths)) def law_home_path(*paths): @@ -205,7 +205,7 @@ def import_file(path, attr=None): custom code must be loaded. """ # load the package contents - path = os.path.expandvars(os.path.expanduser(path)) + path = os.path.expandvars(os.path.expanduser(str(path))) pkg = DotDict() with open(path, "r") as f: exec(f.read(), pkg) @@ -1047,9 +1047,9 @@ def which(prog): executable = lambda path: os.path.isfile(path) and os.access(path, os.X_OK) # prog can also be a path - dirname, _ = os.path.split(prog) + dirname, _ = os.path.split(str(prog)) if dirname: - if executable(prog): + if executable(str(prog)): return prog elif "PATH" in os.environ: for search_path in os.environ["PATH"].split(os.pathsep): @@ -1449,6 +1449,7 @@ def copy_no_perm(src, dst): """ Copies a file from *src* to *dst* including meta data except for permission bits. """ + src, dst = str(src), str(dst) shutil.copyfile(src, dst) perm = os.stat(dst).st_mode shutil.copystat(src, dst) @@ -1462,6 +1463,7 @@ def makedirs(path, perm=None): to this value. """ # nothing to do when the directory already exists + path = str(path) if os.path.isdir(path): return @@ -1494,7 +1496,7 @@ def user_owns_file(path, uid=None): """ if uid is None: uid = os.getuid() - path = os.path.expandvars(os.path.expanduser(path)) + path = os.path.expandvars(os.path.expanduser(str(path))) return os.stat(path).st_uid == uid @@ -2125,30 +2127,31 @@ def insert_after(self, after_key, key, value=None): self._insert(after_key, key, value, 1) -def open_compat(*args, **kwargs): +def open_compat(path, *args, **kwargs): """ Polyfill for python's ``open`` factory, returning the plain ``open`` in python 3, and ``io.open`` in python 2 with a patched ``write`` method that internally handles unicode conversion of its first argument. All *args* and *kwargs* are forwarded. """ + path = str(path) + if six.PY3: - return open(*args, **kwargs) + return open(path, *args, **kwargs) - else: - f = io.open(*args, **kwargs) + f = io.open(path, *args, **kwargs) - if f.encoding and f.encoding.lower().replace("-", "") == "utf8": - write_orig = f.write + if f.encoding and f.encoding.lower().replace("-", "") == "utf8": + write_orig = f.write - def write(data, *args, **kwargs): - u = unicode # noqa: F821 - if not isinstance(data, u): - data = u(data) - return write_orig(data, *args, **kwargs) + def write(data, *args, **kwargs): + u = unicode # noqa: F821 + if not isinstance(data, u): + data = u(data) + return write_orig(data, *args, **kwargs) - f.write = write + f.write = write - return f + return f @contextlib.contextmanager diff --git a/law/workflow/remote.py b/law/workflow/remote.py index 38511298..780a1fc5 100644 --- a/law/workflow/remote.py +++ b/law/workflow/remote.py @@ -313,7 +313,7 @@ def get_extra_submission_data(self, job_file, config, log=None): """ extra = {} if log: - extra["log"] = log + extra["log"] = str(log) return extra @property