Skip to content

Commit

Permalink
FIX: Ensure output_dir is not recalculated
Browse files Browse the repository at this point in the history
  • Loading branch information
mgxd committed Apr 1, 2022
1 parent 36ad2df commit af0dff8
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def _modify_inputs(self):
self.inputs = attr.evolve(self.inputs, **modified_inputs)
return orig_inputs

def _populate_filesystem(self):
def _populate_filesystem(self, checksum, output_dir):
"""
Invoked immediately after the lockfile is generated, this function:
- Creates the cache file
Expand All @@ -475,16 +475,18 @@ def _populate_filesystem(self):
# adding info file with the checksum in case the task was cancelled
# and the lockfile has to be removed
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
json.dump({"checksum": self.checksum}, jsonfile)
if not self.can_resume and self.output_dir.exists():
shutil.rmtree(self.output_dir)
self.output_dir.mkdir(parents=False, exist_ok=self.can_resume)
json.dump({"checksum": checksum}, jsonfile)
if not self.can_resume and output_dir.exists():
shutil.rmtree(output_dir)
output_dir.mkdir(parents=False, exist_ok=self.can_resume)

def _run(self, rerun=False, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()

lockfile = self.cache_dir / (self.checksum + ".lock")
checksum = self.checksum
output_dir = self.output_dir
lockfile = self.cache_dir / (checksum + ".lock")
# Eagerly retrieve cached - see scenarios in __init__()
self.hooks.pre_run(self)
with SoftFileLock(lockfile):
Expand All @@ -493,27 +495,26 @@ def _run(self, rerun=False, **kwargs):
if result is not None and not result.errored:
return result
cwd = os.getcwd()
self._populate_filesystem()
self._populate_filesystem(checksum, output_dir)
orig_inputs = self._modify_inputs()
# the output dir can be changed by _run_task (but should it??)
orig_outdir = self.output_dir
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
self.audit.start_audit(odir=self.output_dir)
self.audit.start_audit(odir=output_dir)
try:
self.audit.monitor()
self._run_task()
result.output = self._collect_outputs(output_dir=orig_outdir)
result.output = self._collect_outputs(output_dir=output_dir)
except Exception:
etype, eval, etr = sys.exc_info()
traceback = format_exception(etype, eval, etr)
record_error(self.output_dir, error=traceback)
record_error(output_dir, error=traceback)
result.errored = True
raise
finally:
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result)
save(orig_outdir, result=result, task=self)
save(output_dir, result=result, task=self)
self.output_ = None
# removing the additional file with the chcksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
Expand Down Expand Up @@ -1056,35 +1057,35 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
)
# creating connections that were defined after adding tasks to the wf
self._connect_and_propagate_to_tasks()
lockfile = self.cache_dir / (self.checksum + ".lock")
output_dir = self.output_dir
checksum = self.checksum
lockfile = self.cache_dir / (checksum + ".lock")
self.hooks.pre_run(self)
async with PydraFileLock(lockfile):
if not (rerun or self.task_rerun):
result = self.result()
if result is not None and not result.errored:
return result
cwd = os.getcwd()
self._populate_filesystem()
# the output dir can be changed by _run_task (but should it??)
orig_outdir = self.output_dir
self._populate_filesystem(checksum, output_dir)
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
self.audit.start_audit(odir=self.output_dir)
self.audit.start_audit(odir=output_dir)
try:
self.audit.monitor()
await self._run_task(submitter, rerun=rerun)
result.output = self._collect_outputs()
except Exception:
etype, eval, etr = sys.exc_info()
traceback = format_exception(etype, eval, etr)
record_error(self.output_dir, error=traceback)
record_error(output_dir, error=traceback)
result.errored = True
self._errored = True
raise
finally:
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result=result)
save(orig_outdir, result=result, task=self)
save(output_dir, result=result, task=self)
# removing the additional file with the chcksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
os.chdir(cwd)
Expand Down

0 comments on commit af0dff8

Please sign in to comment.