diff --git a/aiida/backends/tests/calculation_node.py b/aiida/backends/tests/calculation_node.py index 590d657f28..71708117e4 100644 --- a/aiida/backends/tests/calculation_node.py +++ b/aiida/backends/tests/calculation_node.py @@ -54,13 +54,13 @@ def test_calculation_updatable_not_copied(self): Check that updatable attributes of Calculation are not copied """ a = Calculation() - a._set_attr('state', self.stateval) + a._set_attr(Calculation.PROCESS_STATE_KEY, self.stateval) a.store() b = a.copy() # updatable attributes are not copied with self.assertRaises(AttributeError): - b.get_attr('state') + b.get_attr(Calculation.PROCESS_STATE_KEY) def test_calculation_updatable_attribute(self): """ @@ -81,16 +81,17 @@ def test_calculation_updatable_attribute(self): a._set_attr(k, v) # Check before storing - self.assertEquals(a.get_attr('state'), self.stateval) + a._set_attr(Calculation.PROCESS_STATE_KEY, self.stateval) + self.assertEquals(a.get_attr(Calculation.PROCESS_STATE_KEY), self.stateval) a.store() # Check after storing - self.assertEquals(a.get_attr('state'), self.stateval) + self.assertEquals(a.get_attr(Calculation.PROCESS_STATE_KEY), self.stateval) # I should be able to mutate the updatable attribute but not the others - a._set_attr('state', 'FINISHED') - a._del_attr('state') + a._set_attr(Calculation.PROCESS_STATE_KEY, 'FINISHED') + a._del_attr(Calculation.PROCESS_STATE_KEY) with self.assertRaises(ModificationNotAllowed): a._set_attr('bool', False) @@ -102,7 +103,7 @@ def test_calculation_updatable_attribute(self): # After sealing, even updatable attributes should be immutable with self.assertRaises(ModificationNotAllowed): - a._set_attr('state', 'FINISHED') + a._set_attr(Calculation.PROCESS_STATE_KEY, 'FINISHED') with self.assertRaises(ModificationNotAllowed): - a._del_attr('state') \ No newline at end of file + a._del_attr(Calculation.PROCESS_STATE_KEY) \ No newline at end of file diff --git a/aiida/backends/tests/inline_calculation.py b/aiida/backends/tests/inline_calculation.py index 8a33afe9dc..bad0b810ac 100644 --- a/aiida/backends/tests/inline_calculation.py +++ b/aiida/backends/tests/inline_calculation.py @@ -40,6 +40,17 @@ def test_inline_calculation_process_state(self): self.assertEquals(calculation.is_finished_ok, True) self.assertEquals(calculation.is_failed, False) + def test_finish_status(self): + """ + If an InlineCalculation reaches the FINISHED process state, it has to have been successful + which means that the finish status always has to be 0 + """ + calculation, result = self.incr_inline(inp=Int(11)) + self.assertEquals(calculation.is_finished, True) + self.assertEquals(calculation.is_finished_ok, True) + self.assertEquals(calculation.is_failed, False) + self.assertEquals(calculation.finish_status, 0) + def test_incr(self): """ Simple test for the inline increment function. 
diff --git a/aiida/backends/tests/work/persistence.py b/aiida/backends/tests/work/persistence.py index 6e2d88dcf9..468abfa6e1 100644 --- a/aiida/backends/tests/work/persistence.py +++ b/aiida/backends/tests/work/persistence.py @@ -11,7 +11,7 @@ import tempfile from aiida.backends.testbase import AiidaTestCase -from aiida.work.persistence import Persistence +from aiida.work.persistence import Persistence, AiiDAPersister import aiida.work.utils as util from aiida.work.test_utils import DummyProcess from aiida import work @@ -38,26 +38,26 @@ def test_save_load(self): self.assertEqual(loaded_process.state, work.ProcessState.FINISHED) -# class TestProcess(AiidaTestCase): -# def setUp(self): -# super(TestProcess, self).setUp() -# self.assertEquals(len(util.ProcessStack.stack()), 0) -# -# self.persistence = Persistence(running_directory=tempfile.mkdtemp()) -# -# def tearDown(self): -# super(TestProcess, self).tearDown() -# self.assertEquals(len(util.ProcessStack.stack()), 0) -# -# def test_save_load(self): -# dp = DummyProcess() -# -# # Create a bundle -# b = self.persistence.create_bundle(dp) -# # Save a bundle and reload it -# self.persistence.save(dp) -# b2 = self.persistence._load_checkpoint(dp.pid) -# # Now check that they are equal -# self.assertEqual(b, b2) -# -# work.run(dp) + +class TestAiiDAPersister(AiidaTestCase): + + def setUp(self): + super(TestAiiDAPersister, self).setUp() + self.persister = AiiDAPersister() + + def test_save_load_checkpoint(self): + process = DummyProcess() + bundle_saved = self.persister.save_checkpoint(process) + bundle_loaded = self.persister.load_checkpoint(process.calc.pk) + + self.assertEquals(bundle_saved, bundle_loaded) + + def test_delete_checkpoint(self): + process = DummyProcess() + self.assertEquals(process.calc.checkpoint, None) + + self.persister.save_checkpoint(process) + self.assertTrue(isinstance(process.calc.checkpoint, basestring)) + + self.persister.delete_checkpoint(process.pid) + self.assertEquals(process.calc.checkpoint, None) \ No newline at end of file diff --git a/aiida/backends/tests/work/test_workfunctions.py b/aiida/backends/tests/work/test_workfunctions.py index b6e9e59196..6da345ad16 100644 --- a/aiida/backends/tests/work/test_workfunctions.py +++ b/aiida/backends/tests/work/test_workfunctions.py @@ -73,6 +73,16 @@ def add_mul_wf(a, b, c): result = add_mul_wf(Int(3), Int(4), Int(5)) + def test_finish_status(self): + """ + If a workfunction reaches the FINISHED process state, it has to have been successful + which means that the finish status always has to be 0 + """ + result, calculation = single_return_value.run_get_node() + self.assertEquals(calculation.finish_status, 0) + self.assertEquals(calculation.is_finished_ok, True) + self.assertEquals(calculation.is_failed, False) + def test_hashes(self): result, w1 = run_get_node(return_input, inp=Int(2)) result, w2 = run_get_node(return_input, inp=Int(2)) @@ -85,4 +95,4 @@ def test_hashes_different(self): def _check_hash_consistent(self, pid): wc = load_node(pid) - self.assertEqual(wc.get_hash(), wc.get_extra('_aiida_hash')) + self.assertEqual(wc.get_hash(), wc.get_extra('_aiida_hash')) \ No newline at end of file diff --git a/aiida/backends/tests/work/work_chain.py b/aiida/backends/tests/work/work_chain.py index 6c2dba2531..0b8bbf7096 100644 --- a/aiida/backends/tests/work/work_chain.py +++ b/aiida/backends/tests/work/work_chain.py @@ -25,6 +25,20 @@ from . 
import utils +def run_and_check_success(process_class, **kwargs): + """ + Instantiates the process class and executes it followed by a check + that it is finished successfully + + :returns: instance of process + """ + process = process_class(inputs=kwargs) + process.execute() + assert process.calc.is_finished_ok == True + + return process + + class Wf(work.WorkChain): # Keep track of which steps were completed by the workflow finished_steps = {} @@ -98,6 +112,44 @@ def _set_finished(self, function_name): self.finished_steps[function_name] = True +class ReturnWorkChain(WorkChain): + + FAILURE_STATUS = 1 + + @classmethod + def define(cls, spec): + super(ReturnWorkChain, cls).define(spec) + spec.input('success', valid_type=Bool) + spec.outline( + cls.failure, + cls.success + ) + + def failure(self): + if self.inputs.success.value is False: + return self.FAILURE_STATUS + + def success(self): + return + + +class TestFinishStatus(AiidaTestCase): + + def test_failing_workchain(self): + result, node = work.launch.run_get_node(ReturnWorkChain, success=Bool(False)) + self.assertEquals(node.finish_status, ReturnWorkChain.FAILURE_STATUS) + self.assertEquals(node.is_finished, True) + self.assertEquals(node.is_finished_ok, False) + self.assertEquals(node.is_failed, True) + + def test_successful_workchain(self): + result, node = work.launch.run_get_node(ReturnWorkChain, success=Bool(True)) + self.assertEquals(node.finish_status, 0) + self.assertEquals(node.is_finished, True) + self.assertEquals(node.is_finished_ok, True) + self.assertEquals(node.is_failed, False) + + class IfTest(work.WorkChain): @classmethod def define(cls, spec): @@ -146,6 +198,7 @@ def test_dict(self): class TestWorkchain(AiidaTestCase): + def setUp(self): super(TestWorkchain, self).setUp() self.assertEquals(len(ProcessStack.stack()), 0) @@ -214,7 +267,7 @@ def check_a_b(self): assert 'b' in self.inputs x = Int(1) - work.launch.run(Wf, a=x, b=x) + run_and_check_success(Wf, a=x, b=x) def test_context(self): A = Str("a") @@ -223,12 +276,12 @@ def test_context(self): class ReturnA(work.Process): def _run(self): self.out('res', A) - return self.outputs + return class ReturnB(work.Process): def _run(self): self.out('res', B) - return self.outputs + return class Wf(WorkChain): @classmethod @@ -252,7 +305,7 @@ def s3(self): assert self.ctx.r1['res'] == B assert self.ctx.r2['res'] == B - work.launch.run(Wf) + run_and_check_success(Wf) def test_str(self): self.assertIsInstance(str(Wf.spec()), basestring) @@ -325,42 +378,43 @@ def isA(self): def after(self): raise RuntimeError("Shouldn't get here") - work.launch.run(WcWithReturn) + run_and_check_success(WcWithReturn) def test_tocontext_submit_workchain_no_daemon(self): class MainWorkChain(WorkChain): @classmethod def define(cls, spec): super(MainWorkChain, cls).define(spec) - spec.outline(cls.run, cls.check) + spec.outline(cls.do_run, cls.check) spec.outputs.dynamic = True - def run(self): + def do_run(self): return ToContext(subwc=self.submit(SubWorkChain)) def check(self): + pass assert self.ctx.subwc.out.value == Int(5) class SubWorkChain(WorkChain): @classmethod def define(cls, spec): super(SubWorkChain, cls).define(spec) - spec.outline(cls.run) + spec.outline(cls.do_run) - def run(self): + def do_run(self): self.out("value", Int(5)) - work.launch.run(MainWorkChain) + run_and_check_success(MainWorkChain) def test_tocontext_schedule_workchain(self): class MainWorkChain(WorkChain): @classmethod def define(cls, spec): super(MainWorkChain, cls).define(spec) - spec.outline(cls.run, cls.check) + 
spec.outline(cls.do_run, cls.check) spec.outputs.dynamic = True - def run(self): + def do_run(self): return ToContext(subwc=self.submit(SubWorkChain)) def check(self): @@ -370,12 +424,12 @@ class SubWorkChain(WorkChain): @classmethod def define(cls, spec): super(SubWorkChain, cls).define(spec) - spec.outline(cls.run) + spec.outline(cls.do_run) - def run(self): - self.out("value", Int(5)) + def do_run(self): + self.out('value', Int(5)) - work.launch.run(MainWorkChain) + run_and_check_success(MainWorkChain) # @unittest.skip('This is currently broken after merge') def test_if_block_persistence(self): @@ -424,7 +478,7 @@ def check(self): logs = self._backend.log.find() assert len(logs) == 1 - work.launch.run(TestWorkChain) + run_and_check_success(TestWorkChain) def test_to_context(self): val = Int(5) @@ -432,7 +486,7 @@ def test_to_context(self): class SimpleWc(work.Process): def _run(self): self.out('_return', val) - return self.outputs + return class Workchain(WorkChain): @classmethod @@ -449,7 +503,7 @@ def result(self): assert self.ctx.result_b['_return'] == val return - work.launch.run(Workchain) + run_and_check_success(Workchain) def test_persisting(self): persister = plumpy.test_utils.TestPersister() @@ -458,9 +512,10 @@ def test_persisting(self): workchain.execute() def _run_with_checkpoints(self, wf_class, inputs=None): - proc = wf_class(inputs=inputs) - work.launch.run(proc) - return wf_class.finished_steps + if inputs is None: + inputs = {} + proc = run_and_check_success(wf_class, **inputs) + return proc.finished_steps class TestWorkchainWithOldWorkflows(AiidaTestCase): @@ -494,7 +549,7 @@ def begin(self): def check(self): assert self.ctx.wf is not None - work.launch.run(_TestWf) + run_and_check_success(_TestWf) def test_old_wf_results(self): wf = WorkflowDemo() @@ -514,7 +569,7 @@ def begin(self): def check(self): assert set(self.ctx.res) == set(wf.get_results()) - work.launch.run(_TestWf) + run_and_check_success(_TestWf) class TestWorkChainAbort(AiidaTestCase): @@ -735,7 +790,7 @@ def step_two(self): test_class.assertNotIn('c', self.inputs) test_class.assertEquals(self.inputs['a'].value, 1) - work.launch.run(Wf, a=Int(1), b=Int(2)) + run_and_check_success(Wf, a=Int(1), b=Int(2)) def test_immutable_input_groups(self): """ @@ -771,7 +826,7 @@ def step_two(self): x = Int(1) y = Int(2) - work.launch.run(Wf, subspace={'one': Int(1), 'two': Int(2)}) + run_and_check_success(Wf, subspace={'one': Int(1), 'two': Int(2)}) class GrandParentExposeWorkChain(work.WorkChain): @classmethod diff --git a/aiida/daemon/execmanager.py b/aiida/daemon/execmanager.py index 68544c056d..10fc4faefe 100644 --- a/aiida/daemon/execmanager.py +++ b/aiida/daemon/execmanager.py @@ -801,22 +801,36 @@ def retrieve_all(job, transport, retrieved_temporary_folder, logger_extra=None): def parse_results(job, retrieved_temporary_folder=None, logger_extra=None): + """ + Parse the results for a given JobCalculation (job) + + :returns: integer exit code, where 0 indicates success and non-zero failure + """ + from aiida.orm.calculation.job import JobCalculationFinishStatus + job._set_state(calc_states.PARSING) Parser = job.get_parserclass() - # If no parser is set, the calculation is successful - successful = True if Parser is not None: + parser = Parser(job) - successful, new_nodes_tuple = parser.parse_from_calc(retrieved_temporary_folder) + exit_code, new_nodes_tuple = parser.parse_from_calc(retrieved_temporary_folder) + + # Some implementations of parse_from_calc may still return a boolean for the exit_code + # If we get 
True we convert to 0, for False we simply use the generic value that + # maps to the calculation state FAILED + if isinstance(exit_code, bool) and exit_code is True: + exit_code = 0 + elif isinstance(exit_code, bool) and exit_code is False: + exit_code = JobCalculationFinishStatus[calc_states.FAILED] for label, n in new_nodes_tuple: n.add_link_from(job, label=label, link_type=LinkType.CREATE) n.store() try: - if successful: + if exit_code == 0: job._set_state(calc_states.FINISHED) else: job._set_state(calc_states.FAILED) @@ -825,14 +839,14 @@ def parse_results(job, retrieved_temporary_folder=None, logger_extra=None): # in order to avoid useless error messages, I just ignore pass - if not successful: + if exit_code != 0: execlogger.error("[parsing of calc {}] " "The parser returned an error, but it should have " "created an output node with some partial results " "and warnings. Check there for more information on " "the problem".format(job.pk), extra=logger_extra) - return successful + return exit_code def _update_job_calc(calc, scheduler, job_info): diff --git a/aiida/orm/calculation/job/__init__.py b/aiida/orm/calculation/job/__init__.py index 379b5ae3d9..dcfc5d5721 100644 --- a/aiida/orm/calculation/job/__init__.py +++ b/aiida/orm/calculation/job/__init__.py @@ -8,4 +8,4 @@ # For further information please visit http://www.aiida.net # ########################################################################### from aiida.orm.calculation import Calculation -from aiida.orm.implementation.calculation import JobCalculation, _input_subfolder +from aiida.orm.implementation.calculation import JobCalculation, _input_subfolder, JobCalculationFinishStatus diff --git a/aiida/orm/implementation/calculation.py b/aiida/orm/implementation/calculation.py index 74c828d9da..9a2d74f108 100644 --- a/aiida/orm/implementation/calculation.py +++ b/aiida/orm/implementation/calculation.py @@ -16,6 +16,7 @@ from aiida.common.old_pluginloader import from_type_to_pluginclassname from aiida.orm.implementation.general.calculation.job import _input_subfolder +from aiida.orm.implementation.general.calculation.job import JobCalculationFinishStatus if BACKEND == BACKEND_SQLA: diff --git a/aiida/orm/implementation/django/calculation/job/__init__.py b/aiida/orm/implementation/django/calculation/job/__init__.py index 721b3b4e70..1b3f1246b4 100644 --- a/aiida/orm/implementation/django/calculation/job/__init__.py +++ b/aiida/orm/implementation/django/calculation/job/__init__.py @@ -42,6 +42,7 @@ def _set_state(self, state): from ``aiida.common.datastructures.calc_states``. :raise: ModificationNotAllowed if the given state was already set. 
""" + super(JobCalculation, self)._set_state(state) from aiida.common.datastructures import sort_states from aiida.backends.djsite.db.models import DbCalcState diff --git a/aiida/orm/implementation/general/calculation/__init__.py b/aiida/orm/implementation/general/calculation/__init__.py index 5302d19a85..1725bdcaee 100644 --- a/aiida/orm/implementation/general/calculation/__init__.py +++ b/aiida/orm/implementation/general/calculation/__init__.py @@ -9,68 +9,15 @@ ########################################################################### import collections +import enum +import logging from plumpy import ProcessState -from aiida.common.utils import classproperty from aiida.common.links import LinkType +from aiida.common.log import get_dblogger_extra +from aiida.common.utils import classproperty from aiida.orm.mixins import Sealable -def _parse_single_arg(function_name, additional_parameter, - args, kwargs): - """ - Verifies that a single additional argument has been given (or no - additional argument, if additional_parameter is None). Also - verifies its name. - - :param function_name: the name of the caller function, used for - the output messages - :param additional_parameter: None if no additional parameters - should be passed, or a string with the name of the parameter - if one additional parameter should be passed. - - :return: None, if additional_parameter is None, or the value of - the additional parameter - :raise TypeError: on wrong number of inputs - """ - # Here all the logic to check if the parameters are correct. - if additional_parameter is not None: - if len(args) == 1: - if kwargs: - raise TypeError("{}() received too many args".format( - function_name)) - additional_parameter_data = args[0] - elif len(args) == 0: - kwargs_copy = kwargs.copy() - try: - additional_parameter_data = kwargs_copy.pop( - additional_parameter) - except KeyError: - if kwargs_copy: - raise TypeError("{}() got an unexpected keyword " - "argument '{}'".format( - function_name, kwargs_copy.keys()[0])) - else: - raise TypeError("{}() requires more " - "arguments".format(function_name)) - if kwargs_copy: - raise TypeError("{}() got an unexpected keyword " - "argument '{}'".format( - function_name, kwargs_copy.keys()[0])) - else: - raise TypeError("{}() received too many args".format( - function_name)) - return additional_parameter_data - else: - if kwargs: - raise TypeError("{}() got an unexpected keyword " - "argument '{}'".format( - function_name, kwargs.keys()[0])) - if len(args) != 0: - raise TypeError("{}() received too many args".format( - function_name)) - - return None - class AbstractCalculation(Sealable): """ @@ -81,24 +28,22 @@ class AbstractCalculation(Sealable): calculations run via a scheduler. """ - CHECKPOINT_KEY = 'checkpoints' PROCESS_STATE_KEY = 'process_state' - - _cacheable = False - - _updatable_attributes = Sealable._updatable_attributes + ('state', PROCESS_STATE_KEY, CHECKPOINT_KEY) + FINISH_STATUS_KEY = 'finish_status' + CHECKPOINT_KEY = 'checkpoints' # The link_type might not be correct while the object is being created. 
_hash_ignored_inputs = ['CALL'] + _cacheable = False @classproperty - def _hash_ignored_attributes(cls): - return super(AbstractCalculation, cls)._hash_ignored_attributes + [ - '_sealed', - '_finished', - ] + def _updatable_attributes(cls): + return super(AbstractCalculation, cls)._updatable_attributes + ( + cls.PROCESS_STATE_KEY, + cls.FINISH_STATUS_KEY, + cls.CHECKPOINT_KEY, + ) - # Nodes that can be added as input using the use_* methods @classproperty def _use_methods(cls): """ @@ -135,32 +80,23 @@ def _use_methods(cls): @property def logger(self): """ - Get the logger of the Calculation object, so that it also logs to the - DB. + Get the logger of the Calculation object, so that it also logs to the DB. - :return: LoggerAdapter object, that works like a logger, but also has - the 'extra' embedded + :return: LoggerAdapter object, that works like a logger, but also has the 'extra' embedded """ - import logging - from aiida.common.log import get_dblogger_extra - - return logging.LoggerAdapter( - logger=self._logger, extra=get_dblogger_extra(self)) + return logging.LoggerAdapter(logger=self._logger, extra=get_dblogger_extra(self)) def __dir__(self): """ Allow to list all valid attributes, adding also the use_* methods """ - return sorted(dir(type(self)) + list(['use_{}'.format(k) - for k in - self._use_methods.iterkeys()])) + return sorted(dir(type(self)) + list(['use_{}'.format(k) for k in self._use_methods.iterkeys()])) def __getattr__(self, name): """ - Expand the methods with the use_* calls. Note that this method only - gets called if 'name' is not already defined as a method. Returning - None will then automatically raise the standard AttributeError - exception. + Expand the methods with the use_* calls. Note that this method only gets called if 'name' + is not already defined as a method. Returning None will then automatically raise the + standard AttributeError exception. """ if name == '_use_methods': raise AttributeError("'{0}' object has no attribute '{1}'".format(type(self), name)) @@ -177,6 +113,7 @@ def __init__(self, node, actual_name, data): self.node = node self.actual_name = actual_name self.data = data + try: self.__doc__ = data['docstring'] except KeyError: @@ -184,9 +121,8 @@ def __init__(self, node, actual_name, data): pass def __call__(self, parent_node, *args, **kwargs): - # Not really needed, will be checked in get_linkname - # But I do anyway in order to raise an exception as soon as - # possible, with the most intuitive caller function name + # Not really needed, will be checked in get_linkname but I do anyway in order to raise + # an exception as soon as possible, with the most intuitive caller function name additional_parameter = _parse_single_arg( function_name='use_{}'.format(self.actual_name), additional_parameter=self.data['additional_parameter'], @@ -194,25 +130,18 @@ def __call__(self, parent_node, *args, **kwargs): # Type check if not isinstance(parent_node, self.data['valid_types']): - if isinstance(self.data['valid_types'], - collections.Iterable): - valid_types_string = ",".join([_.__name__ for _ in - self.data[ - 'valid_types']]) + if isinstance(self.data['valid_types'], collections.Iterable): + valid_types_string = ','.join([_.__name__ for _ in self.data['valid_types']]) else: valid_types_string = self.data['valid_types'].__name__ - raise TypeError("The given node is not of the valid type " - "for use_{}. 
Valid types are: {}, while " - "you provided {}".format( - self.actual_name, valid_types_string, - parent_node.__class__.__name__)) + raise TypeError( + 'The given node is not of the valid type for use_{}. ' + 'Valid types are: {}, while you provided {}'.format( + self.actual_name, valid_types_string, parent_node.__class__.__name__)) # Get actual link name - actual_linkname = self.node.get_linkname(actual_name, *args, - **kwargs) - # Checks that such an argument exists have already been - # made inside actual_linkname + actual_linkname = self.node.get_linkname(actual_name, *args, **kwargs) # Here I do the real job self.node._replace_link_from(parent_node, actual_linkname) @@ -222,11 +151,9 @@ def __call__(self, parent_node, *args, **kwargs): if name in valid_use_methods: actual_name = name[len(prefix):] - return UseMethod(node=self, actual_name=actual_name, - data=self._use_methods[actual_name]) + return UseMethod(node=self, actual_name=actual_name, data=self._use_methods[actual_name]) else: - raise AttributeError("'{}' object has no attribute '{}'".format( - self.__class__.__name__, name)) + raise AttributeError("'{}' object has no attribute '{}'".format(self.__class__.__name__, name)) @property def process_state(self): @@ -298,24 +225,48 @@ def is_finished_ok(self): """ Returns whether the Calculation has finished successfully, which means that it terminated nominally and had a zero exit code indicating a successful execution - # TODO when finish_status is implemented add that in return value determination :return: True if the calculation has finished successfully, False otherwise :rtype: bool """ - return self.process_state == ProcessState.FINISHED + return self.is_finished and self.finish_status == 0 @property def is_failed(self): """ Returns whether the Calculation has failed, which means that it terminated nominally but it had a non-zero exit status - # TODO when finish_status is implemented add that in return value determination :return: True if the calculation has failed, False otherwise :rtype: bool """ - return self.process_state == ProcessState.FINISHED and False + return self.is_finished and self.finish_status != 0 + + @property + def finish_status(self): + """ + Return the finish status of the Calculation + + :returns: the finish status, an integer exit code or None + """ + return self.get_attr(self.FINISH_STATUS_KEY, None) + + def _set_finish_status(self, status): + """ + Set the finish status of the Calculation + + :param status: an integer exit code or None, which will be interpreted as zero + """ + if status is None: + status = 0 + + if isinstance(status, enum.Enum): + status = status.value + + if not isinstance(status, int): + raise ValueError('finish status has to be an integer, got {}'.format(status)) + + return self._set_attr(self.FINISH_STATUS_KEY, status) @property def checkpoint(self): @@ -334,18 +285,28 @@ def _set_checkpoint(self, checkpoint): """ return self._set_attr(self.CHECKPOINT_KEY, checkpoint) - def _del_checkpoint(self, checkpoint): + def _del_checkpoint(self): """ Delete the checkpoint bundle set for the Calculation """ - return self._det_attr(self.CHECKPOINT_KEY) + return self._del_attr(self.CHECKPOINT_KEY) @property def called(self): + """ + Return a list of nodes that the Calculation called + + :returns: list of Calculation nodes called by this Calculation instance + """ return self.get_outputs(link_type=LinkType.CALL) @property def called_by(self): + """ + Return the Calculation that called this Calculation, or None if it does not have a caller + + 
:returns: Calculation that called this Calculation instance or None + """ called_by = self.get_inputs(link_type=LinkType.CALL) if called_by: return called_by[0] @@ -363,8 +324,7 @@ def get_linkname(self, link, *args, **kwargs): try: data = self._use_methods[link] except KeyError: - raise ValueError("No '{}' link is defined for this " - "calculation".format(link)) + raise ValueError("No '{}' link is defined for this calculation".format(link)) # Raises if the wrong # of parameters is passed additional_parameter = _parse_single_arg( @@ -391,18 +351,14 @@ def _linking_as_output(self, dest, link_type): if link_type is LinkType.CREATE or link_type is LinkType.RETURN: if not isinstance(dest, Data): - raise ValueError( - "The output of a calculation node can only be a data node") + raise ValueError('The output of a calculation node can only be a data node') elif link_type is LinkType.CALL: if not isinstance(dest, AbstractCalculation): - raise ValueError("Call links can only link two calculations.") + raise ValueError('Call links can only link two calculations') else: - raise ValueError( - "Calculation cannot have links of type {} as output".format( - link_type)) + raise ValueError('Calculation cannot have links of type {} as output'.format(link_type)) - return super(AbstractCalculation, self)._linking_as_output( - dest, link_type) + return super(AbstractCalculation, self)._linking_as_output(dest, link_type) def add_link_from(self, src, label=None, link_type=LinkType.INPUT): """ @@ -416,24 +372,19 @@ def add_link_from(self, src, label=None, link_type=LinkType.INPUT): :param link_type: The type of link, must be one of the enum values form :class:`~aiida.common.links.LinkType` """ - from aiida.orm.data import Data from aiida.orm.code import Code + from aiida.orm.data import Data if link_type is LinkType.INPUT: if not isinstance(src, (Data, Code)): - raise ValueError( - "Nodes entering calculation as input link can only be of " - "type data or code") + raise ValueError('Nodes entering calculation as input link can only be of type data or code') elif link_type is LinkType.CALL: if not isinstance(src, AbstractCalculation): - raise ValueError("Call links can only link two calculations.") + raise ValueError('Call links can only link two calculations') else: - raise ValueError( - "Calculation cannot have links of type {} as input".format( - link_type)) + raise ValueError('Calculation cannot have links of type {} as input'.format(link_type)) - return super(AbstractCalculation, self).add_link_from( - src, label, link_type) + return super(AbstractCalculation, self).add_link_from(src, label, link_type) def get_code(self): """ @@ -441,6 +392,7 @@ def get_code(self): was not set. """ from aiida.orm.code import Code + return dict(self.get_inputs(node_type=Code, also_labels=True)).get( self._use_methods['code']['linkname'], None) @@ -451,17 +403,20 @@ def _replace_link_from(self, src, label, link_type=LinkType.INPUT): :param src: a node of the database. It cannot be a Calculation object. :param str label: Name of the link. 
""" - from aiida.orm.data import Data from aiida.orm.code import Code + from aiida.orm.data import Data if not isinstance(src, (Data, Code)): - raise ValueError("Nodes entering in calculation can only be of " - "type data or code") + raise ValueError('Nodes entering in calculation can only be of type data or code') - return super(AbstractCalculation, self)._replace_link_from( - src, label, link_type) + return super(AbstractCalculation, self)._replace_link_from(src, label, link_type) def _is_valid_cache(self): + """ + Return whether the node is valid for caching + + :returns: True if Calculation is valid to be used for caching, False otherwise + """ return super(AbstractCalculation, self)._is_valid_cache() and self.is_finished_ok def _get_objects_to_hash(self): @@ -471,9 +426,63 @@ def _get_objects_to_hash(self): res = super(AbstractCalculation, self)._get_objects_to_hash() res.append({ key: value.get_hash() - for key, value in self.get_inputs_dict( - link_type=LinkType.INPUT - ).items() + for key, value in self.get_inputs_dict(link_type=LinkType.INPUT).items() if key not in self._hash_ignored_inputs }) return res + + +def _parse_single_arg(function_name, additional_parameter, args, kwargs): + """ + Verifies that a single additional argument has been given (or no + additional argument, if additional_parameter is None). Also + verifies its name. + + :param function_name: the name of the caller function, used for + the output messages + :param additional_parameter: None if no additional parameters + should be passed, or a string with the name of the parameter + if one additional parameter should be passed. + + :return: None, if additional_parameter is None, or the value of + the additional parameter + :raise TypeError: on wrong number of inputs + """ + # Here all the logic to check if the parameters are correct. 
+ if additional_parameter is not None: + if len(args) == 1: + if kwargs: + raise TypeError("{}() received too many args".format( + function_name)) + additional_parameter_data = args[0] + elif len(args) == 0: + kwargs_copy = kwargs.copy() + try: + additional_parameter_data = kwargs_copy.pop( + additional_parameter) + except KeyError: + if kwargs_copy: + raise TypeError("{}() got an unexpected keyword " + "argument '{}'".format( + function_name, kwargs_copy.keys()[0])) + else: + raise TypeError("{}() requires more " + "arguments".format(function_name)) + if kwargs_copy: + raise TypeError("{}() got an unexpected keyword " + "argument '{}'".format( + function_name, kwargs_copy.keys()[0])) + else: + raise TypeError("{}() received too many args".format( + function_name)) + return additional_parameter_data + else: + if kwargs: + raise TypeError("{}() got an unexpected keyword " + "argument '{}'".format( + function_name, kwargs.keys()[0])) + if len(args) != 0: + raise TypeError("{}() received too many args".format( + function_name)) + + return None \ No newline at end of file diff --git a/aiida/orm/implementation/general/calculation/job/__init__.py b/aiida/orm/implementation/general/calculation/job/__init__.py index 0b42d851e8..b7a175bab8 100644 --- a/aiida/orm/implementation/general/calculation/job/__init__.py +++ b/aiida/orm/implementation/general/calculation/job/__init__.py @@ -10,6 +10,7 @@ import abc import copy import datetime +import enum from aiida.backends.utils import get_automatic_user from aiida.common.datastructures import calc_states @@ -33,34 +34,67 @@ _input_subfolder = 'raw_input' +class JobCalculationFinishStatus(enum.Enum): + """ + This enumeration maps specific calculation states to an integer. This integer can + then be used to set the finish status of a JobCalculation node. The values defined + here map directly on the failed calculation states, but the idea is that subclasses + of AbstractJobCalculation can extend this enum with additional error codes + """ + FINISHED = 0 + SUBMISSIONFAILED = 100 + RETRIEVALFAILED = 200 + PARSINGFAILED = 300 + FAILED = 400 + I_AM_A_TEAPOT = 418 + + class AbstractJobCalculation(AbstractCalculation): """ This class provides the definition of an AiiDA calculation that is run remotely on a job scheduler. """ - _updatable_attributes = AbstractCalculation._updatable_attributes + ( - 'job_id', 'scheduler_state','scheduler_lastchecktime', 'last_jobinfo', 'remote_workdir', - 'retrieve_list', 'retrieve_temporary_list', 'retrieve_singlefile_list') + @classproperty + def finish_status_enum(cls): + return JobCalculationFinishStatus + + @property + def finish_status_label(self): + """ + Return the label belonging to the finish status of the Calculation + + :returns: the label that corresponds to the finish status, or 'UNKNOWN' if the status is not recognized + """ + finish_status = self.finish_status + + try: + finish_status_enum = self.finish_status_enum(finish_status) + finish_status_label = finish_status_enum.name + except ValueError: + finish_status_label = 'UNKNOWN' + + return finish_status_label _cacheable = True + @classproperty + def _updatable_attributes(cls): + return super(AbstractJobCalculation, cls)._updatable_attributes + ( + 'job_id', 'scheduler_state', 'scheduler_lastchecktime', 'last_jobinfo', 'remote_workdir', + 'retrieve_list', 'retrieve_temporary_list', 'retrieve_singlefile_list', 'state' + ) + + @classproperty def _hash_ignored_attributes(cls): - # _updatable_attributes are ignored automatically. 
- return super(AbstractJobCalculation, cls)._hash_ignored_attributes + [ + return super(AbstractJobCalculation, cls)._hash_ignored_attributes + ( 'queue_name', 'priority', 'max_wallclock_seconds', 'max_memory_kb', - ] + ) - def get_hash( - self, - ignore_errors=True, - ignored_folder_content=('raw_input',), - **kwargs - ): + def get_hash(self, ignore_errors=True, ignored_folder_content=('raw_input',), **kwargs): return super(AbstractJobCalculation, self).get_hash( ignore_errors=ignore_errors, ignored_folder_content=ignored_folder_content, @@ -78,16 +112,15 @@ def get_builder(cls): def _init_internal_params(self): """ - Define here internal parameters that should be defined - right after the __init__. This function is actually called - by the __init__. + Define here internal parameters that should be defined right after the __init__ + This function is actually called by the __init__ - :note: if you inherit this function, ALWAYS remember to - call super()._init_internal_params() as the first thing - in your inherited function. + :note: if you inherit this function, ALWAYS remember to call super()._init_internal_params() + as the first thing in your inherited function. """ # By default, no output parser self._default_parser = None + # Set default for the link to the retrieved folder (after calc is done) self._linkname_retrieved = 'retrieved' @@ -96,8 +129,7 @@ def _init_internal_params(self): self._SCHED_OUTPUT_FILE = '_scheduler-stdout.txt' self._SCHED_ERROR_FILE = '_scheduler-stderr.txt' - # Files that should be shown by default - # Set it to None if you do not have a default file + # Files that should be shown by default, set it to None if you do not have a default file # Used, e.g., by 'verdi calculation inputshow/outputshow self._DEFAULT_INPUT_FILE = None self._DEFAULT_OUTPUT_FILE = None diff --git a/aiida/orm/implementation/general/calculation/work.py b/aiida/orm/implementation/general/calculation/work.py index 8bb74fd4af..38c3ee43d8 100644 --- a/aiida/orm/implementation/general/calculation/work.py +++ b/aiida/orm/implementation/general/calculation/work.py @@ -8,6 +8,7 @@ # For further information please visit http://www.aiida.net # ########################################################################### +from aiida.common.utils import classproperty from aiida.orm.implementation.calculation import Calculation @@ -19,7 +20,9 @@ class WorkCalculation(Calculation): STEPPER_STATE_INFO_KEY = 'stepper_state_info' - _updatable_attributes = Calculation._updatable_attributes + (STEPPER_STATE_INFO_KEY,) + @classproperty + def _updatable_attributes(cls): + return super(WorkCalculation, cls)._updatable_attributes + (cls.STEPPER_STATE_INFO_KEY,) @property def stepper_state_info(self): diff --git a/aiida/orm/implementation/general/node.py b/aiida/orm/implementation/general/node.py index b81d49bfab..a9e181727f 100644 --- a/aiida/orm/implementation/general/node.py +++ b/aiida/orm/implementation/general/node.py @@ -144,8 +144,12 @@ def __new__(cls, name, bases, attrs): # See documentation in the set() method. _set_incompatibilities = [] - # A list of attribute names that will be ignored when creating the hash. - _hash_ignored_attributes = [] + # A tuple of attribute names that can be updated even after node is stored + # Requires Sealable mixin, but needs empty tuple for base class + _updatable_attributes = tuple() + + # A tuple of attribute names that will be ignored when creating the hash. + _hash_ignored_attributes = tuple() # Flag that determines whether the class can be cached. 
_cacheable = True @@ -1700,8 +1704,8 @@ def _get_objects_to_hash(self): { key: val for key, val in self.get_attrs().items() if ( - (key not in self._hash_ignored_attributes) and - (key not in getattr(self, '_updatable_attributes', tuple())) + key not in self._hash_ignored_attributes and + key not in self._updatable_attributes ) }, self.folder, diff --git a/aiida/orm/implementation/sqlalchemy/calculation/job/__init__.py b/aiida/orm/implementation/sqlalchemy/calculation/job/__init__.py index 7f5f365bc6..c15599dd60 100644 --- a/aiida/orm/implementation/sqlalchemy/calculation/job/__init__.py +++ b/aiida/orm/implementation/sqlalchemy/calculation/job/__init__.py @@ -51,6 +51,7 @@ def _set_state(self, state): from ``aiida.common.datastructures.calc_states``. :raise: ModificationNotAllowed if the given state was already set. """ + super(JobCalculation, self)._set_state(state) if not self.is_stored: raise ModificationNotAllowed("Cannot set the calculation state " diff --git a/aiida/orm/mixins.py b/aiida/orm/mixins.py index f3addd8ccd..3b3a5da042 100644 --- a/aiida/orm/mixins.py +++ b/aiida/orm/mixins.py @@ -11,6 +11,7 @@ from aiida.common.exceptions import ModificationNotAllowed from aiida.common.lang import override from aiida.common.links import LinkType +from aiida.common.utils import classproperty class Sealable(object): @@ -18,7 +19,9 @@ class Sealable(object): # The name of the attribute to indicate if the node is sealed or not SEALED_KEY = '_sealed' - _updatable_attributes = (SEALED_KEY,) + @classproperty + def _updatable_attributes(cls): + return (cls.SEALED_KEY,) def add_link_from(self, src, label=None, link_type=LinkType.UNSPECIFIED): """ @@ -90,12 +93,11 @@ def _del_attr(self, key): @override def copy(self, include_updatable_attrs=False): """ - Create a copy of the node minus the updatable attributes + Create a copy of the node minus the updatable attributes if include_updatable_attrs is False """ clone = super(Sealable, self).copy() if include_updatable_attrs is False: - # Remove the updatable attributes for key, value in self._iter_updatable_attributes(): clone._del_attr(key) diff --git a/aiida/parsers/parser.py b/aiida/parsers/parser.py index cad6fd6854..4f29c6ece4 100644 --- a/aiida/parsers/parser.py +++ b/aiida/parsers/parser.py @@ -11,6 +11,9 @@ This module implements a generic output plugin, that is general enough to allow the reading of the outputs of a calculation. """ +import logging +from aiida.common.exceptions import NotExistent +from aiida.common.log import aiidalogger, get_dblogger_extra class Parser(object): @@ -22,12 +25,11 @@ class Parser(object): Looks for the attached parser_opts or input_settings nodes attached to the calculation. Get the child Folderdata, parse it and store the parsed data. """ + _linkname_outparams = 'output_parameters' _retrieved_temporary_folder_key = 'retrieved_temporary_folder' def __init__(self, calc): - from aiida.common import aiidalogger - self._logger = aiidalogger.getChild('parser').getChild( self.__class__.__name__) self._calc = calc @@ -37,9 +39,6 @@ def logger(self): Return the logger, also with automatic extras of the associated extras of the calculation """ - import logging - from aiida.common.log import get_dblogger_extra - return logging.LoggerAdapter(logger=self._logger, extra=get_dblogger_extra(self._calc)) @property @@ -52,24 +51,33 @@ def retrieved_temporary_folder_key(self): def parse_with_retrieved(self, retrieved): """ - Receives in input a dictionary of retrieved nodes. 
- Implement all the logic in this function of the subclass. + This function should be implemented in the Parser subclass and should parse the desired + output from the retrieved nodes in the 'retrieved' input dictionary. It should return a + tuple of an integer and a list of tuples. The integer serves as an exit code to indicate + the success of the parsing, where 0 means success and any non-zero integer indicates + a failure. These integer codes can be chosen by the plugin developer. The list of tuples + contains the parsed nodes that need to be stored as output nodes of the calculation. The first element + should be the link name and the second element the output node itself. :param retrieved: dictionary of retrieved nodes + :returns: exit code, list of tuples ('link_name', output_node) + :rtype: int, [(basestring, Data)] """ raise NotImplementedError def parse_from_calc(self, retrieved_temporary_folder=None): """ - Parses the datafolder, stores results. - Main functionality of the class. If you only have one retrieved node, - you do not need to reimplement this. Implement only the - parse_with_retrieved + Parse the contents of the retrieved folder data node and return a tuple of the exit code and + the output data nodes to be stored. If you only have one retrieved node, the default folder data node, this + function does not have to be reimplemented in a plugin, only the parse_with_retrieved method. + + :param retrieved_temporary_folder: optional absolute path to directory with temporary retrieved files + :returns: exit code, list of tuples ('link_name', output_node) + :rtype: int, [(basestring, Data)] """ - # select the folder object out_folder = self._calc.get_retrieved_node() if out_folder is None: - self.logger.error("No retrieved folder found") + self.logger.error('No retrieved folder found') return False, () retrieved = {self._calc._get_linkname_retrieved(): out_folder} @@ -92,11 +100,9 @@ def get_result_dict(self): Return a dictionary with all results (faster than doing multiple queries) :note: the function returns an empty dictionary if no output params node - can be found (either because the parser did not create it, or because - the calculation has not been parsed yet). + can be found (either because the parser did not create it, or because + the calculation has not been parsed yet). 
""" - from aiida.common.exceptions import NotExistent - try: resnode = self.get_result_parameterdata_node() except NotExistent: @@ -112,22 +118,21 @@ def get_result_parameterdata_node(self): :raise NotExistent: if the node does not exist """ from aiida.orm.data.parameter import ParameterData - from aiida.common.exceptions import NotExistent out_parameters = self._calc.get_outputs(type=ParameterData, also_labels=True) - out_parameterdata = [i[1] for i in out_parameters - if i[0] == self.get_linkname_outparams()] + out_parameter_data = [i[1] for i in out_parameters if i[0] == self.get_linkname_outparams()] - if not out_parameterdata: - raise NotExistent("No output .res ParameterData node found") - elif len(out_parameterdata) > 1: + if not out_parameter_data: + raise NotExistent('No output .res ParameterData node found') + + elif len(out_parameter_data) > 1: from aiida.common.exceptions import UniquenessError - raise UniquenessError("Output ParameterData should be found once, " - "found it instead {} times" - .format(len(out_parameterdata))) + raise UniquenessError( + 'Output ParameterData should be found once, found it instead {} times' + .format(len(out_parameter_data))) - return out_parameterdata[0] + return out_parameter_data[0] def get_result_keys(self): """ @@ -135,20 +140,18 @@ def get_result_keys(self): that can be then passed to the get_result() method. :note: the function returns an empty list if no output params node - can be found (either because the parser did not create it, or because - the calculation has not been parsed yet). + can be found (either because the parser did not create it, or because + the calculation has not been parsed yet). :raise UniquenessError: if more than one output node with the name - self._get_linkname_outparams() is found. + self._get_linkname_outparams() is found. """ - from aiida.common.exceptions import NotExistent - try: - resnode = self.get_result_parameterdata_node() + node = self.get_result_parameterdata_node() except NotExistent: return iter([]) - return resnode.keys() + return node.keys() def get_result(self, key_name): """ @@ -156,17 +159,13 @@ def get_result(self, key_name): The following method will should work for a generic parser, provided it has to query only one ParameterData object. 
""" - resnode = self.get_result_parameterdata_node() + node = self.get_result_parameterdata_node() try: - value = resnode.get_attr(key_name) + value = node.get_attr(key_name) except KeyError: from aiida.common.exceptions import ContentNotExistent raise ContentNotExistent("Key {} not found in results".format(key_name)) - # TODO: eventually, here insert further operations - # (ex: key_name = energy_float_rydberg could return only the last element of a list, - # and convert in the right units) - return value diff --git a/aiida/work/job_processes.py b/aiida/work/job_processes.py index 67bae10069..08dfd0519f 100644 --- a/aiida/work/job_processes.py +++ b/aiida/work/job_processes.py @@ -22,6 +22,7 @@ from aiida.daemon import execmanager from aiida.orm.authinfo import AuthInfo from aiida.orm.calculation.job import JobCalculation +from aiida.orm.calculation.job import JobCalculationFinishStatus from aiida.scheduler.datastructures import job_states from aiida.work.process_spec import DictSchema @@ -35,6 +36,12 @@ RETRIEVE_COMMAND = 'retrieve' +class TransportTaskException(Exception): + + def __init__(self, calc_state): + self.calc_state = calc_state + + class TransportTask(plumpy.Future): """ A general task that requires transport """ def __init__(self, calc_node, transport_queue): @@ -60,12 +67,17 @@ def _execute(self, authinfo, transport): class SubmitJob(TransportTask): """ A task to submit a job calculation """ + def execute(self, transport): - return execmanager.submit_calc(self._calc, self._authinfo, transport) + try: + execmanager.submit_calc(self._calc, self._authinfo, transport) + except Exception as exception: + raise TransportTaskException(calc_states.SUBMISSIONFAILED) class UpdateSchedulerState(TransportTask): """ A task to update the scheduler state of a job calculation """ + def execute(self, transport): scheduler = self._calc.get_computer().get_scheduler() scheduler.set_transport(transport) @@ -86,8 +98,7 @@ def execute(self, transport): # Has the state changed? 
last_jobinfo = self._calc._get_last_jobinfo() - if last_jobinfo is not None and info.job_state != last_jobinfo.job_state: - execmanager.update_job_calc_from_job_info(self._calc, info) + execmanager.update_job_calc_from_job_info(self._calc, info) job_done = info.job_state == job_states.DONE @@ -117,7 +128,10 @@ def __init__(self, calc_node, transport_queue, retrieved_temporary_folder): def execute(self, transport): """ This returns the retrieved temporary folder """ - return execmanager.retrieve_all(self._calc, transport, self._retrieved_temporary_folder) + try: + return execmanager.retrieve_all(self._calc, transport, self._retrieved_temporary_folder) + except Exception as exception: + raise TransportTaskException(calc_states.RETRIEVALFAILED) class Waiting(plumpy.Waiting): @@ -153,6 +167,7 @@ def action_command(self): if self.data == SUBMIT_COMMAND: self._task = SubmitJob(calc, transport_queue) yield self._task + # Now get scheduler updates self.scheduler_update() @@ -177,6 +192,10 @@ def action_command(self): else: raise RuntimeError("Unknown waiting command") + except TransportTaskException as exception: + # calc._set_state(exception.calc_state) + finish_status = JobCalculationFinishStatus[exception.calc_state] + self.finished(finish_status) except plumpy.KilledError: self.transition_to(processes.ProcessState.KILLED) if self._cancelling_future is not None: @@ -208,6 +227,9 @@ def retrieved(self, retrieved_temporary_folder): self.process.retrieved, retrieved_temporary_folder) + def finished(self, result): + self.transition_to(processes.ProcessState.FINISHED, result) + def cancel(self, msg=None): if self._cancelling_future is not None: return self._cancelling_future @@ -371,7 +393,7 @@ def retrieved(self, retrieved_temporary_folder=None): for the calculation to be finished and the data has been retrieved. 
""" try: - execmanager.parse_results(self.calc, retrieved_temporary_folder) + exit_code = execmanager.parse_results(self.calc, retrieved_temporary_folder) except BaseException: try: self.calc._set_state(calc_states.PARSINGFAILED) @@ -386,8 +408,7 @@ def retrieved(self, retrieved_temporary_folder=None): for label, node in self.calc.get_outputs_dict().iteritems(): self.out(label, node) - # Done, so return the output - return self.outputs + return exit_code class ContinueJobCalculation(JobProcess): diff --git a/aiida/work/persistence.py b/aiida/work/persistence.py index 2c4d4b0d24..23dfccf674 100644 --- a/aiida/work/persistence.py +++ b/aiida/work/persistence.py @@ -105,6 +105,8 @@ def save_checkpoint(self, process, tag=None): calc = process.calc calc._set_checkpoint(yaml.dump(bundle)) + return bundle + def load_checkpoint(self, pid, tag=None): if tag is not None: raise NotImplementedError("Checkpoint tags not supported yet") diff --git a/aiida/work/processes.py b/aiida/work/processes.py index d346a17495..23df6ccb47 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -47,9 +47,7 @@ class Process(plumpy.Process): _spec_type = ProcessSpec - SINGLE_RETURN_LINKNAME = '[return]' - # This is used for saving node pks in the saved instance state - NODE_TYPE = uuid.UUID('5cac9bab-6f46-485b-9e81-d6a666cfdc1b') + SINGLE_RETURN_LINKNAME = 'return' class SaveKeys(enum.Enum): """ @@ -207,15 +205,22 @@ def on_except(self, exc_info): super(Process, self).on_except(exc_info) self.report(traceback.format_exc()) + @override + def on_finish(self, result): + """ + Set the finish status on the Calculation node + """ + super(Process, self).on_finish(result) + self.calc._set_finish_status(result) + @override def on_fail(self, exc_info): + """ + Format the exception info into a string and log it as an error + """ super(Process, self).on_fail(exc_info) - - exc = traceback.format_exception(exc_info[0], exc_info[1], exc_info[2]) - self.logger.error("{} failed:\n{}".format(self.pid, "".join(exc))) - - exception = exc_info[1] - self.calc._set_attr(WorkCalculation.FAILED_KEY, True) + exception = traceback.format_exception(exc_info[0], exc_info[1], exc_info[2]) + self.logger.error('{} failed:\n{}'.format(self.pid, ''.join(exception))) @override def on_output_emitting(self, output_port, value): @@ -413,8 +418,8 @@ def _use_cache_enabled(self): # Second priority: config except KeyError: return ( - caching.get_use_cache(type(self)) or - caching.get_use_cache(type(self._calc)) + caching.get_use_cache(type(self)) or + caching.get_use_cache(type(self._calc)) ) @@ -654,4 +659,5 @@ def _run(self): "Must be a Data type or a Mapping of {{string: Data}}". 
format(result.__class__)) - return result + # Execution successful so we return exit code (finish status) 0 + return 0 diff --git a/aiida/work/runners.py b/aiida/work/runners.py index 20074ab2e1..0ba095f0dd 100644 --- a/aiida/work/runners.py +++ b/aiida/work/runners.py @@ -18,8 +18,8 @@ _LOGGER = logging.getLogger(__name__) -ResultAndCalcNode = namedtuple("ResultWithPid", ["result", "calc"]) -ResultAndPid = namedtuple("ResultWithPid", ["result", "pid"]) +ResultAndCalcNode = namedtuple('ResultAndCalcNode', ['result', 'calc']) +ResultAndPid = namedtuple('ResultAndPid', ['result', 'pid']) _runner = None diff --git a/aiida/work/workchain.py b/aiida/work/workchain.py index 1081a3b3e3..0245815aa3 100644 --- a/aiida/work/workchain.py +++ b/aiida/work/workchain.py @@ -11,6 +11,7 @@ import plumpy import plumpy.workchains +from plumpy.workchains import if_, while_, return_, _PropagateReturn from aiida.common.extendeddicts import AttributeDict from aiida.orm.utils import load_node, load_workflow from aiida.common.lang import override @@ -22,7 +23,6 @@ __all__ = ['WorkChain', 'if_', 'while_', 'return_', 'ToContext', 'Outputs', '_WorkChainSpec'] -from plumpy.workchains import if_, while_, return_, _PropagateReturn class _WorkChainSpec(processes.ProcessSpec, plumpy.WorkChainSpec): @@ -120,26 +120,29 @@ def _run(self): return self._do_step() def _do_step(self, wait_on=None): + """ + Execute the next step in the outline, if the stepper returns a non-finished status + and the return value is of type ToContext, it will be added to the awaitables. + If the stepper returns that the process is finished, we return the return value + """ self._awaitables = [] try: - finished, retval = self._stepper.step() - except _PropagateReturn: - finished, retval = True, None + finished, return_value = self._stepper.step() + except _PropagateReturn as exception: + finished, return_value = True, exception.exit_code + + if not finished and (return_value is None or isinstance(return_value, ToContext)): - if not finished: - if retval is not None: - if isinstance(retval, ToContext): - self.to_context(**retval) - else: - raise TypeError("Invalid value returned from step '{}'".format(retval)) + if isinstance(return_value, ToContext): + self.to_context(**return_value) if self._awaitables: return plumpy.Wait(self._do_step, 'Waiting before next step') else: return plumpy.Continue(self._do_step) else: - return self.outputs + return return_value def on_wait(self, awaitables): super(WorkChain, self).on_wait(awaitables) diff --git a/requirements.txt b/requirements.txt index 006f85d987..92868d9018 100644 --- a/requirements.txt +++ b/requirements.txt @@ -144,5 +144,4 @@ wcwidth==0.1.7 Werkzeug==0.14.1 wrapt==1.10.11 yapf==0.19.0 --e git://github.com/muhrin/plumpy.git@6be7b2d9b6d2d6704bc605c97432512ca38f181f#egg=plumpy --e git://github.com/muhrin/kiwipy.git@249fe038f109424b6cdd007092d6f3535346aa7f#egg=kiwipy +-e git://github.com/muhrin/plumpy.git@3fc79acaa3ca0b9c568897616d5f01182df10171#egg=plumpy
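
The terminal-state logic in aiida/orm/implementation/general/calculation/__init__.py above makes `is_finished_ok` and `is_failed` pure functions of the process state and the stored `finish_status` attribute, which `Process.on_finish` records from the step's return value. A minimal standalone sketch of that logic, with illustrative class names and no AiiDA dependencies:

```python
import enum


class ProcessState(enum.Enum):
    RUNNING = 'running'
    FINISHED = 'finished'


class CalculationSketch(object):
    """Stand-in for AbstractCalculation's terminal-state properties."""

    def __init__(self, process_state, finish_status=None):
        self.process_state = process_state
        self.finish_status = finish_status  # None: no exit code was ever recorded

    @property
    def is_finished(self):
        return self.process_state == ProcessState.FINISHED

    @property
    def is_finished_ok(self):
        # Terminated nominally and recorded a zero exit code
        return self.is_finished and self.finish_status == 0

    @property
    def is_failed(self):
        # Terminated nominally but recorded a non-zero exit code
        return self.is_finished and self.finish_status != 0


assert CalculationSketch(ProcessState.FINISHED, 0).is_finished_ok
assert CalculationSketch(ProcessState.FINISHED, 400).is_failed
assert not CalculationSketch(ProcessState.RUNNING).is_finished_ok
```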
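
In aiida/work/job_processes.py the `Waiting` state converts a failed transport task into an exit code with `JobCalculationFinishStatus[exception.calc_state]`, an `Enum` lookup by member name (the `calc_states` entries are plain strings), whereas `finish_status_label` looks members up by value. A short sketch of the two lookup directions, using the codes defined in the patch:

```python
import enum


class JobCalculationFinishStatus(enum.Enum):
    FINISHED = 0
    SUBMISSIONFAILED = 100
    RETRIEVALFAILED = 200
    PARSINGFAILED = 300
    FAILED = 400


# Lookup by name, as done for a calc_states string such as 'SUBMISSIONFAILED';
# this yields the member, whose integer .value is what _set_finish_status stores
status = JobCalculationFinishStatus['SUBMISSIONFAILED']
assert status.value == 100

# Lookup by value, as done in finish_status_label; unknown codes raise ValueError
assert JobCalculationFinishStatus(400) is JobCalculationFinishStatus.FAILED
try:
    JobCalculationFinishStatus(999)
except ValueError:
    label = 'UNKNOWN'  # the fallback that finish_status_label uses
```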
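
The docstring of `JobCalculationFinishStatus` says plugin calculation classes are meant to extend the set of codes. Since a Python `Enum` that already defines members cannot be subclassed, a plugin would define its own enum and expose it through the `finish_status_enum` classproperty, so that `finish_status_label` resolves the plugin-specific codes. A hypothetical sketch (the plugin class, its enum, and the 510 code are invented for illustration):

```python
import enum

from aiida.common.utils import classproperty
from aiida.orm.calculation.job import JobCalculation


class HypotheticalFinishStatus(enum.Enum):
    # Repeat the generic codes, since an Enum with members cannot be subclassed
    FINISHED = 0
    SUBMISSIONFAILED = 100
    RETRIEVALFAILED = 200
    PARSINGFAILED = 300
    FAILED = 400
    # Plugin-specific failure mode
    GEOMETRY_NOT_CONVERGED = 510


class HypotheticalCalculation(JobCalculation):

    @classproperty
    def finish_status_enum(cls):
        # finish_status_label now maps 510 to 'GEOMETRY_NOT_CONVERGED'
        return HypotheticalFinishStatus
```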
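
The reworked `Parser.parse_with_retrieved` contract in aiida/parsers/parser.py expects an integer exit code plus a list of `(link_name, output_node)` tuples. A hypothetical minimal parser following that convention (the output file name is invented, and `get_file_content` is assumed to be available on the retrieved folder node):

```python
from aiida.orm.data.parameter import ParameterData
from aiida.parsers.parser import Parser


class HypotheticalParser(Parser):
    """Sketch of a parser plugin using the integer exit code convention."""

    def parse_with_retrieved(self, retrieved):
        folder = retrieved[self._calc._get_linkname_retrieved()]

        try:
            content = folder.get_file_content('output.txt')  # hypothetical file
        except (IOError, OSError):
            # Any non-zero integer signals failure; the code is plugin-defined
            return 1, []

        parameters = ParameterData(dict={'number_of_lines': len(content.splitlines())})

        # Zero signals success; parse_results links and stores the returned nodes
        return 0, [(self.get_linkname_outparams(), parameters)]
```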
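
Several hunks above (aiida/orm/mixins.py, work.py, and the job calculation class) replace class-level tuple concatenation such as `AbstractCalculation._updatable_attributes + (...)` with `classproperty` methods that chain through `super()`, so each class in the MRO appends only its own keys and no subclass hard-codes its base. A standalone sketch of the pattern with a minimal `classproperty` descriptor (AiiDA ships its own in aiida.common.utils):

```python
class classproperty(object):
    """Minimal descriptor: a method that behaves as a read-only class-level property."""

    def __init__(self, getter):
        self.getter = getter

    def __get__(self, instance, owner):
        return self.getter(owner)


class Sealable(object):
    SEALED_KEY = '_sealed'

    @classproperty
    def _updatable_attributes(cls):
        return (cls.SEALED_KEY,)


class Calculation(Sealable):
    PROCESS_STATE_KEY = 'process_state'
    FINISH_STATUS_KEY = 'finish_status'

    @classproperty
    def _updatable_attributes(cls):
        # Chain to the parent and append only this level's keys
        return super(Calculation, cls)._updatable_attributes + (
            cls.PROCESS_STATE_KEY,
            cls.FINISH_STATUS_KEY,
        )


assert Calculation._updatable_attributes == ('_sealed', 'process_state', 'finish_status')
```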