Skip to content

Commit

Permalink
Engine: implement functionality to import completed CalcJobs
Browse files Browse the repository at this point in the history
When people start using AiiDA they typically already have many
calculation jobs completed without the use of AiiDA and they wish to
import these somehow, such that they can be included in the provenance
graph along with the future calculations they will run through AiiDA.

This concept was originally implemented for the `PwCalculation` in the
`aiida-quantumespresso` plugin and worked, but the approach required a
separate `CalcJob` implementation for each existing `CalcJob` class that
one might want to import.

Here we implement a generic mechanism directly in `aiida-core` that will
allow any `CalcJob` implementation to import already completed jobs. The
calculation job is launched just as one would launch a normal one through
AiiDA, except one additional input is passed: a `RemoteData` instance
under the name `remote_folder` that contains the output files of the
completed calculation. The naming is chosen on purpose to be the same as
the `RemoteData` that is normally created by the engine during a normal
calculation job run.

When the engine detects this input, instead of going through the normal
sequence of transport tasks, it simply performs the presubmit and then
goes straight to the "retrieve" step. Here the engine will retrieve the
files from the provided `RemoteData` as if they had just been produced
during an actual run. In this way, the process is executed almost
exactly in the same way as a normal run, except the job itself is not
actually executed.
  • Loading branch information
sphuber committed Sep 13, 2021
1 parent cce0e30 commit fee0902
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 19 deletions.
5 changes: 3 additions & 2 deletions aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ def submit(process: TYPE_SUBMIT_PROCESS, **inputs: Any) -> ProcessNode:
process_inited = instantiate_process(runner, process, **inputs)

# If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
# instead of raising, because in this way the user does not have to change the launcher when testing.
if process_inited.metadata.get('dry_run', False):
# instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
# for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:
_, node = run_get_node(process_inited)
return node

Expand Down
84 changes: 69 additions & 15 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli
* No `Computer` has been specified, neither directly in `metadata.computer` nor indirectly through the `Code` input
* The specified computer is not stored
* The `Computer` specified in `metadata.computer` is not the same as that of the specified `Code`
* No `Code` has been specified and no `remote_folder` input has been specified, i.e. this is no import run
:return: string with error message in case the inputs are invalid
"""
Expand All @@ -53,6 +54,13 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli
code = inputs.get('code', None)
computer_from_code = code.computer
computer_from_metadata = inputs.get('metadata', {}).get('computer', None)
remote_folder = inputs.get('remote_folder', None)

if remote_folder is not None:
# The `remote_folder` input has been specified and so this concerns an import run, which means that neither
# a `Code` nor a `Computer` are required. However, they are allowed to be specified but will not be explicitly
# checked for consistency.
return None

if not computer_from_code and not computer_from_metadata:
return 'no computer has been specified in `metadata.computer` nor via `code`.'
Expand Down Expand Up @@ -182,7 +190,15 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
# yapf: disable
super().define(spec)
spec.inputs.validator = validate_calc_job # type: ignore[assignment] # takes only PortNamespace not Port
spec.input('code', valid_type=orm.Code, help='The `Code` to use for this job.')
spec.input('code', valid_type=orm.Code, required=False,
help='The `Code` to use for this job. This input is required, unless the `remote_folder` input is '
'specified, which means an existing job is being imported and no code will actually be run.')
spec.input('remote_folder', valid_type=orm.RemoteData, required=False,
help='Remote directory containing the results of an already completed calculation job without AiiDA. The '
'inputs should be passed to the `CalcJob` as normal but instead of launching the actual job, the '
'engine will recreate the input files and then proceed straight to the retrieve step where the files '
'of this `RemoteData` will be retrieved as if it had been actually launched through AiiDA. If a '
'parser is defined in the inputs, the results are parsed and attached as output nodes as usual.')
spec.input('metadata.dry_run', valid_type=bool, default=False,
help='When set to `True` will prepare the calculation job for submission but not actually launch it.')
spec.input('metadata.computer', valid_type=orm.Computer, required=False,
Expand Down Expand Up @@ -322,21 +338,13 @@ def run(self) -> Union[plumpy.process_states.Stop, int, plumpy.process_states.Wa
"""
if self.inputs.metadata.dry_run: # type: ignore[union-attr]
from aiida.common.folders import SubmitTestFolder
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport

with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info = self.presubmit(folder)
transport.chdir(folder.abspath)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = {
'folder': folder.abspath,
'script_filename': self.node.get_option('submit_script_filename')
}
self._perform_dry_run()
return plumpy.process_states.Stop(None, True)

if 'remote_folder' in self.inputs: # type: ignore[operator]
exit_code = self._perform_import()
return exit_code

