Skip to content

Commit

Permalink
Handle file collisions for grouped job submissions, fix #183.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Oct 15, 2024
1 parent e17bfd0 commit d60ea71
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 22 deletions.
1 change: 1 addition & 0 deletions law/contrib/cms/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ def prepare_input(f):
src=abs_path,
dir=c.dir,
skip_existing=f.share,
increment_existing=f.increment and not f.share,
)
return abs_path

Expand Down
3 changes: 2 additions & 1 deletion law/contrib/htcondor/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,10 @@ def prepare_input(f):
# copy the file
abs_path = self.provide_input(
src=abs_path,
postfix=c.postfix if not grouped_submission and f.postfix and not f.share else None,
postfix=c.postfix if f.postfix and not f.share and not grouped_submission else None,
dir=c.dir,
skip_existing=f.share,
increment_existing=f.increment and not f.share and grouped_submission,
)
return abs_path

Expand Down
1 change: 1 addition & 0 deletions law/contrib/htcondor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def htcondor_group_wrapper_file(self):
path=rel_path(__file__, "htcondor_wrapper.sh"),
copy=True,
render_local=True,
increment=True,
)

def htcondor_wrapper_file(self):
Expand Down
52 changes: 35 additions & 17 deletions law/job/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from law.target.file import get_scheme, get_path
from law.target.remote.base import RemoteTarget
from law.util import (
colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, empty_context,
create_random_string,
colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, create_random_string,
increment_path,
)
from law.logger import get_logger

Expand Down Expand Up @@ -980,30 +980,38 @@ def _expand_template_path(cls, path, variables=None):
return path

def provide_input(self, src, postfix=None, dir=None, render_variables=None,
skip_existing=False):
skip_existing=False, increment_existing=True):
"""
Convenience method that copies an input file to a target directory *dir* which defaults to
the :py:attr:`dir` attribute of this instance. The provided file has the same basename,
which is optionally postfixed with *postfix*. Essentially, this method calls
:py:meth:`render_file` when *render_variables* is set, or simply ``shutil.copy2`` otherwise.
If the file to create is already existing, it is overwritten unless *skip_existing* is
*True*.
*True*. If *skip_existing* is *False* but *increment_existing* is *True*, the target path is
incremented when the file already exists.
"""
# create the destination path
src, dir = str(src), dir and str(dir)
postfixed_src = self.postfix_input_file(src, postfix=postfix)
dst = os.path.join(os.path.realpath(dir or self.dir), os.path.basename(postfixed_src))

# thread-safe check for the existince of the file in a thread-safe
context = self.file_locks[dst] if skip_existing else empty_context()
with context:
# create if not existing or if overwriting
if not skip_existing or not os.path.exists(dst):
# provide the file
if render_variables:
self.render_file(src, dst, render_variables, postfix=postfix)
else:
shutil.copy2(src, dst)
# check if the file exists but should be skipped
if skip_existing:
with self.file_locks[dst]:
if os.path.exists(dst):
return dst

# check if the path needs to be incremented
elif increment_existing:
with self.file_locks[dst]:
dst = increment_path(dst)

# provide the file
with self.file_locks[dst]:
if render_variables:
self.render_file(src, dst, render_variables, postfix=postfix)
else:
shutil.copy2(src, dst)

return dst

