Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine: implement functionality to import completed CalcJobs #5086

Merged
merged 2 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion aiida/calculations/arithmetic/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ def prepare_for_submission(self, folder: Folder) -> CalcInfo:
handle.write(f'echo $(({self.inputs.x.value} + {self.inputs.y.value}))\n')

codeinfo = CodeInfo()
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdin_name = self.options.input_filename
codeinfo.stdout_name = self.options.output_filename

if 'code' in self.inputs:
codeinfo.code_uuid = self.inputs.code.uuid

calcinfo = CalcInfo()
calcinfo.codes_info = [codeinfo]
calcinfo.retrieve_list = [self.options.output_filename]
Expand Down
Empty file.
Empty file.
39 changes: 39 additions & 0 deletions aiida/calculations/importers/arithmetic/add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""Importer for the :class:`aiida.calculations.arithmetic.add.ArithmeticAddCalculation` plugin."""
from pathlib import Path
from re import match
from typing import Dict, Union
from tempfile import NamedTemporaryFile

from aiida.engine import CalcJobImporter
from aiida.orm import Node, Int, RemoteData


class ArithmeticAddCalculationImporter(CalcJobImporter):
"""Importer for the :class:`aiida.calculations.arithmetic.add.ArithmeticAddCalculation` plugin."""

@staticmethod
def parse_remote_data(remote_data: RemoteData, **kwargs) -> Dict[str, Union[Node, Dict]]:
"""Parse the input nodes from the files in the provided ``RemoteData``.

:param remote_data: the remote data node containing the raw input files.
:param kwargs: additional keyword arguments to control the parsing process.
:returns: a dictionary with the parsed inputs nodes that match the input spec of the associated ``CalcJob``.
"""
with NamedTemporaryFile('w+') as handle:
with remote_data.get_authinfo().get_transport() as transport:
filepath = Path(remote_data.get_remote_path()) / 'aiida.in'
transport.getfile(filepath, handle.name)

handle.seek(0)
data = handle.read()

matches = match(r'echo \$\(\(([0-9]+) \+ ([0-9]+)\)\).*', data.strip())

if matches is None:
raise ValueError(f'failed to parse the integers `x` and `y` from the input content: {data}')

return {
'x': Int(matches.group(1)),
'y': Int(matches.group(2)),
}
1 change: 1 addition & 0 deletions aiida/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
'AwaitableTarget',
'BaseRestartWorkChain',
'CalcJob',
'CalcJobImporter',
'CalcJobOutputPort',
'CalcJobProcessSpec',
'DaemonClient',
Expand Down
5 changes: 3 additions & 2 deletions aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ def submit(process: TYPE_SUBMIT_PROCESS, **inputs: Any) -> ProcessNode:
process_inited = instantiate_process(runner, process, **inputs)

# If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
# instead of raising, because in this way the user does not have to change the launcher when testing.
if process_inited.metadata.get('dry_run', False):
# instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
# for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:
_, node = run_get_node(process_inited)
return node

Expand Down
1 change: 1 addition & 0 deletions aiida/engine/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
'AwaitableTarget',
'BaseRestartWorkChain',
'CalcJob',
'CalcJobImporter',
'CalcJobOutputPort',
'CalcJobProcessSpec',
'ExitCode',
Expand Down
2 changes: 2 additions & 0 deletions aiida/engine/processes/calcjobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
# pylint: disable=wildcard-import

from .calcjob import *
from .importer import *
from .manager import *

__all__ = (
'CalcJob',
'CalcJobImporter',
'JobManager',
'JobsList',
)
Expand Down
132 changes: 113 additions & 19 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ..process import Process, ProcessState
from ..process_spec import CalcJobProcessSpec
from .tasks import Waiting, UPLOAD_COMMAND
from .importer import CalcJobImporter

__all__ = ('CalcJob',)

Expand All @@ -40,6 +41,7 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli
* No `Computer` has been specified, neither directly in `metadata.computer` nor indirectly through the `Code` input
* The specified computer is not stored
* The `Computer` specified in `metadata.computer` is not the same as that of the specified `Code`
* No `Code` has been specified and no `remote_folder` input has been specified, i.e. this is no import run

:return: string with error message in case the inputs are invalid
"""
Expand All @@ -50,6 +52,14 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli
# If the namespace no longer contains the `code` or `metadata.computer` ports we skip validation
return None

