Merge pull request #246 from djarecka/mnt/memory_tojob
moving from deepcopy to pickle files in to_job method
djarecka authored Jun 2, 2020
2 parents f042738 + d17b593 commit 63d2bd6
Showing 10 changed files with 310 additions and 126 deletions.
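In short, the commit replaces the per-state deepcopy in to_job with a single pickle file per task: the task is serialized once (pickle_task in core.py), the submitter passes around (state index, pickle path, original task) tuples, and load_task/load_and_run in helpers.py re-load the pickle and fill in the inputs for each state index. A minimal, self-contained sketch of that idea, using the standard-library pickle and a toy Task class rather than pydra's cloudpickle-based helpers:

# Not pydra's actual API -- a rough sketch of the idea behind this commit:
# pickle the task once, then re-load it per state index instead of
# deep-copying the whole object for every index.
import copy
import pickle
import tempfile
from pathlib import Path

class Task:
    def __init__(self, name, inputs):
        self.name = name
        self.inputs = inputs

task = Task("average", {"x": None})
states = [{"x": 1}, {"x": 2}, {"x": 3}]

# old approach: one deepcopy per state index, every copy held in memory at once
jobs_old = [copy.deepcopy(task) for _ in states]
for ind, el in enumerate(jobs_old):
    el.inputs = {**el.inputs, **states[ind]}

# new approach: pickle the task once, re-load it lazily per state index
pkl_dir = Path(tempfile.mkdtemp()) / "pkl_files"
pkl_dir.mkdir(parents=True, exist_ok=True)
task_pkl = pkl_dir / f"{task.name}_task.pklz"
task_pkl.write_bytes(pickle.dumps(task))

def load_task(pkl_path, ind):
    loaded = pickle.loads(Path(pkl_path).read_bytes())
    loaded.inputs = {**loaded.inputs, **states[ind]}
    return loaded

print(load_task(task_pkl, 1).inputs)  # {'x': 2}
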
5 changes: 5 additions & 0 deletions .travis.yml
@@ -61,6 +61,11 @@ matrix:
allow_failures:
- python: 3.7
env: INSTALL_DEPENDS="pip==10.0.1 setuptools==30.3.0"
- python: 3.7
env:
- INSTALL_TYPE="develop"
- CHECK_TYPE="test_dask"



before_install:
4 changes: 2 additions & 2 deletions pydra/conftest.py
@@ -8,7 +8,7 @@ def pytest_addoption(parser):
def pytest_generate_tests(metafunc):
if "plugin_dask_opt" in metafunc.fixturenames:
if bool(shutil.which("sbatch")):
Plugins = ["cf", "slurm"]
Plugins = ["slurm"]
else:
Plugins = ["cf"]
if metafunc.config.getoption("dask"):
@@ -19,7 +19,7 @@ def pytest_generate_tests(metafunc):
if metafunc.config.getoption("dask"):
Plugins = []
elif bool(shutil.which("sbatch")):
Plugins = ["cf", "slurm"]
Plugins = ["slurm"]
else:
Plugins = ["cf"]
metafunc.parametrize("plugin", Plugins)
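For context, the plugin lists above depend on whether Slurm's sbatch executable is on the PATH; a quick illustration of that check:

# shutil.which returns the full path of an executable found on PATH, or None,
# so bool(shutil.which("sbatch")) reports whether Slurm is available.
import shutil

plugins = ["slurm"] if shutil.which("sbatch") else ["cf"]
print(plugins)
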
19 changes: 8 additions & 11 deletions pydra/engine/core.py
@@ -6,7 +6,7 @@
import os
from pathlib import Path
import typing as ty
from copy import deepcopy
from copy import deepcopy, copy

import cloudpickle as cp
from filelock import SoftFileLock
@@ -503,16 +503,13 @@ def get_input_el(self, ind):
inputs_dict = {inp: getattr(self.inputs, inp) for inp in self.input_names}
return None, inputs_dict

def to_job(self, ind):
"""Run interface one element generated from node_state."""
# logger.debug("Run interface el, name={}, ind={}".format(self.name, ind))
el = deepcopy(self)
el.state = None
# dj might be needed
# el._checksum = None
_, inputs_dict = self.get_input_el(ind)
el.inputs = attr.evolve(el.inputs, **inputs_dict)
return el
def pickle_task(self):
"""Pickle the task together with its full inputs."""
pkl_files = self.cache_dir / "pkl_files"
pkl_files.mkdir(exist_ok=True, parents=True)
task_main_path = pkl_files / f"{self.name}_{self.checksum}_task.pklz"
save(task_path=pkl_files, task=self, name_prefix=f"{self.name}_{self.checksum}")
return task_main_path

@property
def done(self):
168 changes: 111 additions & 57 deletions pydra/engine/helpers.py
@@ -4,12 +4,18 @@
import attr
import cloudpickle as cp
from pathlib import Path
from filelock import SoftFileLock
import os
import sys
from hashlib import sha256
import subprocess as sp
import getpass
import uuid
from time import strftime
from traceback import format_exception

from .specs import Runtime, File, Directory, attr_fields

from .specs import Runtime, File, Directory, attr_fields, Result
from .helpers_file import hash_file, hash_dir, copyfile, is_existing_file


@@ -94,7 +100,7 @@ def load_result(checksum, cache_locations):
return None


def save(task_path: Path, result=None, task=None):
def save(task_path: Path, result=None, task=None, name_prefix=None):
"""
Save a :class:`~pydra.engine.core.TaskBase` object and/or results.
@@ -106,20 +112,28 @@ def save(task_path: Path, result=None, task=None):
Result to pickle and write
task : :class:`~pydra.engine.core.TaskBase`
Task to pickle and write
"""

if task is None and result is None:
raise ValueError("Nothing to be saved")

if not isinstance(task_path, Path):
task_path = Path(task_path)
task_path.mkdir(parents=True, exist_ok=True)
if result:
if Path(task_path).name.startswith("Workflow"):
# copy files to the workflow directory
result = copyfile_workflow(wf_path=task_path, result=result)
with (task_path / "_result.pklz").open("wb") as fp:
cp.dump(result, fp)
if task:
with (task_path / "_task.pklz").open("wb") as fp:
cp.dump(task, fp)
if name_prefix is None:
name_prefix = ""

lockfile = task_path.parent / (task_path.name + "_save.lock")
with SoftFileLock(lockfile):
if result:
if task_path.name.startswith("Workflow"):
# copy files to the workflow directory
result = copyfile_workflow(wf_path=task_path, result=result)
with (task_path / f"{name_prefix}_result.pklz").open("wb") as fp:
cp.dump(result, fp)
if task:
with (task_path / f"{name_prefix}_task.pklz").open("wb") as fp:
cp.dump(task, fp)


def copyfile_workflow(wf_path, result):
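The rewritten save() now takes an optional name_prefix and serializes under a soft file lock so concurrent writers do not clobber each other. A rough sketch of that pattern, with a hypothetical task directory and prefix (pydra uses cloudpickle; plain pickle stands in here):

import pickle
import tempfile
from pathlib import Path
from filelock import SoftFileLock

task_path = Path(tempfile.mkdtemp()) / "FunTask_abc123"   # hypothetical output dir
task_path.mkdir(parents=True, exist_ok=True)

# the lock file sits next to the output directory: "<dirname>_save.lock"
lockfile = task_path.parent / (task_path.name + "_save.lock")
with SoftFileLock(lockfile):
    # with name_prefix="FunTask_abc123" the task pickle is named
    # "FunTask_abc123_task.pklz"
    with (task_path / "FunTask_abc123_task.pklz").open("wb") as fp:
        pickle.dump({"demo": True}, fp)
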
@@ -221,7 +235,7 @@ def make_klass(spec):
if isinstance(item[1], attr._make._CountingAttr):
newfields[item[0]] = item[1]
else:
newfields[item[0]] = attr.ib(repr=False, type=item[1])
newfields[item[0]] = attr.ib(type=item[1])
else:
if (
any([isinstance(ii, attr._make._CountingAttr) for ii in item])
@@ -369,8 +383,33 @@ def create_checksum(name, inputs):

def record_error(error_path, error):
"""Write an error file."""

error_message = str(error)

resultfile = error_path / "_result.pklz"
if not resultfile.exists():
error_message += """\n
When creating this error file, the results file corresponding
to the task could not be found."""

name_checksum = str(error_path.name)
timeofcrash = strftime("%Y%m%d-%H%M%S")
try:
login_name = getpass.getuser()
except KeyError:
login_name = "UID{:d}".format(os.getuid())

full_error = {
"time of crash": timeofcrash,
"login name": login_name,
"name with checksum": name_checksum,
"error message": error,
}

with (error_path / "_error.pklz").open("wb") as fp:
cp.dump(error, fp)
cp.dump(full_error, fp)

return error_path / "_error.pklz"


def get_open_loop():
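record_error() now pickles a small dictionary (time of crash, login name, directory name with checksum, and the error itself) rather than the bare error object. Reading such a crash report back might look like this, assuming a hypothetical output directory:

# Inspect a crash report written by record_error (path is made up for illustration)
import cloudpickle as cp
from pathlib import Path

errorfile = Path("/tmp/cache_dir/FunTask_abc123/_error.pklz")
if errorfile.exists():
    with errorfile.open("rb") as fp:
        full_error = cp.load(fp)
    print(full_error["time of crash"], full_error["login name"])
    # "error message" holds whatever was recorded, e.g. format_exception() lines
    print("".join(full_error["error message"]))
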
@@ -397,49 +436,6 @@ def get_open_loop():
return loop


def create_pyscript(script_path, checksum, rerun=False):
"""
Create standalone script for task execution in a different environment.
Parameters
----------
script_path : :obj:`os.pathlike`
Path to the script.
checksum : str
Task checksum.
Returns
-------
pyscript : :obj:`File`
Execution script
"""
task_pkl = script_path / "_task.pklz"
if not task_pkl.exists() or not task_pkl.stat().st_size:
raise Exception("Missing or empty task!")

content = f"""import cloudpickle as cp
from pathlib import Path
cache_path = Path("{str(script_path)}")
task_pkl = (cache_path / "_task.pklz")
task = cp.loads(task_pkl.read_bytes())
# submit task
task(rerun={rerun})
if not task.result():
raise Exception("Something went wrong")
print("Completed", task.checksum, task)
task_pkl.unlink()
"""
pyscript = script_path / f"pyscript_{checksum}.py"
with pyscript.open("wt") as fp:
fp.writelines(content)
return pyscript


def hash_function(obj):
"""Generate hash of object."""
return sha256(str(obj).encode()).hexdigest()
@@ -544,3 +540,61 @@ def get_available_cpus():

# Last resort
return os.cpu_count()


def load_and_run(
task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs
):
"""
Load a task from a pickle file, set the proper inputs, and run the task.
"""
try:
task = load_task(task_pkl=task_pkl, ind=ind)
except Exception as excinfo:
if task_pkl.parent.exists():
etype, eval, etr = sys.exc_info()
traceback = format_exception(etype, eval, etr)
errorfile = record_error(task_pkl.parent, error=traceback)
result = Result(output=None, runtime=None, errored=True)
save(task_pkl.parent, result=result)
raise

resultfile = task.output_dir / "_result.pklz"
try:
task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs)
except Exception as excinfo:
# creating result and error files if missing
errorfile = task.output_dir / "_error.pklz"
if not resultfile.exists():
etype, eval, etr = sys.exc_info()
traceback = format_exception(etype, eval, etr)
errorfile = record_error(task.output_dir, error=traceback)
result = Result(output=None, runtime=None, errored=True)
save(task.output_dir, result=result)
raise type(excinfo)(
str(excinfo.with_traceback(None)),
f" full crash report is here: {errorfile}",
)
return resultfile


async def load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs):
"""
Load a task from a pickle file, set the proper inputs, and run the workflow.
"""
task = load_task(task_pkl=task_pkl, ind=ind)
await task._run(submitter=submitter, rerun=rerun, **kwargs)


def load_task(task_pkl, ind=None):
"""Load a task from a pickle file and set the inputs for the given state index (ind)."""
if isinstance(task_pkl, str):
task_pkl = Path(task_pkl)
task = cp.loads(task_pkl.read_bytes())
if ind is not None:
_, inputs_dict = task.get_input_el(ind)
task.inputs = attr.evolve(task.inputs, **inputs_dict)
task.state = None
return task
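load_task() fills in the inputs for a single state index with attr.evolve, which returns a copy of an attrs instance with the given fields replaced. A small illustration with a made-up input class (not a pydra spec):

import attr

@attr.s(auto_attribs=True)
class Inputs:                       # hypothetical spec, not a pydra input spec
    x: int = None
    y: int = 10

inputs_dict = {"x": 3}              # what get_input_el(ind) would supply for one index
new_inputs = attr.evolve(Inputs(), **inputs_dict)
print(new_inputs)                   # Inputs(x=3, y=10)
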
33 changes: 21 additions & 12 deletions pydra/engine/submitter.py
@@ -2,7 +2,7 @@
import asyncio
from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker, DaskWorker
from .core import is_workflow
from .helpers import get_open_loop
from .helpers import get_open_loop, load_and_run_async

import logging

@@ -64,11 +64,21 @@ def __call__(self, runnable, cache_locations=None, rerun=False):

async def submit_workflow(self, workflow, rerun=False):
"""Distribute or initiate workflow execution."""
if workflow.plugin and workflow.plugin != self.plugin:
# dj: this is not tested!!!
await self.worker.run_el(workflow, rerun=rerun)
else:
await workflow._run(self, rerun=rerun)
if is_workflow(workflow):
if workflow.plugin and workflow.plugin != self.plugin:
# dj: this is not tested!!! TODO
await self.worker.run_el(workflow, rerun=rerun)
else:
await workflow._run(self, rerun=rerun)
else: # could be a tuple with paths to pickle files with tasks and inputs
ind, wf_main_pkl, wf_orig = workflow
if wf_orig.plugin and wf_orig.plugin != self.plugin:
# dj: this is not tested!!! TODO
await self.worker.run_el(workflow, rerun=rerun)
else:
await load_and_run_async(
task_pkl=wf_main_pkl, ind=ind, submitter=self, rerun=rerun
)

async def submit(self, runnable, wait=False, rerun=False):
"""
@@ -100,17 +110,16 @@ async def submit(self, runnable, wait=False, rerun=False):
logger.debug(
f"Expanding {runnable} into {len(runnable.state.states_val)} states"
)
task_pkl = runnable.pickle_task()

for sidx in range(len(runnable.state.states_val)):
job = runnable.to_job(sidx)
logger.debug(
f'Submitting runnable {job}{str(sidx) if sidx is not None else ""}'
)
job_tuple = (sidx, task_pkl, runnable)
if is_workflow(runnable):
# job has no state anymore
futures.add(self.submit_workflow(job, rerun=rerun))
futures.add(self.submit_workflow(job_tuple, rerun=rerun))
else:
# tasks are submitted to worker for execution
futures.add(self.worker.run_el(job, rerun=rerun))
futures.add(self.worker.run_el(job_tuple, rerun=rerun))
else:
if is_workflow(runnable):
await self._run_workflow(runnable, rerun=rerun)
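With this change, submit_workflow() may receive either a workflow object or the (state index, pickle path, original task) tuple built in submit(); a rough sketch of that dispatch, using a stand-in Workflow class rather than pydra's is_workflow check:

from pathlib import Path

class Workflow:                     # stand-in class for illustration only
    plugin = None

def submit_workflow(runnable):
    if isinstance(runnable, Workflow):
        return "run the workflow object directly"
    # otherwise: (state index, path to pickled task, original task) tuple
    ind, task_pkl, original = runnable
    return f"load_and_run_async(task_pkl={Path(task_pkl).name!r}, ind={ind})"

print(submit_workflow(Workflow()))
print(submit_workflow((2, "/tmp/pkl_files/wf_abc_task.pklz", Workflow())))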