diff --git a/aiida/engine/launch.py b/aiida/engine/launch.py index 6026ac4731..967a5c4c74 100644 --- a/aiida/engine/launch.py +++ b/aiida/engine/launch.py @@ -101,8 +101,9 @@ def submit(process: TYPE_SUBMIT_PROCESS, **inputs: Any) -> ProcessNode: process_inited = instantiate_process(runner, process, **inputs) # If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this - # instead of raising, because in this way the user does not have to change the launcher when testing. - if process_inited.metadata.get('dry_run', False): + # instead of raising, because in this way the user does not have to change the launcher when testing. The same goes + # for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation. + if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs: _, node = run_get_node(process_inited) return node diff --git a/aiida/engine/processes/calcjobs/calcjob.py b/aiida/engine/processes/calcjobs/calcjob.py index 474c9c6260..25beb9d817 100644 --- a/aiida/engine/processes/calcjobs/calcjob.py +++ b/aiida/engine/processes/calcjobs/calcjob.py @@ -40,6 +40,7 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli * No `Computer` has been specified, neither directly in `metadata.computer` nor indirectly through the `Code` input * The specified computer is not stored * The `Computer` specified in `metadata.computer` is not the same as that of the specified `Code` + * No `Code` has been specified and no `remote_folder` input has been specified, i.e. this is no import run :return: string with error message in case the inputs are invalid """ @@ -53,6 +54,13 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli code = inputs.get('code', None) computer_from_code = code.computer computer_from_metadata = inputs.get('metadata', {}).get('computer', None) + remote_folder = inputs.get('remote_folder', None) + + if remote_folder is not None: + # The `remote_folder` input has been specified and so this concerns an import run, which means that neither + # a `Code` nor a `Computer` are required. However, they are allowed to be specified but will not be explicitly + # checked for consistency. + return None if not computer_from_code and not computer_from_metadata: return 'no computer has been specified in `metadata.computer` nor via `code`.' @@ -182,7 +190,15 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override] # yapf: disable super().define(spec) spec.inputs.validator = validate_calc_job # type: ignore[assignment] # takes only PortNamespace not Port - spec.input('code', valid_type=orm.Code, help='The `Code` to use for this job.') + spec.input('code', valid_type=orm.Code, required=False, + help='The `Code` to use for this job. This input is required, unless the `remote_folder` input is ' + 'specified, which means an existing job is being imported and no code will actually be run.') + spec.input('remote_folder', valid_type=orm.RemoteData, required=False, + help='Remote directory containing the results of an already completed calculation job without AiiDA. The ' + 'inputs should be passed to the `CalcJob` as normal but instead of launching the actual job, the ' + 'engine will recreate the input files and then proceed straight to the retrieve step where the files ' + 'of this `RemoteData` will be retrieved as if it had been actually launched through AiiDA. If a ' + 'parser is defined in the inputs, the results are parsed and attached as output nodes as usual.') spec.input('metadata.dry_run', valid_type=bool, default=False, help='When set to `True` will prepare the calculation job for submission but not actually launch it.') spec.input('metadata.computer', valid_type=orm.Computer, required=False, @@ -322,21 +338,13 @@ def run(self) -> Union[plumpy.process_states.Stop, int, plumpy.process_states.Wa """ if self.inputs.metadata.dry_run: # type: ignore[union-attr] - from aiida.common.folders import SubmitTestFolder - from aiida.engine.daemon.execmanager import upload_calculation - from aiida.transports.plugins.local import LocalTransport - - with LocalTransport() as transport: - with SubmitTestFolder() as folder: - calc_info = self.presubmit(folder) - transport.chdir(folder.abspath) - upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True) - self.node.dry_run_info = { - 'folder': folder.abspath, - 'script_filename': self.node.get_option('submit_script_filename') - } + self._perform_dry_run() return plumpy.process_states.Stop(None, True) + if 'remote_folder' in self.inputs: # type: ignore[operator] + exit_code = self._perform_import() + return exit_code + # The following conditional is required for the caching to properly work. Even if the source node has a process # state of `Finished` the cached process will still enter the running state. The process state will have then # been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other @@ -358,7 +366,53 @@ def prepare_for_submission(self, folder: Folder) -> CalcInfo: :param folder: a temporary folder on the local file system. :returns: the `CalcInfo` instance """ - raise NotImplementedError + raise NotImplementedError() + + def _perform_dry_run(self): + """Perform a dry run. + + Instead of performing the normal sequence of steps, just the `presubmit` is called, which will call the method + `prepare_for_submission` of the plugin to generate the input files based on the inputs. Then the upload action + is called, but using a normal local transport that will copy the files to a local sandbox folder. The generated + input script and the absolute path to the sandbox folder are stored in the `dry_run_info` attribute of the node + of this process. + """ + from aiida.common.folders import SubmitTestFolder + from aiida.engine.daemon.execmanager import upload_calculation + from aiida.transports.plugins.local import LocalTransport + + with LocalTransport() as transport: + with SubmitTestFolder() as folder: + calc_info = self.presubmit(folder) + transport.chdir(folder.abspath) + upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True) + self.node.dry_run_info = { + 'folder': folder.abspath, + 'script_filename': self.node.get_option('submit_script_filename') + } + + def _perform_import(self): + """Perform the import of an already completed calculation. + + The inputs contained a `RemoteData` under the key `remote_folder` signalling that this is not supposed to be run + as a normal calculation job, but rather the results are already computed outside of AiiDA and merely need to be + imported. + """ + from aiida.common.datastructures import CalcJobState + from aiida.common.folders import SandboxFolder + from aiida.engine.daemon.execmanager import retrieve_calculation + from aiida.transports.plugins.local import LocalTransport + + with LocalTransport() as transport: + with SandboxFolder() as folder: + with SandboxFolder() as retrieved_temporary_folder: + self.presubmit(folder) + self.node.set_remote_workdir( + self.inputs.remote_folder.get_remote_path() # type: ignore[union-attr] + ) + retrieve_calculation(self.node, transport, retrieved_temporary_folder.abspath) + self.node.set_state(CalcJobState.PARSING) + return self.parse(retrieved_temporary_folder.abspath) def parse(self, retrieved_temporary_folder: Optional[str] = None) -> ExitCode: """Parse a retrieved job calculation. diff --git a/tests/engine/processes/calcjobs/__init__.py b/tests/engine/processes/calcjobs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/engine/test_calc_job.py b/tests/engine/processes/calcjobs/test_calc_job.py similarity index 87% rename from tests/engine/test_calc_job.py rename to tests/engine/processes/calcjobs/test_calc_job.py index d8c86169fa..cdf6872692 100644 --- a/tests/engine/test_calc_job.py +++ b/tests/engine/processes/calcjobs/test_calc_job.py @@ -14,6 +14,7 @@ from functools import partial import io import os +import tempfile from unittest.mock import patch import pytest @@ -409,8 +410,6 @@ def test_rerunnable(self): @pytest.mark.usefixtures('chdir_tmp_path') def test_provenance_exclude_list(self): """Test the functionality of the `CalcInfo.provenance_exclude_list` attribute.""" - import tempfile - code = orm.Code(input_plugin_name='core.arithmetic.add', remote_computer_exec=[self.computer, '/bin/true']).store() @@ -773,3 +772,112 @@ def test_validate_stash_options(stash_options, expected): assert validate_stash_options(stash_options, None) is expected else: assert expected in validate_stash_options(stash_options, None) + + +class TestImport(AiidaTestCase): + """Test the functionality to import existing calculations completed outside of AiiDA.""" + + @classmethod + def setUpClass(cls, *args, **kwargs): + super().setUpClass(*args, **kwargs) + cls.computer.configure() # pylint: disable=no-member + cls.inputs = { + 'code': orm.Code(remote_computer_exec=(cls.computer, '/bin/true')).store(), + 'x': orm.Int(1), + 'y': orm.Int(2), + 'metadata': { + 'options': { + 'resources': { + 'num_machines': 1, + 'num_mpiprocs_per_machine': 1 + }, + } + } + } + + def test_import_from_valid(self): + """Test the import of a successfully completed `ArithmeticAddCalculation`.""" + expected_sum = (self.inputs['x'] + self.inputs['y']).value + + with tempfile.TemporaryDirectory() as directory: + filepath = os.path.join(directory, ArithmeticAddCalculation.spec_options['output_filename'].default) + with open(filepath, 'w') as handle: + handle.write(f'{expected_sum}\n') + + remote = orm.RemoteData(directory, computer=self.computer).store() + inputs = deepcopy(self.inputs) + inputs['remote_folder'] = remote + + results, node = launch.run.get_node(ArithmeticAddCalculation, **inputs) + + # Check node attributes + assert isinstance(node, orm.CalcJobNode) + assert node.is_finished_ok + assert node.is_sealed + + # Verify the expected outputs are there + assert 'retrieved' in results + assert isinstance(results['retrieved'], orm.FolderData) + assert 'sum' in results + assert isinstance(results['sum'], orm.Int) + assert results['sum'].value == expected_sum + + def test_import_from_invalid(self): + """Test the import of a completed `ArithmeticAddCalculation` where parsing will fail. + + The `ArithmeticParser` will return a non-zero exit code if the output file could not be parsed. Make sure that + this is piped through correctly through the infrastructure and will cause the process to be marked as failed. + """ + with tempfile.TemporaryDirectory() as directory: + filepath = os.path.join(directory, ArithmeticAddCalculation.spec_options['output_filename'].default) + with open(filepath, 'w') as handle: + handle.write('a\n') # On purpose write a non-integer to output so the parsing will fail + + remote = orm.RemoteData(directory, computer=self.computer).store() + inputs = deepcopy(self.inputs) + inputs['remote_folder'] = remote + + results, node = launch.run.get_node(ArithmeticAddCalculation, **inputs) + + # Check node attributes + assert isinstance(node, orm.CalcJobNode) + assert node.is_failed + assert node.is_sealed + assert node.exit_status == ArithmeticAddCalculation.exit_codes.ERROR_INVALID_OUTPUT.status + + # Verify the expected outputs are there + assert 'retrieved' in results + assert isinstance(results['retrieved'], orm.FolderData) + + def test_import_non_default_input_file(self): + """Test the import of a successfully completed `ArithmeticAddCalculation` + + The only difference of this test with `test_import_from_valid` is that here the name of the output file + of the completed calculation differs from the default written by the calculation job class.""" + expected_sum = (self.inputs['x'] + self.inputs['y']).value + + output_filename = 'non_standard.out' + + with tempfile.TemporaryDirectory() as directory: + filepath = os.path.join(directory, output_filename) + with open(filepath, 'w') as handle: + handle.write(f'{expected_sum}\n') + + remote = orm.RemoteData(directory, computer=self.computer).store() + inputs = deepcopy(self.inputs) + inputs['remote_folder'] = remote + inputs['metadata']['options']['output_filename'] = output_filename + + results, node = launch.run.get_node(ArithmeticAddCalculation, **inputs) + + # Check node attributes + assert isinstance(node, orm.CalcJobNode) + assert node.is_finished_ok + assert node.is_sealed + + # Verify the expected outputs are there + assert 'retrieved' in results + assert isinstance(results['retrieved'], orm.FolderData) + assert 'sum' in results + assert isinstance(results['sum'], orm.Int) + assert results['sum'].value == expected_sum diff --git a/tests/engine/test_calc_job/test_multi_codes_run_parallel_False_.sh b/tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_parallel_False_.sh similarity index 100% rename from tests/engine/test_calc_job/test_multi_codes_run_parallel_False_.sh rename to tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_parallel_False_.sh diff --git a/tests/engine/test_calc_job/test_multi_codes_run_parallel_True_.sh b/tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_parallel_True_.sh similarity index 100% rename from tests/engine/test_calc_job/test_multi_codes_run_parallel_True_.sh rename to tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_parallel_True_.sh diff --git a/tests/engine/test_calc_job/test_multi_codes_run_withmpi_False_.sh b/tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_withmpi_False_.sh similarity index 100% rename from tests/engine/test_calc_job/test_multi_codes_run_withmpi_False_.sh rename to tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_withmpi_False_.sh diff --git a/tests/engine/test_calc_job/test_multi_codes_run_withmpi_True_.sh b/tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_withmpi_True_.sh similarity index 100% rename from tests/engine/test_calc_job/test_multi_codes_run_withmpi_True_.sh rename to tests/engine/processes/calcjobs/test_calc_job/test_multi_codes_run_withmpi_True_.sh