diff --git a/.travis.yml b/.travis.yml
index f21066212c..a51146d204 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -61,6 +61,11 @@ matrix:
   allow_failures:
   - python: 3.7
     env: INSTALL_DEPENDS="pip==10.0.1 setuptools==30.3.0"
+  - python: 3.7
+    env:
+    - INSTALL_TYPE="develop"
+    - CHECK_TYPE="test_dask"
+
 
 before_install:
diff --git a/pydra/conftest.py b/pydra/conftest.py
index 940bf9fad8..48125400e3 100644
--- a/pydra/conftest.py
+++ b/pydra/conftest.py
@@ -8,7 +8,7 @@ def pytest_addoption(parser):
 def pytest_generate_tests(metafunc):
     if "plugin_dask_opt" in metafunc.fixturenames:
         if bool(shutil.which("sbatch")):
-            Plugins = ["cf", "slurm"]
+            Plugins = ["slurm"]
         else:
             Plugins = ["cf"]
         if metafunc.config.getoption("dask"):
@@ -19,7 +19,7 @@ def pytest_generate_tests(metafunc):
         if metafunc.config.getoption("dask"):
             Plugins = []
         elif bool(shutil.which("sbatch")):
-            Plugins = ["cf", "slurm"]
+            Plugins = ["slurm"]
         else:
             Plugins = ["cf"]
     metafunc.parametrize("plugin", Plugins)
diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index 178a8f4a51..c4a4940cd8 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -6,7 +6,7 @@
 import os
 from pathlib import Path
 import typing as ty
-from copy import deepcopy
+from copy import deepcopy, copy
 
 import cloudpickle as cp
 from filelock import SoftFileLock
@@ -503,16 +503,13 @@ def get_input_el(self, ind):
             inputs_dict = {inp: getattr(self.inputs, inp) for inp in self.input_names}
             return None, inputs_dict
 
-    def to_job(self, ind):
-        """Run interface one element generated from node_state."""
-        # logger.debug("Run interface el, name={}, ind={}".format(self.name, ind))
-        el = deepcopy(self)
-        el.state = None
-        # dj might be needed
-        # el._checksum = None
-        _, inputs_dict = self.get_input_el(ind)
-        el.inputs = attr.evolve(el.inputs, **inputs_dict)
-        return el
+    def pickle_task(self):
+        """Pickle the task with its full inputs."""
+        pkl_files = self.cache_dir / "pkl_files"
+        pkl_files.mkdir(exist_ok=True, parents=True)
+        task_main_path = pkl_files / f"{self.name}_{self.checksum}_task.pklz"
+        save(task_path=pkl_files, task=self, name_prefix=f"{self.name}_{self.checksum}")
+        return task_main_path
 
     @property
     def done(self):
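A minimal usage sketch for the new `pickle_task()` method (the `add_two` task and its inputs are hypothetical, for illustration only, and the snippet assumes this branch of pydra is installed):

```python
import pydra


@pydra.mark.task
def add_two(x):  # hypothetical toy task
    return x + 2


task = add_two(name="add_two", x=[1, 2]).split("x")
# the task is pickled once, with its full inputs; state indices are resolved
# later by load_task() / load_and_run()
pkl_path = task.pickle_task()
assert pkl_path == task.cache_dir / "pkl_files" / f"{task.name}_{task.checksum}_task.pklz"
assert pkl_path.exists()
```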
diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 860d5fcd04..0e8e97c50d 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -4,12 +4,18 @@
 import attr
 import cloudpickle as cp
 from pathlib import Path
+from filelock import SoftFileLock
 import os
 import sys
 from hashlib import sha256
 import subprocess as sp
+import getpass
+import uuid
+from time import strftime
+from traceback import format_exception
 
-from .specs import Runtime, File, Directory, attr_fields
+
+from .specs import Runtime, File, Directory, attr_fields, Result
 from .helpers_file import hash_file, hash_dir, copyfile, is_existing_file
 
 
@@ -94,7 +100,7 @@ def load_result(checksum, cache_locations):
     return None
 
 
-def save(task_path: Path, result=None, task=None):
+def save(task_path: Path, result=None, task=None, name_prefix=None):
     """
     Save a :class:`~pydra.engine.core.TaskBase` object and/or results.
 
@@ -106,20 +112,28 @@ def save(task_path: Path, result=None, task=None):
         Result to pickle and write
     task : :class:`~pydra.engine.core.TaskBase`
         Task to pickle and write
-    """
+
+    """
     if task is None and result is None:
         raise ValueError("Nothing to be saved")
+
+    if not isinstance(task_path, Path):
+        task_path = Path(task_path)
     task_path.mkdir(parents=True, exist_ok=True)
-    if result:
-        if Path(task_path).name.startswith("Workflow"):
-            # copy files to the workflow directory
-            result = copyfile_workflow(wf_path=task_path, result=result)
-        with (task_path / "_result.pklz").open("wb") as fp:
-            cp.dump(result, fp)
-    if task:
-        with (task_path / "_task.pklz").open("wb") as fp:
-            cp.dump(task, fp)
+    if name_prefix is None:
+        name_prefix = ""
+
+    lockfile = task_path.parent / (task_path.name + "_save.lock")
+    with SoftFileLock(lockfile):
+        if result:
+            if task_path.name.startswith("Workflow"):
+                # copy files to the workflow directory
+                result = copyfile_workflow(wf_path=task_path, result=result)
+            with (task_path / f"{name_prefix}_result.pklz").open("wb") as fp:
+                cp.dump(result, fp)
+        if task:
+            with (task_path / f"{name_prefix}_task.pklz").open("wb") as fp:
+                cp.dump(task, fp)
 
 
 def copyfile_workflow(wf_path, result):
@@ -221,7 +235,7 @@ def make_klass(spec):
             if isinstance(item[1], attr._make._CountingAttr):
                 newfields[item[0]] = item[1]
             else:
-                newfields[item[0]] = attr.ib(repr=False, type=item[1])
+                newfields[item[0]] = attr.ib(type=item[1])
         else:
             if (
                 any([isinstance(ii, attr._make._CountingAttr) for ii in item])
@@ -369,8 +383,33 @@ def create_checksum(name, inputs):
 
 def record_error(error_path, error):
     """Write an error file."""
+
+    error_message = str(error)
+
+    resultfile = error_path / "_result.pklz"
+    if not resultfile.exists():
+        error_message += """\n
+            When creating this error file, the results file corresponding
+            to the task could not be found."""
+
+    name_checksum = str(error_path.name)
+    timeofcrash = strftime("%Y%m%d-%H%M%S")
+    try:
+        login_name = getpass.getuser()
+    except KeyError:
+        login_name = "UID{:d}".format(os.getuid())
+
+    full_error = {
+        "time of crash": timeofcrash,
+        "login name": login_name,
+        "name with checksum": name_checksum,
+        "error message": error,
+    }
+
     with (error_path / "_error.pklz").open("wb") as fp:
-        cp.dump(error, fp)
+        cp.dump(full_error, fp)
+
+    return error_path / "_error.pklz"
 
 
 def get_open_loop():
@@ -397,49 +436,6 @@ def get_open_loop():
     return loop
 
 
-def create_pyscript(script_path, checksum, rerun=False):
-    """
-    Create standalone script for task execution in a different environment.
-
-    Parameters
-    ----------
-    script_path : :obj:`os.pathlike`
-        Path to the script.
-    checksum : str
-        Task checksum.
-
-    Returns
-    -------
-    pyscript : :obj:`File`
-        Execution script
-
-    """
-    task_pkl = script_path / "_task.pklz"
-    if not task_pkl.exists() or not task_pkl.stat().st_size:
-        raise Exception("Missing or empty task!")
-
-    content = f"""import cloudpickle as cp
-from pathlib import Path
-
-
-cache_path = Path("{str(script_path)}")
-task_pkl = (cache_path / "_task.pklz")
-task = cp.loads(task_pkl.read_bytes())
-
-# submit task
-task(rerun={rerun})
-
-if not task.result():
-    raise Exception("Something went wrong")
-print("Completed", task.checksum, task)
-task_pkl.unlink()
-"""
-    pyscript = script_path / f"pyscript_{checksum}.py"
-    with pyscript.open("wt") as fp:
-        fp.writelines(content)
-    return pyscript
-
-
 def hash_function(obj):
     """Generate hash of object."""
     return sha256(str(obj).encode()).hexdigest()
@@ -544,3 +540,61 @@ def get_available_cpus():
 
     # Last resort
     return os.cpu_count()
+
+
+def load_and_run(
+    task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs
+):
+    """
+    Load a task from a pickle file, set the proper inputs,
+    and run the task.
+    """
+    try:
+        task = load_task(task_pkl=task_pkl, ind=ind)
+    except Exception:
+        if task_pkl.parent.exists():
+            etype, eval, etr = sys.exc_info()
+            traceback = format_exception(etype, eval, etr)
+            errorfile = record_error(task_pkl.parent, error=traceback)
+            result = Result(output=None, runtime=None, errored=True)
+            save(task_pkl.parent, result=result)
+        raise
+
+    resultfile = task.output_dir / "_result.pklz"
+    try:
+        task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs)
+    except Exception as excinfo:
+        # creating result and error files if missing
+        errorfile = task.output_dir / "_error.pklz"
+        if not resultfile.exists():
+            etype, eval, etr = sys.exc_info()
+            traceback = format_exception(etype, eval, etr)
+            errorfile = record_error(task.output_dir, error=traceback)
+            result = Result(output=None, runtime=None, errored=True)
+            save(task.output_dir, result=result)
+        raise type(excinfo)(
+            str(excinfo.with_traceback(None)),
+            f" full crash report is here: {errorfile}",
+        )
+    return resultfile
+
+
+async def load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs):
+    """
+    Load a workflow task from a pickle file, set the proper inputs,
+    and run it asynchronously.
+    """
+    task = load_task(task_pkl=task_pkl, ind=ind)
+    await task._run(submitter=submitter, rerun=rerun, **kwargs)
+
+
+def load_task(task_pkl, ind=None):
+    """Load a task from a pickle file, setting the proper inputs for the specific ind."""
+    if isinstance(task_pkl, str):
+        task_pkl = Path(task_pkl)
+    task = cp.loads(task_pkl.read_bytes())
+    if ind is not None:
+        _, inputs_dict = task.get_input_el(ind)
+        task.inputs = attr.evolve(task.inputs, **inputs_dict)
+        task.state = None
+    return task
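A sketch of reading back the crash file written by the reworked `record_error()`; the output directory path is illustrative, and when the call comes from `load_and_run()` the "error message" entry holds the list of traceback lines produced by `traceback.format_exception`:

```python
from pathlib import Path
import cloudpickle as cp

output_dir = Path("/tmp/pydra-cache/Task_abc123")  # illustrative path
error_info = cp.loads((output_dir / "_error.pklz").read_bytes())

# keys written by record_error(): "time of crash", "login name",
# "name with checksum", "error message"
print(error_info["time of crash"], error_info["name with checksum"])
print("".join(error_info["error message"]))  # formatted traceback lines
```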
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index a397a86ad1..059e83081f 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -2,7 +2,7 @@
 import asyncio
 from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker, DaskWorker
 from .core import is_workflow
-from .helpers import get_open_loop
+from .helpers import get_open_loop, load_and_run_async
 
 import logging
 
@@ -64,11 +64,21 @@ def __call__(self, runnable, cache_locations=None, rerun=False):
 
     async def submit_workflow(self, workflow, rerun=False):
         """Distribute or initiate workflow execution."""
-        if workflow.plugin and workflow.plugin != self.plugin:
-            # dj: this is not tested!!!
-            await self.worker.run_el(workflow, rerun=rerun)
-        else:
-            await workflow._run(self, rerun=rerun)
+        if is_workflow(workflow):
+            if workflow.plugin and workflow.plugin != self.plugin:
+                # dj: this is not tested!!! TODO
+                await self.worker.run_el(workflow, rerun=rerun)
+            else:
+                await workflow._run(self, rerun=rerun)
+        else:  # could be a tuple (ind, pickled task path, original task)
+            ind, wf_main_pkl, wf_orig = workflow
+            if wf_orig.plugin and wf_orig.plugin != self.plugin:
+                # dj: this is not tested!!! TODO
+                await self.worker.run_el(workflow, rerun=rerun)
+            else:
+                await load_and_run_async(
+                    task_pkl=wf_main_pkl, ind=ind, submitter=self, rerun=rerun
+                )
 
     async def submit(self, runnable, wait=False, rerun=False):
         """
@@ -100,17 +110,16 @@ async def submit(self, runnable, wait=False, rerun=False):
             logger.debug(
                 f"Expanding {runnable} into {len(runnable.state.states_val)} states"
             )
+            task_pkl = runnable.pickle_task()
+
             for sidx in range(len(runnable.state.states_val)):
-                job = runnable.to_job(sidx)
-                logger.debug(
-                    f'Submitting runnable {job}{str(sidx) if sidx is not None else ""}'
-                )
+                job_tuple = (sidx, task_pkl, runnable)
                 if is_workflow(runnable):
                     # job has no state anymore
-                    futures.add(self.submit_workflow(job, rerun=rerun))
+                    futures.add(self.submit_workflow(job_tuple, rerun=rerun))
                 else:
                     # tasks are submitted to worker for execution
-                    futures.add(self.worker.run_el(job, rerun=rerun))
+                    futures.add(self.worker.run_el(job_tuple, rerun=rerun))
         else:
             if is_workflow(runnable):
                 await self._run_workflow(runnable, rerun=rerun)
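With this change the submitter pickles a state-expanded runnable once and hands the workers `(state_index, pickle_path, original_task)` tuples; a synchronous sketch of what a worker ends up doing for each index (reusing the hypothetical `add_two` task from the earlier sketch):

```python
import pydra
from pydra.engine.helpers import load_and_run


@pydra.mark.task
def add_two(x):  # hypothetical toy task
    return x + 2


task = add_two(name="add_two", x=[1, 2]).split("x")
task.state.prepare_states(inputs=task.inputs)
task.state.prepare_inputs()

task_pkl = task.pickle_task()
jobs = [(sidx, task_pkl, task) for sidx in range(len(task.state.states_val))]

# each worker unpickles the task, applies its state index, and runs it
resultfiles = [load_and_run(task_pkl=pkl, ind=sidx) for sidx, pkl, _ in jobs]
```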
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index 9c4e2f1ead..45b3c08e87 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -7,16 +7,11 @@
 import pytest
 import cloudpickle as cp
 
-from .utils import multiply
-from ..helpers import (
-    hash_value,
-    hash_function,
-    get_available_cpus,
-    save,
-    create_pyscript,
-)
+from .utils import multiply, raise_xeq1
+from ..helpers import hash_value, hash_function, get_available_cpus, save, load_and_run
 from .. import helpers_file
 from ..specs import File, Directory
+from ..core import Workflow
 
 
 def test_save(tmpdir):
@@ -43,16 +38,6 @@ def test_save(tmpdir):
     assert res.output.out == 2
 
 
-def test_create_pyscript(tmpdir):
-    outdir = Path(tmpdir)
-    with pytest.raises(Exception):
-        create_pyscript(outdir, "checksum")
-    foo = multiply(name="mult", x=1, y=2)
-    save(outdir, task=foo)
-    pyscript = create_pyscript(outdir, foo.checksum)
-    assert pyscript.exists()
-
-
 def test_hash_file(tmpdir):
     outdir = Path(tmpdir)
     with open(outdir / "test.file", "wt") as fp:
@@ -212,3 +197,90 @@ def test_get_available_cpus():
 
     if platform.system().lower() == "darwin":
         assert get_available_cpus() == os.cpu_count()
+
+
+def test_load_and_run(tmpdir):
+    """ testing load_and_run for a pickled task"""
+    task_pkl = Path(tmpdir.join("task_main.pkl"))
+
+    task = multiply(name="mult", x=[1, 2], y=10).split("x")
+    task.state.prepare_states(inputs=task.inputs)
+    task.state.prepare_inputs()
+    with task_pkl.open("wb") as fp:
+        cp.dump(task, fp)
+
+    resultfile_0 = load_and_run(task_pkl=task_pkl, ind=0)
+    resultfile_1 = load_and_run(task_pkl=task_pkl, ind=1)
+    # checking the result files
+    result_0 = cp.loads(resultfile_0.read_bytes())
+    result_1 = cp.loads(resultfile_1.read_bytes())
+    assert result_0.output.out == 10
+    assert result_1.output.out == 20
+
+
+def test_load_and_run_exception_load(tmpdir):
+    """ testing that load_and_run raises an exception if the pickle file is missing"""
+    task_pkl = Path(tmpdir.join("task_main.pkl"))
+    task = raise_xeq1(name="raise", x=[1, 2]).split("x")
+    with pytest.raises(FileNotFoundError):
+        load_and_run(task_pkl=task_pkl, ind=0)
+
+
+def test_load_and_run_exception_run(tmpdir):
+    """ testing that load_and_run raises an exception and saves the info in a crashfile when the task fails"""
+    task_pkl = Path(tmpdir.join("task_main.pkl"))
+
+    task = raise_xeq1(name="raise", x=[1, 2]).split("x")
+    task.state.prepare_states(inputs=task.inputs)
+    task.state.prepare_inputs()
+
+    with task_pkl.open("wb") as fp:
+        cp.dump(task, fp)
+
+    with pytest.raises(Exception) as excinfo:
+        load_and_run(task_pkl=task_pkl, ind=0)
+    assert "i'm raising an exception!" in str(excinfo.value)
+    # checking if the crashfile has been created
+    assert "crash" in str(excinfo.value)
+    errorfile = Path(str(excinfo.value).split("here: ")[1][:-2])
+    assert errorfile.exists()
+
+    resultfile = errorfile.parent / "_result.pklz"
+    assert resultfile.exists()
+    # checking the content
+    result_exception = cp.loads(resultfile.read_bytes())
+    assert result_exception.errored is True
+
+    # the second task should be fine
+    resultfile = load_and_run(task_pkl=task_pkl, ind=1)
+    result_1 = cp.loads(resultfile.read_bytes())
+    assert result_1.output.out == 2
+
+
+def test_load_and_run_wf(tmpdir):
+    """ testing load_and_run for a pickled workflow"""
+    wf_pkl = Path(tmpdir.join("wf_main.pkl"))
+
+    wf = Workflow(name="wf", input_spec=["x", "y"])
+    wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y))
+    wf.split("x")
+    wf.inputs.x = [1, 2]
+    wf.inputs.y = 10
+
+    wf.set_output([("out", wf.mult.lzout.out)])
+
+    wf.state.prepare_states(inputs=wf.inputs)
+    wf.state.prepare_inputs()
+    wf.plugin = "cf"
+
+    with wf_pkl.open("wb") as fp:
+        cp.dump(wf, fp)
+
+    resultfile_0 = load_and_run(ind=0, task_pkl=wf_pkl)
+    resultfile_1 = load_and_run(ind=1, task_pkl=wf_pkl)
+    # checking the result files
+    result_0 = cp.loads(resultfile_0.read_bytes())
+    result_1 = cp.loads(resultfile_1.read_bytes())
+    assert result_0.output.out == 10
+    assert result_1.output.out == 20
diff --git a/pydra/engine/tests/test_node_task.py b/pydra/engine/tests/test_node_task.py
index 0bcb49048c..02afde89f7 100644
--- a/pydra/engine/tests/test_node_task.py
+++ b/pydra/engine/tests/test_node_task.py
@@ -378,6 +378,17 @@ def test_task_nostate_1(plugin_dask_opt):
     assert nn.output_dir.exists()
 
 
+def test_task_nostate_1_call():
+    """ task without splitter"""
+    nn = fun_addtwo(name="NA", a=3)
+    nn()
+    # checking the results
+    results = nn.result()
+    assert results.output.out == 5
+    # checking the output_dir
+    assert nn.output_dir.exists()
+
+
 @pytest.mark.flaky(reruns=2)  # when dask
 def test_task_nostate_1_call_subm(plugin_dask_opt):
     """ task without splitter"""
diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py
index eaa21f9952..20bfd13850 100644
--- a/pydra/engine/tests/test_task.py
+++ b/pydra/engine/tests/test_task.py
@@ -5,13 +5,7 @@
 import pytest
 
 from ... import mark
-from ..task import (
-    AuditFlag,
-    ShellCommandTask,
-    ContainerTask,
-    DockerTask,
-    SingularityTask,
-)
+from ..task import AuditFlag, ShellCommandTask, DockerTask, SingularityTask
 from ...utils.messenger import FileMessenger, PrintMessenger, collect_messages
 from .utils import gen_basic_wf
diff --git a/pydra/engine/tests/utils.py b/pydra/engine/tests/utils.py
index 56558711ed..46c043749d 100644
--- a/pydra/engine/tests/utils.py
+++ b/pydra/engine/tests/utils.py
@@ -102,6 +102,13 @@ def add2(x):
     return x + 2
 
 
+@mark.task
+def raise_xeq1(x):
+    if x == 1:
+        raise Exception("x is 1, so i'm raising an exception!")
+    return x
+
+
 @mark.task
 @mark.annotate({"return": {"out_add": float, "out_sub": float}})
 def add2_sub2_res(res):
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index f400f74a94..e4fa5ac231 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -4,10 +4,12 @@
 import re
 from tempfile import gettempdir
 from pathlib import Path
+from shutil import copyfile
 
 import concurrent.futures as cf
 
-from .helpers import create_pyscript, get_available_cpus, read_and_display_async, save
+from .core import TaskBase
+from .helpers import get_available_cpus, read_and_display_async, save, load_and_run
 
 import logging
 
@@ -67,24 +69,41 @@ def __init__(self, loop=None, max_jobs=None):
         self._jobs = 0
 
     def _prepare_runscripts(self, task, interpreter="/bin/sh", rerun=False):
-        script_dir = (
-            task.cache_dir / f"{self.__class__.__name__}_scripts" / task.checksum
-        )
+
+        if isinstance(task, TaskBase):
+            checksum = task.checksum
+            cache_dir = task.cache_dir
+            ind = None
+        else:
+            ind = task[0]
+            checksum = task[-1].checksum_states()[ind]
+            cache_dir = task[-1].cache_dir
+
+        script_dir = cache_dir / f"{self.__class__.__name__}_scripts" / checksum
         script_dir.mkdir(parents=True, exist_ok=True)
-        if not (script_dir / "_task.pkl").exists():
-            save(script_dir, task=task)
-        pyscript = create_pyscript(script_dir, task.checksum, rerun=rerun)
-        batchscript = script_dir / f"batchscript_{task.checksum}.sh"
+        if ind is None:
+            if not (script_dir / "_task.pklz").exists():
+                save(script_dir, task=task)
+        else:
+            copyfile(task[1], script_dir / "_task.pklz")
+
+        task_pkl = script_dir / "_task.pklz"
+        if not task_pkl.exists() or not task_pkl.stat().st_size:
+            raise Exception("Missing or empty task!")
+
+        batchscript = script_dir / f"batchscript_{checksum}.sh"
+        python_string = f"""'from pydra.engine.helpers import load_and_run; load_and_run(task_pkl="{str(task_pkl)}", ind={ind}, rerun={rerun}) '
+        """
         bcmd = "\n".join(
             (
                 f"#!{interpreter}",
                 f"#SBATCH --output={str(script_dir / 'slurm-%j.out')}",
-                f"{sys.executable} {str(pyscript)}",
+                f"{sys.executable} -c " + python_string,
             )
         )
         with batchscript.open("wt") as fp:
            fp.writelines(bcmd)
-        return script_dir, pyscript, batchscript
+        return script_dir, batchscript
 
     async def fetch_finished(self, futures):
         """
@@ -177,7 +196,13 @@ def run_el(self, runnable, rerun=False, **kwargs):
 
     async def exec_as_coro(self, runnable, rerun=False):
         """Run a task (coroutine wrapper)."""
-        res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
+        if isinstance(runnable, TaskBase):
+            res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
+        else:  # it could be a tuple (ind, pickled task path, original task)
+            ind, task_main_pkl, task_orig = runnable
+            res = await self.loop.run_in_executor(
+                self.pool, load_and_run, task_main_pkl, ind, rerun
+            )
         return res
 
     def close(self):
@@ -218,20 +243,30 @@ def __init__(
 
     def run_el(self, runnable, rerun=False):
         """Worker submission API."""
-        script_dir, _, batch_script = self._prepare_runscripts(runnable, rerun=rerun)
+        script_dir, batch_script = self._prepare_runscripts(runnable, rerun=rerun)
         if (script_dir / script_dir.parts[1]) == gettempdir():
             logger.warning("Temporary directories may not be shared across computers")
-        return self._submit_job(runnable, batch_script)
-
-    async def _submit_job(self, task, batchscript):
-        """Coroutine that submits task runscript and polls job until completion or error."""
-        script_dir = (
-            task.cache_dir / f"{self.__class__.__name__}_scripts" / task.checksum
-        )
+
+        if isinstance(runnable, TaskBase):
+            checksum = runnable.checksum
+            cache_dir = runnable.cache_dir
+            name = runnable.name
+        else:
+            checksum = runnable[-1].checksum_states()[runnable[0]]
+            cache_dir = runnable[-1].cache_dir
+            name = runnable[-1].name
+
+        return self._submit_job(
+            batch_script, name=name, checksum=checksum, cache_dir=cache_dir
+        )
+
+    async def _submit_job(self, batchscript, name, checksum, cache_dir):
+        """Coroutine that submits task runscript and polls job until completion or error."""
+        script_dir = cache_dir / f"{self.__class__.__name__}_scripts" / checksum
         sargs = self.sbatch_args.split()
         jobname = re.search(r"(?<=-J )\S+|(?<=--job-name=)\S+", self.sbatch_args)
         if not jobname:
-            jobname = ".".join((task.name, task.checksum))
+            jobname = ".".join((name, checksum))
             sargs.append(f"--job-name={jobname}")
         output = re.search(r"(?<=-o )\S+|(?<=--output=)\S+", self.sbatch_args)
         if not output:
@@ -261,7 +296,7 @@ async def _submit_job(self, task, batchscript):
             # Exception: Polling / job failure
             done = await self._poll_job(jobid)
             if done:
-                return task
+                return True
             await asyncio.sleep(self.poll_delay)
 
     async def _poll_job(self, jobid):
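For reference, the batch script that `_prepare_runscripts` now writes looks roughly like the following (interpreter path, cache directory, checksum, and state index are illustrative); the `pyscript_*.py` indirection is gone and the pickled task is executed via `python -c` calling `load_and_run`:

```sh
#!/bin/sh
#SBATCH --output=/cache/SlurmWorker_scripts/<checksum>/slurm-%j.out
/usr/bin/python3 -c 'from pydra.engine.helpers import load_and_run; load_and_run(task_pkl="/cache/SlurmWorker_scripts/<checksum>/_task.pklz", ind=0, rerun=False) '
```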