# The following conditional is required for the caching to properly work. Even if the source node has a process
# state of `Finished` the cached process will still enter the running state. The process state will have then
# been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
Expand All @@ -358,7 +366,53 @@ def prepare_for_submission(self, folder: Folder) -> CalcInfo:
:param folder: a temporary folder on the local file system.
:returns: the `CalcInfo` instance
"""
raise NotImplementedError
raise NotImplementedError()

def _perform_dry_run(self):
"""Perform a dry run.
Instead of performing the normal sequence of steps, just the `presubmit` is called, which will call the method
`prepare_for_submission` of the plugin to generate the input files based on the inputs. Then the upload action
is called, but using a normal local transport that will copy the files to a local sandbox folder. The generated
input script and the absolute path to the sandbox folder are stored in the `dry_run_info` attribute of the node
of this process.
"""
from aiida.common.folders import SubmitTestFolder
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport

with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info = self.presubmit(folder)
transport.chdir(folder.abspath)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = {
'folder': folder.abspath,
'script_filename': self.node.get_option('submit_script_filename')
}

def _perform_import(self):
"""Perform the import of an already completed calculation.
The inputs contained a `RemoteData` under the key `remote_folder` signalling that this is not supposed to be run
as a normal calculation job, but rather the results are already computed outside of AiiDA and merely need to be
imported.
"""
from aiida.common.datastructures import CalcJobState
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon.execmanager import retrieve_calculation
from aiida.transports.plugins.local import LocalTransport

with LocalTransport() as transport:
with SandboxFolder() as folder:
with SandboxFolder() as retrieved_temporary_folder:
self.presubmit(folder)
self.node.set_remote_workdir(
self.inputs.remote_folder.get_remote_path() # type: ignore[union-attr]
)
retrieve_calculation(self.node, transport, retrieved_temporary_folder.abspath)
self.node.set_state(CalcJobState.PARSING)
return self.parse(retrieved_temporary_folder.abspath)

def parse(self, retrieved_temporary_folder: Optional[str] = None) -> ExitCode:
"""Parse a retrieved job calculation.
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from functools import partial
import io
import os
import tempfile
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -409,8 +410,6 @@ def test_rerunnable(self):
@pytest.mark.usefixtures('chdir_tmp_path')
def test_provenance_exclude_list(self):
"""Test the functionality of the `CalcInfo.provenance_exclude_list` attribute."""
import tempfile

code = orm.Code(input_plugin_name='core.arithmetic.add', remote_computer_exec=[self.computer,
'/bin/true']).store()

Expand Down Expand Up @@ -773,3 +772,112 @@ def test_validate_stash_options(stash_options, expected):
assert validate_stash_options(stash_options, None) is expected
else:
assert expected in validate_stash_options(stash_options, None)


class TestImport(AiidaTestCase):
"""Test the functionality to import existing calculations completed outside of AiiDA."""

@classmethod
def setUpClass(cls, *args, **kwargs):
super().setUpClass(*args, **kwargs)
cls.computer.configure() # pylint: disable=no-member
cls.inputs = {
'code': orm.Code(remote_computer_exec=(cls.computer, '/bin/true')).store(),
'x': orm.Int(1),
'y': orm.Int(2),
'metadata': {
'options': {
'resources': {
'num_machines': 1,
'num_mpiprocs_per_machine': 1
},
}
}
}

def test_import_from_valid(self):
"""Test the import of a successfully completed `ArithmeticAddCalculation`."""
expected_sum = (self.inputs['x'] + self.inputs['y']).value

with tempfile.TemporaryDirectory() as directory:
filepath = os.path.join(directory, ArithmeticAddCalculation.spec_options['output_filename'].default)
with open(filepath, 'w') as handle:
handle.write(f'{expected_sum}\n')

remote = orm.RemoteData(directory, computer=self.computer).store()
inputs = deepcopy(self.inputs)
inputs['remote_folder'] = remote

results, node = launch.run.get_node(ArithmeticAddCalculation, **inputs)

# Check node attributes
assert isinstance(node, orm.CalcJobNode)
assert node.is_finished_ok
assert node.is_sealed

# Verify the expected outputs are there
assert 'retrieved' in results
assert isinstance(results['retrieved'], orm.FolderData)
assert 'sum' in results
assert isinstance(results['sum'], orm.Int)
assert results['sum'].value == expected_sum

def test_import_from_invalid(self):
"""Test the import of a completed `ArithmeticAddCalculation` where parsing will fail.
The `ArithmeticParser` will return a non-zero exit code if the output file could not be parsed. Make sure that
this is piped through correctly through the infrastructure and will cause the process to be marked as failed.
"""
with tempfile.TemporaryDirectory() as directory:
filepath = os.path.join(directory, ArithmeticAddCalculation.spec_options['output_filename'].default)
with open(filepath, 'w') as handle:
handle.write('a\n') # On purpose write a non-integer to output so the parsing will fail

remote = orm.RemoteData(directory, computer=self.computer).store()
inputs = deepcopy(self.inputs)
inputs['remote_folder'] = remote

results, node = launch.run.get_node(ArithmeticAddCalculation, **inputs)

# Check node attributes
assert isinstance(node, orm.CalcJobNode)
assert node.is_failed
assert node.is_sealed
assert node.exit_status == ArithmeticAddCalculation.exit_codes.ERROR_INVALID_OUTPUT.status

# Verify the expected outputs are there
assert 'retrieved' in results
assert isinstance(results['retrieved'], orm.FolderData)

def test_import_non_default_input_file(self):
"""Test the import of a successfully completed `ArithmeticAddCalculation`
The only difference of this test with `test_import_from_valid` is that here the name of the output file
of the completed calculation differs from the default written by the calculation job class."""
expected_sum = (self.inputs['x'] + self.inputs['y']).value

output_filename = 'non_standard.out'

with tempfile.TemporaryDirectory() as directory:
filepath = os.path.join(directory, output_filename)
with open(filepath, 'w') as handle:
handle.write(f'{expected_sum}\n')

remote = orm.RemoteData(directory, computer=self.computer).store()
inputs = deepcopy(self.inputs)
inputs['remote_folder'] = remote
inputs['metadata']['options']['output_filename'] = output_filename

results, node = launch.run.get_node(ArithmeticAddCalculation, **inputs)

# Check node attributes
assert isinstance(node, orm.CalcJobNode)
assert node.is_finished_ok
assert node.is_sealed

# Verify the expected outputs are there
assert 'retrieved' in results
assert isinstance(results['retrieved'], orm.FolderData)
assert 'sum' in results
assert isinstance(results['sum'], orm.Int)
assert results['sum'].value == expected_sum

0 comments on commit fee0902

Please sign in to comment.