remote_folder = inputs.get('remote_folder', None)

if remote_folder is not None:
# The `remote_folder` input has been specified and so this concerns an import run, which means that neither
# a `Code` nor a `Computer` are required. However, they are allowed to be specified but will not be explicitly
# checked for consistency.
return None

code = inputs.get('code', None)
computer_from_code = code.computer
computer_from_metadata = inputs.get('metadata', {}).get('computer', None)
Expand Down Expand Up @@ -182,7 +192,15 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
# yapf: disable
super().define(spec)
spec.inputs.validator = validate_calc_job # type: ignore[assignment] # takes only PortNamespace not Port
spec.input('code', valid_type=orm.Code, help='The `Code` to use for this job.')
spec.input('code', valid_type=orm.Code, required=False,
help='The `Code` to use for this job. This input is required, unless the `remote_folder` input is '
'specified, which means an existing job is being imported and no code will actually be run.')
spec.input('remote_folder', valid_type=orm.RemoteData, required=False,
help='Remote directory containing the results of an already completed calculation job without AiiDA. The '
'inputs should be passed to the `CalcJob` as normal but instead of launching the actual job, the '
'engine will recreate the input files and then proceed straight to the retrieve step where the files '
'of this `RemoteData` will be retrieved as if it had been actually launched through AiiDA. If a '
'parser is defined in the inputs, the results are parsed and attached as output nodes as usual.')
spec.input('metadata.dry_run', valid_type=bool, default=False,
help='When set to `True` will prepare the calculation job for submission but not actually launch it.')
spec.input('metadata.computer', valid_type=orm.Computer, required=False,
Expand Down Expand Up @@ -278,6 +296,27 @@ def spec_options(cls): # pylint: disable=no-self-argument
"""
return cls.spec_metadata['options'] # pylint: disable=unsubscriptable-object

@classmethod
def get_importer(cls, entry_point_name: str = None) -> CalcJobImporter:
"""Load the `CalcJobImporter` associated with this `CalcJob` if it exists.

By default an importer with the same entry point as the ``CalcJob`` will be loaded, however, this can be
overridden using the ``entry_point_name`` argument.

:param entry_point_name: optional entry point name of a ``CalcJobImporter`` to override the default.
:return: the loaded ``CalcJobImporter``.
:raises: if no importer class could be loaded.
"""
from aiida.plugins import CalcJobImporterFactory
from aiida.plugins.entry_point import get_entry_point_from_class

if entry_point_name is None:
_, entry_point = get_entry_point_from_class(cls.__module__, cls.__name__)
if entry_point is not None:
entry_point_name = entry_point.name # type: ignore[attr-defined]

return CalcJobImporterFactory(entry_point_name)()

@property
def options(self) -> AttributeDict:
"""Return the options of the metadata that were specified when this process instance was launched.
Expand Down Expand Up @@ -322,21 +361,13 @@ def run(self) -> Union[plumpy.process_states.Stop, int, plumpy.process_states.Wa

"""
if self.inputs.metadata.dry_run: # type: ignore[union-attr]
from aiida.common.folders import SubmitTestFolder
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport

with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info = self.presubmit(folder)
transport.chdir(folder.abspath)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = {
'folder': folder.abspath,
'script_filename': self.node.get_option('submit_script_filename')
}
self._perform_dry_run()
return plumpy.process_states.Stop(None, True)

if 'remote_folder' in self.inputs: # type: ignore[operator]
exit_code = self._perform_import()
return exit_code

# The following conditional is required for the caching to properly work. Even if the source node has a process
# state of `Finished` the cached process will still enter the running state. The process state will have then
# been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
Expand All @@ -358,7 +389,54 @@ def prepare_for_submission(self, folder: Folder) -> CalcInfo:
:param folder: a temporary folder on the local file system.
:returns: the `CalcInfo` instance
"""
raise NotImplementedError
raise NotImplementedError()

def _perform_dry_run(self):
"""Perform a dry run.

Instead of performing the normal sequence of steps, just the `presubmit` is called, which will call the method
`prepare_for_submission` of the plugin to generate the input files based on the inputs. Then the upload action
is called, but using a normal local transport that will copy the files to a local sandbox folder. The generated
input script and the absolute path to the sandbox folder are stored in the `dry_run_info` attribute of the node
of this process.
"""
from aiida.common.folders import SubmitTestFolder
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport

with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info = self.presubmit(folder)
transport.chdir(folder.abspath)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = {
'folder': folder.abspath,
'script_filename': self.node.get_option('submit_script_filename')
}

def _perform_import(self):
"""Perform the import of an already completed calculation.

The inputs contained a `RemoteData` under the key `remote_folder` signalling that this is not supposed to be run
as a normal calculation job, but rather the results are already computed outside of AiiDA and merely need to be
imported.
"""
from aiida.common.datastructures import CalcJobState
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon.execmanager import retrieve_calculation
from aiida.transports.plugins.local import LocalTransport

with LocalTransport() as transport:
with SandboxFolder() as folder:
with SandboxFolder() as retrieved_temporary_folder:
self.presubmit(folder)
self.node.set_remote_workdir(
self.inputs.remote_folder.get_remote_path() # type: ignore[union-attr]
)
retrieve_calculation(self.node, transport, retrieved_temporary_folder.abspath)
self.node.set_state(CalcJobState.PARSING)
self.node.set_attribute(orm.CalcJobNode.IMMIGRATED_KEY, True)
return self.parse(retrieved_temporary_folder.abspath)

def parse(self, retrieved_temporary_folder: Optional[str] = None) -> ExitCode:
"""Parse a retrieved job calculation.
Expand Down Expand Up @@ -411,7 +489,16 @@ def parse(self, retrieved_temporary_folder: Optional[str] = None) -> ExitCode:

def parse_scheduler_output(self, retrieved: orm.Node) -> Optional[ExitCode]:
"""Parse the output of the scheduler if that functionality has been implemented for the plugin."""
scheduler = self.node.computer.get_scheduler()
computer = self.node.computer

if computer is None:
self.logger.info(
'no computer is defined for this calculation job which suggest that it is an imported job and so '
'scheduler output probably is not available or not in a format that can be reliably parsed, skipping..'
)
return None

scheduler = computer.get_scheduler()
filename_stderr = self.node.get_option('scheduler_stderr')
filename_stdout = self.node.get_option('scheduler_stdout')

Expand Down Expand Up @@ -499,12 +586,12 @@ def presubmit(self, folder: Folder) -> CalcInfo:
from aiida.orm import load_node, Code, Computer
from aiida.schedulers.datastructures import JobTemplate

computer = self.node.computer
inputs = self.node.get_incoming(link_type=LinkType.INPUT_CALC)

if not self.inputs.metadata.dry_run and self.node.has_cached_links(): # type: ignore[union-attr]
raise InvalidOperation('calculation node has unstored links in cache')

computer = self.node.computer
codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]

for code in codes:
Expand All @@ -522,17 +609,17 @@ def presubmit(self, folder: Folder) -> CalcInfo:

calc_info = self.prepare_for_submission(folder)
calc_info.uuid = str(self.node.uuid)
scheduler = computer.get_scheduler()

# I create the job template to pass to the scheduler
job_tmpl = JobTemplate()
job_tmpl.shebang = computer.get_shebang()
job_tmpl.submit_as_hold = False
job_tmpl.rerunnable = self.options.get('rerunnable', False)
job_tmpl.job_environment = {}
# 'email', 'email_on_started', 'email_on_terminated',
job_tmpl.job_name = f'aiida-{self.node.pk}'
job_tmpl.sched_output_path = self.options.scheduler_stdout
if computer is not None:
job_tmpl.shebang = computer.get_shebang()
if self.options.scheduler_stderr == self.options.scheduler_stdout:
job_tmpl.sched_join_files = True
else:
Expand All @@ -553,6 +640,13 @@ def presubmit(self, folder: Folder) -> CalcInfo:
retrieve_temporary_list = calc_info.retrieve_temporary_list or []
self.node.set_retrieve_temporary_list(retrieve_temporary_list)

# If the inputs contain a ``remote_folder`` input node, we are in an import scenario and can skip the rest
if 'remote_folder' in inputs.all_link_labels():
return

# The remaining code is only necessary for actual runs, for example, creating the submission script
scheduler = computer.get_scheduler()

# the if is done so that if the method returns None, this is
# not added. This has two advantages:
# - it does not add too many \n\n if most of the prepend_text are empty
Expand Down
21 changes: 21 additions & 0 deletions aiida/engine/processes/calcjobs/importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""Abstract utility class that helps to import calculation jobs completed outside of AiiDA."""
from abc import abstractmethod
from typing import Dict, Union

from aiida.orm import Node, RemoteData

__all__ = ('CalcJobImporter',)


class CalcJobImporter:

@staticmethod
@abstractmethod
def parse_remote_data(remote_data: RemoteData, **kwargs) -> Dict[str, Union[Node, Dict]]:
"""Parse the input nodes from the files in the provided ``RemoteData``.

:param remote_data: the remote data node containing the raw input files.
:param kwargs: additional keyword arguments to control the parsing process.
:returns: a dictionary with the parsed inputs nodes that match the input spec of the associated ``CalcJob``.
"""
7 changes: 7 additions & 0 deletions aiida/orm/nodes/process/calculation/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CalcJobNode(CalculationNode):
# pylint: disable=too-many-public-methods

CALC_JOB_STATE_KEY = 'state'
IMMIGRATED_KEY = 'imported'
REMOTE_WORKDIR_KEY = 'remote_workdir'
RETRIEVE_LIST_KEY = 'retrieve_list'
RETRIEVE_TEMPORARY_LIST_KEY = 'retrieve_temporary_list'
Expand Down Expand Up @@ -89,6 +90,7 @@ def tools(self) -> 'CalculationTools':
def _updatable_attributes(cls) -> Tuple[str, ...]: # pylint: disable=no-self-argument
return super()._updatable_attributes + (
cls.CALC_JOB_STATE_KEY,
cls.IMMIGRATED_KEY,
cls.REMOTE_WORKDIR_KEY,
cls.RETRIEVE_LIST_KEY,
cls.RETRIEVE_TEMPORARY_LIST_KEY,
Expand Down Expand Up @@ -151,6 +153,11 @@ def get_builder_restart(self) -> 'ProcessBuilder':
builder.metadata.options = self.get_options() # type: ignore[attr-defined]
return builder

@property
def is_imported(self) -> bool:
"""Return whether the calculation job was imported instead of being an actual run."""
return self.get_attribute(self.IMMIGRATED_KEY, None) is True

def get_option(self, name: str) -> Optional[Any]:
"""
Retun the value of an option that was set for this CalcJobNode
Expand Down
1 change: 1 addition & 0 deletions aiida/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

__all__ = (
'BaseFactory',
'CalcJobImporterFactory',
'CalculationFactory',
'DataFactory',
'DbImporterFactory',
Expand Down
Loading