moving from deepcopy to pickle files in to_job method #246

Merged
merged 19 commits on Jun 2, 2020
Commits
c5b9a1d
editing to_job, so it doesn't create a deep copy of a task when the ta…
djarecka May 8, 2020
8efa963
removing load_and_run from task and Workflow, creating standalone fun…
djarecka May 29, 2020
adef23f
Merge https://github.com/nipype/pydra into mnt/memory_tojob
djarecka May 30, 2020
fb54adf
simplifying load_and_run, adding a test
djarecka May 31, 2020
d8d7353
changing pytest setting - running only slurm if sbatch available (jus…
djarecka May 31, 2020
c417698
adjusting slurm worker to the current version of to_job (without deep…
djarecka May 31, 2020
cc07170
small changes in load_and_run, to be able to use in create_pyscript
djarecka May 31, 2020
4149055
simplifying create_pyscript
djarecka May 31, 2020
ca65aac
removing to_job and adding pickle_task - uses helpers.save to save th…
djarecka May 31, 2020
ecfe481
removing create_pyscript, using load_and_run in slurm script
djarecka May 31, 2020
d582ec8
fixing load and run, so it works also for workflows
djarecka Jun 1, 2020
fad92c9
always using lockfile in save
djarecka Jun 1, 2020
485a9f0
adding crashfile to load_and_run
djarecka Jun 1, 2020
3412931
Adding name_prefix to save
djarecka Jun 1, 2020
68996ac
small edit to the test to fix windows
djarecka Jun 1, 2020
50b409a
allowing failure for task, I'm getting Timed out trying to connecttoo…
djarecka Jun 1, 2020
3ede787
removing report_crash and modifying record_error to be used in load_an…
djarecka Jun 2, 2020
e0b3552
catching error in load_task; returning resultfile instead of task in …
djarecka Jun 2, 2020
d17b593
small changes to make_class, so result is properly formatted, closes …
djarecka Jun 2, 2020
60 changes: 51 additions & 9 deletions pydra/engine/core.py
@@ -6,7 +6,7 @@
import os
from pathlib import Path
import typing as ty
from copy import deepcopy
from copy import deepcopy, copy

import cloudpickle as cp
from filelock import SoftFileLock
@@ -359,6 +359,16 @@ def __call__(self, submitter=None, plugin=None, rerun=False, **kwargs):
res = self._run(rerun=rerun, **kwargs)
return res

def _load_and_run(self, ind, task_pkl, input_pkl, rerun=False, **kwargs):
""" loading the task and inputs from pickle files,
settings proper input for specific index before running the task
"""
task_orig = cp.loads(input_pkl.read_bytes())
task = cp.loads(task_pkl.read_bytes())
_, inputs_dict = task_orig.get_input_el(ind)
task.inputs = attr.evolve(task.inputs, **inputs_dict)
return task._run(rerun=rerun, **kwargs)
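
attr.evolve is what lets _load_and_run re-inject the per-index inputs without mutating the pickled template: it returns a new instance with the given fields replaced. A toy illustration (the Inputs class is hypothetical; pydra's input specs are attrs classes in the same spirit):

import attr

@attr.s
class Inputs:
    x = attr.ib(default=None)
    y = attr.ib(default=None)

inp = Inputs(x=1, y=2)
new_inp = attr.evolve(inp, x=10)  # Inputs(x=10, y=2); inp itself is unchanged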

def _run(self, rerun=False, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()
@@ -499,15 +509,35 @@ def get_input_el(self, ind):
return None, inputs_dict

def to_job(self, ind):
"""Run interface one element generated from node_state."""
""" Pickling tasks and inputs, so don't have to load the input to teh memory """
# logger.debug("Run interface el, name={}, ind={}".format(self.name, ind))
el = deepcopy(self)
el.state = None
# dj might be needed
# el._checksum = None
_, inputs_dict = self.get_input_el(ind)
el.inputs = attr.evolve(el.inputs, **inputs_dict)
return el
# copy the task and set input/state to None

pkl_files = self.cache_dir / "pkl_files"
pkl_files.mkdir(exist_ok=True)
task_path = pkl_files / f"task_{self.name}.pklz"
input_path = pkl_files / f"task_orig_input_{self.name}.pklz"

# the pickle files should be independent of the index, so they can be saved only once
if ind == 0:
task_copy = copy(self)
task_copy.state = None
task_copy.inputs = attr.evolve(
self.inputs, **{k: None for k in self.input_names}
)

# saving the task object (no input)
with task_path.open("wb") as fp:
cp.dump(task_copy, fp)

# saving the original task with the full input
# so it can later be used to set the input for all of the tasks
with input_path.open("wb") as fp:
cp.dump(self, fp)

# index, path to the pickled task, path to the pickled original task with input,
# and self (to be able to check properties when needed)
return (ind, task_path, input_path, self)
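
Taken together with _load_and_run above, the round trip looks roughly like this (a sketch under the diff's own names, not verbatim worker code):

# Sketch: producing and consuming the tuple that to_job now returns.
ind, task_path, input_path, task_orig = task.to_job(0)
# nothing heavy is held in memory until the worker runs the job:
result = task_orig._load_and_run(ind, task_path, input_path, rerun=False)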

@property
def done(self):
@@ -823,6 +853,18 @@ def create_connections(self, task):
combiner=combiner,
)

async def _load_and_run(
self, ind, task_pkl, input_pkl, submitter=None, rerun=False, **kwargs
):
""" loading the workflow and inputs from pickle files,
settings proper input for specific index before running the workflow
"""
task_orig = cp.loads(input_pkl.read_bytes())
task = cp.loads(task_pkl.read_bytes())
_, inputs_dict = task_orig.get_input_el(ind)
task.inputs = attr.evolve(task.inputs, **inputs_dict)
await task._run(submitter=submitter, rerun=rerun, **kwargs)

async def _run(self, submitter=None, rerun=False, **kwargs):
# self.inputs = dc.replace(self.inputs, **kwargs) don't need it?
checksum = self.checksum
24 changes: 19 additions & 5 deletions pydra/engine/submitter.py
@@ -64,11 +64,25 @@ def __call__(self, runnable, cache_locations=None, rerun=False):

async def submit_workflow(self, workflow, rerun=False):
"""Distribute or initiate workflow execution."""
if workflow.plugin and workflow.plugin != self.plugin:
# dj: this is not tested!!!
await self.worker.run_el(workflow, rerun=rerun)
else:
await workflow._run(self, rerun=rerun)
if is_workflow(workflow):
if workflow.plugin and workflow.plugin != self.plugin:
# dj: this is not tested!!! TODO
await self.worker.run_el(workflow, rerun=rerun)
else:
await workflow._run(self, rerun=rerun)
else: # could be a tuple with paths to pickle files with the task and inputs
ind, wf_pkl, input_pkl, wf_orig = workflow
if wf_orig.plugin and wf_orig.plugin != self.plugin:
# dj: this is not tested!!! TODO
await self.worker.run_el(workflow, rerun=rerun)
else:
await wf_orig._load_and_run(
ind=ind,
task_pkl=wf_pkl,
input_pkl=input_pkl,
submitter=self,
rerun=rerun,
)

async def submit(self, runnable, wait=False, rerun=False):
"""
9 changes: 8 additions & 1 deletion pydra/engine/workers.py
@@ -7,6 +7,7 @@

import concurrent.futures as cf

from .core import TaskBase, is_workflow
from .helpers import create_pyscript, get_available_cpus, read_and_display_async, save

import logging
@@ -177,7 +178,13 @@ def run_el(self, runnable, rerun=False, **kwargs):

async def exec_as_coro(self, runnable, rerun=False):
"""Run a task (coroutine wrapper)."""
res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
if isinstance(runnable, TaskBase):
res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
else: # it could be a tuple that includes pickle files with the task and inputs
ind, task_pkl, input_pkl, task_orig = runnable
res = await self.loop.run_in_executor(
self.pool, task_orig._load_and_run, ind, task_pkl, input_pkl, rerun
)
return res
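
One detail worth noting: loop.run_in_executor forwards positional arguments only, which is why rerun is passed positionally above; functools.partial is the standard alternative when keywords are needed. A self-contained sketch:

import asyncio
import concurrent.futures as cf
from functools import partial

def work(x, rerun=False):
    return (x, rerun)

async def main():
    loop = asyncio.get_running_loop()
    with cf.ThreadPoolExecutor() as pool:
        # positional form, as in exec_as_coro above
        r1 = await loop.run_in_executor(pool, work, 1, True)
        # equivalent, passing the keyword explicitly via partial
        r2 = await loop.run_in_executor(pool, partial(work, 2, rerun=True))
    print(r1, r2)

asyncio.run(main())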

def close(self):