Expand Down Expand Up @@ -1198,6 +1206,13 @@ class JobInputFile(object):
considered if supported by the submission system (e.g. local ones such as htcondor or
slurm).
.. py:attribute:: increment
type: bool
Whether the file path should be incremented when copied if a file with the same name already
exists in the same submission directory.
.. py:attribute:: postfix
type: bool
Expand Down Expand Up @@ -1253,15 +1268,16 @@ class JobInputFile(object):
basename otherwise. Set only during job file creation.
"""

def __init__(self, path, copy=None, share=None, forward=None, postfix=None, render=None,
render_local=None, render_job=None):
def __init__(self, path, copy=None, share=None, forward=None, increment=None, postfix=None,
render=None, render_local=None, render_job=None):
super(JobInputFile, self).__init__()

# when path is a job file instance itself, use its values instead
if isinstance(path, JobInputFile):
copy = path.copy
share = path.share
forward = path.forward
increment = path.increment
postfix = path.postfix
render_local = path.render_local
render_job = path.render_job
Expand Down Expand Up @@ -1313,6 +1329,7 @@ def __init__(self, path, copy=None, share=None, forward=None, postfix=None, rend
self.copy = True if copy is None else bool(copy)
self.share = False if share is None else bool(share)
self.forward = False if forward is None else bool(forward)
self.increment = False if increment is None else bool(increment)
self.postfix = True if postfix is None else bool(postfix)
self.render_local = True if render_local is None else bool(render_local)
self.render_job = False if render_job is None else bool(render_job)
Expand Down Expand Up @@ -1362,7 +1379,8 @@ def __repr__(self):
return "<{}({}) at {}>".format(
self.__class__.__name__,
", ".join("{}={}".format(attr, getattr(self, attr)) for attr in [
"path", "copy", "share", "forward", "postfix", "render_local", "render_job",
"path", "copy", "share", "forward", "increment", "postfix", "render_local",
"render_job",
]),
hex(id(self)),
)
Expand Down
44 changes: 40 additions & 4 deletions law/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
"is_lazy_iterable", "make_list", "make_tuple", "make_set", "make_unique", "is_nested",
"flatten", "merge_dicts", "unzip", "which", "map_verbose", "map_struct", "mask_struct",
"tmp_file", "perf_counter", "interruptable_popen", "readable_popen", "create_hash",
"create_random_string", "copy_no_perm", "makedirs", "user_owns_file", "iter_chunks",
"human_bytes", "parse_bytes", "human_duration", "parse_duration", "is_file_exists_error",
"send_mail", "DotDict", "ShorthandDict", "open_compat", "patch_object", "join_generators",
"quote_cmd", "escape_markdown", "classproperty", "BaseStream", "TeeStream", "FilteredStream",
"create_random_string", "copy_no_perm", "makedirs", "user_owns_file", "increment_path",
"iter_chunks", "human_bytes", "parse_bytes", "human_duration", "parse_duration",
"is_file_exists_error", "send_mail", "DotDict", "ShorthandDict", "open_compat", "patch_object",
"join_generators", "quote_cmd", "escape_markdown", "classproperty", "BaseStream", "TeeStream",
"FilteredStream",
]


Expand Down Expand Up @@ -1522,6 +1523,41 @@ def user_owns_file(path, uid=None):
return os.stat(path).st_uid == uid


def increment_path(path, n=None):
"""
Takes a file path *path* and returns a new path with a counter appended to the basename. When
*n* is a number (and in particular, not *None*), the counter is increased by that number. When
*n* is *None*, a new counter is determined by checking the directory for existing files with the
same basename. The new path is returned.
"""
path = os.path.abspath(os.path.expandvars(os.path.expanduser(str(path))))
dirname, basename = os.path.split(path)
basename, ext = os.path.splitext(basename)

# check if basename already contains a trailing counter
m = re.match(r"^([^\.]+)_(\d+)$", basename)
counter = 0
if m:
basename = m.group(1)
counter = int(m.group(2))

# helper to determine the incremented path
next_path = lambda i: os.path.join(dirname, "{}_{}{}".format(basename, counter + i, ext))

# when a number is given in n, just increase the counter by that number
if n is not None:
return next_path(n)

# when n is none, perform a full collision handling in the directory
_path = path
i = 0
while True:
if not os.path.exists(_path):
return _path
i += 1
_path = next_path(i)


def iter_chunks(l, size):
"""
Returns a generator containing chunks of *size* of a list, integer or generator *l*. A *size*
Expand Down

0 comments on commit d60ea71

Please sign in to comment.