diff --git a/.gitignore b/.gitignore index a4fdd01ebc..ca6cb23866 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,6 @@ pplot_out/ # docker docker-bake.override.json + +# pyenv +.python-version diff --git a/environment.yml b/environment.yml index ad80dd3416..d38ea2a437 100644 --- a/environment.yml +++ b/environment.yml @@ -22,7 +22,7 @@ dependencies: - importlib-metadata~=6.0 - numpy~=1.21 - paramiko~=3.0 -- plumpy~=0.22.3 +- plumpy - pgsu~=0.3.0 - psutil~=5.6 - psycopg[binary]~=3.0 diff --git a/failed.test b/failed.test new file mode 100644 index 0000000000..23742b610d --- /dev/null +++ b/failed.test @@ -0,0 +1,38 @@ +ERROR tests/brokers/test_rabbitmq.py::test_duplicate_subscriber_identifier - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_process.py::test_process_pause - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_process.py::test_process_play - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_daemon.py::test_daemon_restart[options0] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_process.py::test_process_play_all - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_daemon.py::test_daemon_restart[options1] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_process.py::test_process_kill - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/test_work_chain.py::TestWorkchain::test_member_calcfunction_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_process.py::test_process_kill_all - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_devel.py::test_launch_add_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/test_launch.py::test_submit_wait - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_process.py::test_process_repair_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/test_launch.py::test_await_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_daemon.py::test_daemon_status - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/daemon/test_worker.py::test_logging_configuration - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. 
+ERROR tests/cmdline/commands/test_rabbitmq.py::test_tasks_list_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_daemon.py::test_daemon_status_timeout - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_rabbitmq.py::test_tasks_analyze_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_processes_all_exclusivity[pause_processes] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_rabbitmq.py::test_revive - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/test_memory_leaks.py::test_leak_run_process - AssertionError: Memory leak: process instances remain in memory: {} +ERROR tests/engine/processes/test_control.py::test_processes_all_exclusivity[play_processes] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_processes_all_exclusivity[kill_processes] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/test_memory_leaks.py::test_leak_local_calcjob - AssertionError: Memory leak: process instances remain in memory: {} +ERROR tests/engine/processes/test_control.py::test_pause_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_pause_processes_all_entries - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_play_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_play_processes_all_entries - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_kill_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_kill_processes_all_entries - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/test_control.py::test_revive - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/engine/processes/calcjobs/test_calc_job.py::test_restart_after_daemon_reset - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. +ERROR tests/cmdline/commands/test_storage.py::tests_storage_migrate_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds. 
+ERROR tests/engine/test_memory_leaks.py::test_leak_ssh_calcjob - AssertionError: Memory leak: process instances remain in memory: {} +FAILED tests/cmdline/commands/test_daemon.py::test_daemon_start - AssertionError: Traceback (most recent call last): +FAILED tests/cmdline/commands/test_daemon.py::test_daemon_start_number - AssertionError: Traceback (most recent call last): +FAILED tests/cmdline/commands/test_daemon.py::test_daemon_start_number_config - AssertionError: Traceback (most recent call last): +FAILED tests/cmdline/commands/test_devel.py::test_launch_multiply_add_daemon - RuntimeError: Timed out waiting for process with state `ProcessState.CREATED` to enter state `ProcessState.FINISHED`. diff --git a/pyproject.toml b/pyproject.toml index 32894eb4ac..3375a1416c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ 'importlib-metadata~=6.0', 'numpy~=1.21', 'paramiko~=3.0', - 'plumpy~=0.22.3', + "plumpy", 'pgsu~=0.3.0', 'psutil~=5.6', 'psycopg[binary]~=3.0', @@ -47,7 +47,7 @@ dependencies = [ 'tqdm~=4.45', 'typing-extensions~=4.0;python_version<"3.10"', 'upf_to_json~=0.9.2', - 'wrapt~=1.11' + 'wrapt~=1.11', ] description = 'AiiDA is a workflow manager for computational science with a strong focus on provenance, performance and extensibility.' dynamic = ['version'] # read from aiida/__init__.py @@ -510,3 +510,6 @@ passenv = AIIDA_TEST_WORKERS commands = molecule {posargs:test} """ + +[tool.uv.sources] +plumpy = { path = "../plumpy", editable = true } diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index cfb8b3d50e..7f719f0760 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -3,25 +3,35 @@ import abc import typing as t +from plumpy.controller import ProcessController + if t.TYPE_CHECKING: - from aiida.manage.configuration.profile import Profile + from plumpy.coordinator import Coordinator + __all__ = ('Broker',) +# FIXME: make me a protocol class Broker: """Interface for a message broker that facilitates communication with and between process runners.""" - def __init__(self, profile: 'Profile') -> None: - """Construct a new instance. + # def __init__(self, profile: 'Profile') -> None: + # """Construct a new instance. + # + # :param profile: The profile. + # """ + # self._profile = profile - :param profile: The profile. - """ - self._profile = profile + @abc.abstractmethod + # FIXME: make me a property + def get_coordinator(self) -> 'Coordinator': + """Return an instance of the coordinator.""" @abc.abstractmethod - def get_communicator(self): - """Return an instance of :class:`kiwipy.Communicator`.""" + def get_controller(self) -> ProcessController: + """Return the process controller.""" + ...
@abc.abstractmethod def iterate_tasks(self): diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index c4ecfa2400..dc9af4acd5 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -2,10 +2,15 @@ from __future__ import annotations +import asyncio import functools import typing as t +from plumpy import ProcessController +from plumpy.rmq import RemoteProcessThreadController + from aiida.brokers.broker import Broker +from aiida.brokers.rabbitmq.coordinator import RmqLoopCoordinator from aiida.common.log import AIIDA_LOGGER from aiida.manage.configuration import get_config_option @@ -24,14 +29,16 @@ class RabbitmqBroker(Broker): """Implementation of the message broker interface using RabbitMQ through ``kiwipy``.""" - def __init__(self, profile: Profile) -> None: + def __init__(self, profile: Profile, loop=None) -> None: """Construct a new instance. :param profile: The profile. + :param loop: The asyncio event loop to use, defaults to the currently running event loop. """ self._profile = profile - self._communicator: 'RmqThreadCommunicator' | None = None + self._communicator: 'RmqThreadCommunicator | None' = None self._prefix = f'aiida-{self._profile.uuid}' + self._coordinator = None + self._loop = loop or asyncio.get_event_loop() def __str__(self): try: @@ -47,16 +54,28 @@ def close(self): def iterate_tasks(self): """Return an iterator over the tasks in the launch queue.""" - for task in self.get_communicator().task_queue(get_launch_queue_name(self._prefix)): + for task in self.get_coordinator().communicator.task_queue(get_launch_queue_name(self._prefix)): yield task - def get_communicator(self) -> 'RmqThreadCommunicator': + def get_coordinator(self): + if self._coordinator is not None: + return self._coordinator + + return self.create_coordinator() + + def create_coordinator(self): if self._communicator is None: self._communicator = self._create_communicator() # Check whether a compatible version of RabbitMQ is being used.
self.check_rabbitmq_version() - return self._communicator + self._coordinator = RmqLoopCoordinator(self._communicator, self._loop) + + return self._coordinator + + def get_controller(self) -> ProcessController: + coordinator = self.get_coordinator() + return RemoteProcessThreadController(coordinator) def _create_communicator(self) -> 'RmqThreadCommunicator': """Return an instance of :class:`kiwipy.Communicator`.""" @@ -64,7 +83,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator': from aiida.orm.utils import serialize - self._communicator = RmqThreadCommunicator.connect( + _communicator = RmqThreadCommunicator.connect( connection_params={'url': self.get_url()}, message_exchange=get_message_exchange_name(self._prefix), encoder=functools.partial(serialize.serialize, encoding='utf-8'), @@ -78,7 +97,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator': testing_mode=self._profile.is_test_profile, ) - return self._communicator + return _communicator def check_rabbitmq_version(self): """Check the version of RabbitMQ that is being connected to and emit warning if it is not compatible.""" @@ -122,4 +141,4 @@ def get_rabbitmq_version(self): """ from packaging.version import parse - return parse(self.get_communicator().server_properties['version']) + return parse(self.get_coordinator().communicator.server_properties['version']) diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py new file mode 100644 index 0000000000..2cea693d0e --- /dev/null +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -0,0 +1,93 @@ +import concurrent.futures +from asyncio import AbstractEventLoop +from typing import Generic, TypeVar, final + +import kiwipy +from plumpy.exceptions import CoordinatorConnectionError +from plumpy.rmq.communications import convert_to_comm + +__all__ = ['RmqLoopCoordinator'] + +U = TypeVar('U', bound=kiwipy.Communicator) + + +@final +class RmqLoopCoordinator(Generic[U]): + def __init__(self, comm: U, loop: AbstractEventLoop): + self._comm = comm + self._loop = loop + + @property + def communicator(self) -> U: + """The inner communicator.""" + return self._comm + + def add_rpc_subscriber(self, subscriber, identifier=None): + subscriber = convert_to_comm(subscriber, self._loop) + return self._comm.add_rpc_subscriber(subscriber, identifier) + + def add_broadcast_subscriber( + self, + subscriber, + subject_filters=None, + sender_filters=None, + identifier=None, + ): + # XXX: this changes the behavior of create_task when deciding whether the broadcast is filtered. + # The BroadcastFilter needs to be understood better before this is improved. + # To reproduce the issue of run_task not being awaited, run 'test_launch.py::test_submit_wait' twice.
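+ # The commented-out block below is a rough sketch of the intended filtering, kept for reference: wrap the subscriber in a kiwipy.BroadcastFilter and attach the subject/sender filters before converting it for the loop; it stays disabled until the create_task behaviour described above is understood.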
+ + # subscriber = kiwipy.BroadcastFilter(subscriber) + # + # subject_filters = subject_filters or [] + # sender_filters = sender_filters or [] + # + # for filter in subject_filters: + # subscriber.add_subject_filter(filter) + # for filter in sender_filters: + # subscriber.add_sender_filter(filter) + + subscriber = convert_to_comm(subscriber, self._loop) + return self._comm.add_broadcast_subscriber(subscriber, identifier) + + def add_task_subscriber(self, subscriber, identifier=None): + subscriber = convert_to_comm(subscriber, self._loop) + return self._comm.add_task_subscriber(subscriber, identifier) + + def remove_rpc_subscriber(self, identifier): + return self._comm.remove_rpc_subscriber(identifier) + + def remove_broadcast_subscriber(self, identifier): + return self._comm.remove_broadcast_subscriber(identifier) + + def remove_task_subscriber(self, identifier): + return self._comm.remove_task_subscriber(identifier) + + def rpc_send(self, recipient_id, msg): + return self._comm.rpc_send(recipient_id, msg) + + def broadcast_send( + self, + body, + sender=None, + subject=None, + correlation_id=None, + ): + from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError + + try: + rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) + except (ChannelInvalidStateError, AMQPConnectionError, concurrent.futures.TimeoutError) as exc: + raise CoordinatorConnectionError from exc + else: + return rsp + + def task_send(self, task, no_reply=False): + return self._comm.task_send(task, no_reply) + + def close(self): + self._comm.close() + + def is_closed(self) -> bool: + """Return `True` if the communicator was closed""" + return self._comm.is_closed() diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index e203bdddfc..a4e665a544 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -10,6 +10,7 @@ import click +from aiida.brokers.broker import Broker from aiida.cmdline.commands.cmd_verdi import verdi from aiida.cmdline.params import arguments, options, types from aiida.cmdline.utils import decorators, echo @@ -340,8 +341,8 @@ def process_kill(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Killed through `verdi process kill`' - control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + msg_text = 'Killed through `verdi process kill`' + control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, msg_text=msg_text) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -371,8 +372,8 @@ def process_pause(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Paused through `verdi process pause`' - control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + msg_text = 'Paused through `verdi process pause`' + control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, msg_text=msg_text) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -416,7 +417,7 @@ def process_play(processes, all_entries, timeout, wait): @decorators.with_dbenv() @decorators.with_broker @decorators.only_if_daemon_running(echo.echo_warning, 'daemon is not running, so process may not be reachable') -def process_watch(broker, processes, most_recent_node): +def 
process_watch(broker: Broker, processes, most_recent_node): """Watch the state transitions of processes. Watch the state transitions for one or multiple running processes.""" @@ -436,7 +437,7 @@ def process_watch(broker, processes, most_recent_node): from kiwipy import BroadcastFilter - def _print(communicator, body, sender, subject, correlation_id): + def _print(coordinator, body, sender, subject, correlation_id): """Format the incoming broadcast data into a message and echo it to stdout.""" if body is None: body = 'No message specified' @@ -446,7 +447,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo(f'Process<{sender}> [{subject}|{correlation_id}]: {body}') - communicator = broker.get_communicator() + coordinator = broker.get_coordinator() echo.echo_report('watching for broadcasted messages, press CTRL+C to stop...') if most_recent_node: @@ -457,7 +458,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo_error(f'Process<{process.pk}> is already terminated') continue - communicator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk)) + coordinator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk)) try: # Block this thread indefinitely until interrupt @@ -467,7 +468,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo('') # add a new line after the interrupt character echo.echo_report('received interrupt, exiting...') try: - communicator.close() + coordinator.close() except RuntimeError: pass diff --git a/src/aiida/cmdline/commands/cmd_rabbitmq.py b/src/aiida/cmdline/commands/cmd_rabbitmq.py index c6a66d6da2..99346896f2 100644 --- a/src/aiida/cmdline/commands/cmd_rabbitmq.py +++ b/src/aiida/cmdline/commands/cmd_rabbitmq.py @@ -20,6 +20,7 @@ from aiida.cmdline.commands.cmd_devel import verdi_devel from aiida.cmdline.params import arguments, options from aiida.cmdline.utils import decorators, echo, echo_tabulate +from aiida.manage.manager import Manager if t.TYPE_CHECKING: import requests @@ -131,12 +132,13 @@ def with_client(ctx, wrapped, _, args, kwargs): @cmd_rabbitmq.command('server-properties') @decorators.with_manager -def cmd_server_properties(manager): +def cmd_server_properties(manager: Manager): """List the server properties.""" import yaml data = {} - for key, value in manager.get_communicator().server_properties.items(): + # FIXME: should server_properties be a common API of the coordinator?
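+ # For now, reach through the coordinator to the wrapped kiwipy communicator, which is the only object that exposes `server_properties`.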
+ for key, value in manager.get_coordinator().communicator.server_properties.items(): data[key] = value.decode('utf-8') if isinstance(value, bytes) else value click.echo(yaml.dump(data, indent=4)) diff --git a/src/aiida/cmdline/commands/cmd_status.py b/src/aiida/cmdline/commands/cmd_status.py index 85ef292fa7..6ee1952fb1 100644 --- a/src/aiida/cmdline/commands/cmd_status.py +++ b/src/aiida/cmdline/commands/cmd_status.py @@ -132,7 +132,7 @@ def verdi_status(print_traceback, no_rmq): if broker: try: - broker.get_communicator() + broker.get_coordinator() except Exception as exc: message = f'Unable to connect to broker: {broker}' print_status(ServiceStatus.ERROR, 'broker', message, exception=exc, print_traceback=print_traceback) diff --git a/src/aiida/engine/daemon/worker.py b/src/aiida/engine/daemon/worker.py index 913e44d9b7..7455c4dcc9 100644 --- a/src/aiida/engine/daemon/worker.py +++ b/src/aiida/engine/daemon/worker.py @@ -17,6 +17,7 @@ from aiida.engine.daemon.client import get_daemon_client from aiida.engine.runners import Runner from aiida.manage import get_config_option, get_manager +from aiida.manage.manager import Manager LOGGER = logging.getLogger(__name__) @@ -37,6 +38,39 @@ async def shutdown_worker(runner: Runner) -> None: LOGGER.info('Daemon worker stopped') +def create_daemon_runner(manager: Manager) -> 'Runner': + """Create and return a new daemon runner. + + This is used by workers when the daemon is running and in testing. + + :param manager: the manager from which to get the persister and coordinator + + :return: a runner configured to work in the daemon configuration + + """ + from plumpy.persistence import LoadSaveContext + + from aiida.engine import persistence + from aiida.engine.processes.launcher import ProcessLauncher + + runner = manager.create_runner(broker_submit=True, loop=None) + runner_loop = runner.loop + + # Listen for incoming launch requests + task_receiver = ProcessLauncher( + loop=runner_loop, + persister=manager.get_persister(), + load_context=LoadSaveContext(runner=runner), + loader=persistence.get_object_loader(), + ) + + coordinator = manager.get_coordinator() + assert coordinator is not None, 'coordinator not set for runner' + coordinator.add_task_subscriber(task_receiver) + + return runner + + def start_daemon_worker(foreground: bool = False) -> None: """Start a daemon worker for the currently configured profile. @@ -51,7 +85,7 @@ def start_daemon_worker(foreground: bool = False) -> None: try: manager = get_manager() - runner = manager.create_daemon_runner() + runner = create_daemon_runner(manager) manager.set_runner(runner) except Exception: LOGGER.exception('daemon worker failed to start') @@ -66,9 +100,18 @@ def start_daemon_worker(foreground: bool = False) -> None: # https://github.com/python/mypy/issues/12557 runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc] + # XXX: check that the threading is cleanly implemented: e.g. log handling, error handling, shutdown handling. + # It should work, and it is better for the runner to have its own event loop that handles only the AiiDA processes. + # However, it randomly fails some tests because resources are not cleanly released. + # The problem is that the runner running in the thread is not closed when the thread joins; the join should be the shutdown operation.
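+ # The commented `runner_thread` lines below sketch that design: run the runner in a dedicated thread with its own event loop, and treat `join()` as the shutdown operation once resource cleanup is reliable.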
+ + LOGGER.info('Starting a daemon worker') + # runner_thread = threading.Thread(target=runner.start, daemon=False) + # runner_thread.start() + try: - LOGGER.info('Starting a daemon worker') runner.start() + # runner_thread.join() except SystemError as exception: LOGGER.info('Received a SystemError: %s', exception) runner.close() diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 1059d277ba..2f677c766f 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -11,6 +11,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import functools import logging import tempfile @@ -101,13 +102,13 @@ async def do_upload(): try: logger.info(f'scheduled request to upload CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption) skip_submit = await exponential_backoff_retry( do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) except PreSubmitException: raise - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'uploading CalcJob<{node.pk}> failed') @@ -149,11 +150,11 @@ async def do_submit(): try: logger.info(f'scheduled request to submit CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) result = await exponential_backoff_retry( do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'submitting CalcJob<{node.pk}> failed') @@ -207,11 +208,11 @@ async def do_update(): try: logger.info(f'scheduled request to update CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) job_done = await exponential_backoff_retry( do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'updating CalcJob<{node.pk}> failed') @@ -257,11 +258,11 @@ async def do_monitor(): try: logger.info(f'scheduled request to monitor CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) monitor_result = await exponential_backoff_retry( do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'monitoring 
CalcJob<{node.pk}> failed') @@ -333,11 +334,11 @@ async def do_retrieve(): try: logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) result = await exponential_backoff_retry( do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'retrieving CalcJob<{node.pk}> failed') @@ -569,7 +570,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override await self._kill_job(node, transport_queue) node.set_process_status(str(exception)) return self.retrieve(monitor_result=self._monitor_result) - except (plumpy.futures.CancelledError, asyncio.CancelledError): + except (concurrent.futures.CancelledError, asyncio.CancelledError): node.set_process_status(f'Transport task {self._command} was cancelled') raise except plumpy.process_states.Interruption: diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 7cc214c76c..f9c3434c16 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -4,11 +4,12 @@ import collections import concurrent +import functools import typing as t import kiwipy from kiwipy import communications -from plumpy.futures import unwrap_kiwi_future +from plumpy.rmq.futures import unwrap_kiwi_future from aiida.brokers import Broker from aiida.common.exceptions import AiidaException @@ -18,7 +19,7 @@ from aiida.orm import ProcessNode, QueryBuilder from aiida.tools.query.calculation import CalculationQueryBuilder -LOGGER = AIIDA_LOGGER.getChild('process_control') +LOGGER = AIIDA_LOGGER.getChild('engine.processes') class ProcessTimeoutException(AiidaException): @@ -135,7 +136,7 @@ def play_processes( def pause_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', + msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -164,13 +165,14 @@ def pause_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.pause_process, 'pause', 'pausing', timeout, wait, msg=message) + action = functools.partial(controller.pause_process, msg_text=msg_text) + _perform_actions(processes, action, 'pause', 'pausing', timeout, wait) def kill_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', + msg_text: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -199,7 +201,8 @@ def kill_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message) + action = functools.partial(controller.kill_process, msg_text=msg_text) + _perform_actions(processes, action, 'kill', 'killing', timeout, wait) def _perform_actions( @@ -281,9 +284,9 @@ def handle_result(result): unwrapped = unwrap_kiwi_future(future) result = unwrapped.result() except 
communications.TimeoutError: - LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out') + LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out', exc_info=True) except Exception as exception: - LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') + LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}', exc_info=True) else: if isinstance(result, kiwipy.Future): LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>') @@ -302,7 +305,7 @@ def handle_result(result): try: result = future.result() except Exception as exception: - LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') + LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}', exc_info=True) else: handle_result(result) diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 8bca68f55c..cca28abedd 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -235,7 +235,7 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode def kill_process(_num, _frame): """Send the kill signal to the process in the current scope.""" LOGGER.critical('runner received interrupt, killing process %s', process.pid) - result = process.kill(msg='Process was killed because the runner received an interrupt') + result = process.kill(msg_text='Process was killed because the runner received an interrupt') return result # Store the current handler on the signal such that it can be restored after process has terminated diff --git a/src/aiida/engine/processes/futures.py b/src/aiida/engine/processes/futures.py index 096c11b277..79016dacc3 100644 --- a/src/aiida/engine/processes/futures.py +++ b/src/aiida/engine/processes/futures.py @@ -12,6 +12,7 @@ from typing import Optional, Union import kiwipy +from plumpy.coordinator import Coordinator from aiida.orm import Node, load_node @@ -28,17 +29,17 @@ def __init__( pk: int, loop: Optional[asyncio.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, - communicator: Optional[kiwipy.Communicator] = None, + coordinator: Optional[Coordinator] = None, ): """Construct a future for a process node being finished. If a None poll_interval is supplied polling will not be used. - If a communicator is supplied it will be used to listen for broadcast messages. + If a coordinator is supplied it will be used to listen for broadcast messages. :param pk: process pk :param loop: An event loop :param poll_interval: optional polling interval, if None, polling is not activated. - :param communicator: optional communicator, if None, will not subscribe to broadcasts. + :param coordinator: optional coordinator, if None, will not subscribe to broadcasts. 
""" from .process import ProcessState @@ -46,18 +47,18 @@ def __init__( loop = loop if loop is not None else asyncio.get_event_loop() super().__init__(loop=loop) - assert not (poll_interval is None and communicator is None), 'Must poll or have a communicator to use' + assert not (poll_interval is None and coordinator is None), 'Must poll or have a coordinator to use' node = load_node(pk=pk) if node.is_terminated: self.set_result(node) else: - self._communicator = communicator + self._coordinator = coordinator self.add_done_callback(lambda _: self.cleanup()) # Try setting up a filtered broadcast subscriber - if self._communicator is not None: + if self._coordinator is not None: def _subscriber(*args, **kwargs): if not self.done(): @@ -66,17 +67,17 @@ def _subscriber(*args, **kwargs): broadcast_filter = kiwipy.BroadcastFilter(_subscriber, sender=pk) for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]: broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}') - self._broadcast_identifier = self._communicator.add_broadcast_subscriber(broadcast_filter) + self._broadcast_identifier = self._coordinator.add_broadcast_subscriber(broadcast_filter) # Start polling if poll_interval is not None: loop.create_task(self._poll_process(node, poll_interval)) def cleanup(self) -> None: - """Clean up the future by removing broadcast subscribers from the communicator if it still exists.""" - if self._communicator is not None: - self._communicator.remove_broadcast_subscriber(self._broadcast_identifier) - self._communicator = None + """Clean up the future by removing broadcast subscribers from the coordinator if it still exists.""" + if self._coordinator is not None: + self._coordinator.remove_broadcast_subscriber(self._broadcast_identifier) + self._coordinator = None self._broadcast_identifier = None async def _poll_process(self, node: Node, poll_interval: Union[int, float]) -> None: diff --git a/src/aiida/engine/processes/launcher.py b/src/aiida/engine/processes/launcher.py index b6c72aa724..73cb652ee1 100644 --- a/src/aiida/engine/processes/launcher.py +++ b/src/aiida/engine/processes/launcher.py @@ -38,7 +38,7 @@ def handle_continue_exception(node, exception, message): node.set_process_state(ProcessState.EXCEPTED) node.seal() - async def _continue(self, communicator, pid, nowait, tag=None): + async def _continue(self, pid, nowait, tag=None): """Continue the task. Note that the task may already have been completed, as indicated from the corresponding the node, in which @@ -84,7 +84,7 @@ async def _continue(self, communicator, pid, nowait, tag=None): return future.result() try: - result = await super()._continue(communicator, pid, nowait, tag) + result = await super()._continue(pid, nowait, tag) except ImportError as exception: message = 'the class of the process could not be imported.' 
self.handle_continue_exception(node, exception, message) diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index e25d1b7c23..72480c01e7 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -39,9 +39,10 @@ import plumpy.futures import plumpy.persistence import plumpy.processes -from kiwipy.communications import UnroutableError + +# from kiwipy.communications import UnroutableError +# from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.process_states import Finished, ProcessState -from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.processes import Process as PlumpyProcess from plumpy.utils import AttributesFrozendict @@ -172,13 +173,13 @@ def __init__( from aiida.manage import manager self._runner = runner if runner is not None else manager.get_manager().get_runner() - # assert self._runner.communicator is not None, 'communicator not set for runner' + _coordinator = manager.get_manager().get_coordinator() super().__init__( inputs=self.spec().inputs.serialize(inputs), logger=logger, loop=self._runner.loop, - communicator=self._runner.communicator, + coordinator=_coordinator, ) self._node: Optional[orm.ProcessNode] = None @@ -318,7 +319,9 @@ def load_instance_state( else: self._runner = manager.get_manager().get_runner() - load_context = load_context.copyextend(loop=self._runner.loop, communicator=self._runner.communicator) + _coordinator = manager.get_manager().get_coordinator() + + load_context = load_context.copyextend(loop=self._runner.loop, coordinator=_coordinator) super().load_instance_state(saved_state, load_context) if self.SaveKeys.CALC_ID.value in saved_state: @@ -329,7 +332,7 @@ def load_instance_state( self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state') - def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Future]: + def kill(self, msg_text: Union[str, None] = None) -> Union[bool, plumpy.futures.Future]: """Kill the process and all the children calculations it called :param msg_text: message """ had_been_terminated = self.has_terminated() result = super().kill(msg_text) # Only kill children if we could be killed ourselves if result is not False and not had_been_terminated: for child in self.node.called: if self.runner.controller is None: self.logger.info('no controller available to kill child<%s>', child.pk) continue try: - result = self.runner.controller.kill_process(child.pk, f'Killed by parent<{self.node.pk}>') + result = self.runner.controller.kill_process(child.pk, msg_text=f'Killed by parent<{self.node.pk}>') result = asyncio.wrap_future(result) # type: ignore[arg-type] if asyncio.isfuture(result): killing.append(result) - except ConnectionClosed: - self.logger.info('no connection available to kill child<%s>', child.pk) - except UnroutableError: - self.logger.info('kill signal was unable to reach child<%s>', child.pk) + # FIXME: use a generic exception to catch coordinator-side exceptions + # except ConnectionClosed: + # self.logger.info('no connection available to kill child<%s>', child.pk) + # except UnroutableError: + # self.logger.info('kill signal was unable to reach child<%s>', child.pk) + except Exception: + raise if asyncio.isfuture(result): # We ourselves are waiting to be killed so add it to the list @@
-363,10 +369,10 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur if killing: # We are waiting for things to be killed, so return the 'gathered' future - kill_future = plumpy.futures.gather(*killing) + kill_future = asyncio.gather(*killing) result = self.loop.create_future() - def done(done_future: plumpy.futures.Future): + def done(done_future: asyncio.Future): is_all_killed = all(done_future.result()) result.set_result(is_all_killed) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 42cb76244c..e0dfad2a86 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -19,11 +19,12 @@ from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union import kiwipy -from plumpy.communications import wrap_communicator +from plumpy.coordinator import Coordinator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister -from plumpy.process_comms import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController +from aiida.brokers import Broker from aiida.common import exceptions from aiida.orm import ProcessNode, load_node from aiida.plugins.utils import PluginVersionProvider @@ -56,7 +57,7 @@ class Runner: """Class that can launch processes by running in the current interpreter or by submitting them to the daemon.""" _persister: Optional[Persister] = None - _communicator: Optional[kiwipy.Communicator] = None + _coordinator: Optional[Coordinator] = None _controller: Optional[RemoteProcessThreadController] = None _closed: bool = False @@ -64,7 +65,7 @@ def __init__( self, poll_interval: Union[int, float] = 0, loop: Optional[asyncio.AbstractEventLoop] = None, - communicator: Optional[kiwipy.Communicator] = None, + broker: Broker | None = None, broker_submit: bool = False, persister: Optional[Persister] = None, ): @@ -72,17 +73,17 @@ def __init__( :param poll_interval: interval in seconds between polling for status of active sub processes :param loop: an asyncio event loop, if none is supplied a new one will be created - :param communicator: the communicator to use + :param broker: the broker to use :param broker_submit: if True, processes will be submitted to the broker, otherwise they will be scheduled here :param persister: the persister to use to persist processes """ assert not ( broker_submit and persister is None ), 'Must supply a persister if you want to submit using coordinator/broker' set_event_loop_policy() - self._loop = loop if loop is not None else asyncio.get_event_loop() + self._loop = loop or asyncio.get_event_loop() self._poll_interval = poll_interval self._broker_submit = broker_submit self._transport = transports.TransportQueue(self._loop) @@ -90,11 +91,17 @@ def __init__( self._persister = persister self._plugin_version_provider = PluginVersionProvider() - if communicator is not None: - self._communicator = wrap_communicator(communicator, self._loop) - self._controller = RemoteProcessThreadController(communicator) + # FIXME: broker and coordinator overlap conceptually; this is over-abstraction, remove it. + # Should the broker always be created inside the runner, since they need to share the loop? + if broker is not None: + self._coordinator = broker.get_coordinator() + self._controller = broker.get_controller() + + # FIXME: why does the pending task not exist when the communicator is wrapped?
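+ # The commented line below is the previous wrapper-based wiring, kept for reference while the pending-task question above is investigated.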
+ # self._coordinator = wrap_communicator(broker.get_coordinator().communicator, self._loop) elif self._broker_submit: - LOGGER.warning('Disabling broker submission, no communicator provided') + # FIXME: broker_submit should be derived from the broker: enable it only when a broker exists, otherwise force it to False + LOGGER.warning('Disabling broker submission, no coordinator provided') self._broker_submit = False def __enter__(self) -> 'Runner': @@ -117,11 +124,6 @@ def persister(self) -> Optional[Persister]: """Get the persister used by this runner.""" return self._persister - @property - def communicator(self) -> Optional[kiwipy.Communicator]: - """Get the communicator used by this runner.""" - return self._communicator - @property def plugin_version_provider(self) -> PluginVersionProvider: return self._plugin_version_provider @@ -250,7 +252,7 @@ def kill_process(_num, _frame): LOGGER.warning('runner received interrupt, process %s already being killed', process_inited.pid) return LOGGER.critical('runner received interrupt, killing process %s', process_inited.pid) - process_inited.kill(msg='Process was killed because the runner received an interrupt') + process_inited.kill(msg_text='Process was killed because the runner received an interrupt') original_handler_int = signal.getsignal(signal.SIGINT) original_handler_term = signal.getsignal(signal.SIGTERM) @@ -330,16 +332,16 @@ def inline_callback(event, *args, **kwargs): callback() finally: event.set() - if self.communicator: - self.communicator.remove_broadcast_subscriber(subscriber_identifier) + if self._coordinator: + self._coordinator.remove_broadcast_subscriber(subscriber_identifier) broadcast_filter = kiwipy.BroadcastFilter(functools.partial(inline_callback, event), sender=pk) for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]: broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}') - if self.communicator: + if self._coordinator: LOGGER.info('adding subscriber for broadcasts of %d', pk) - self.communicator.add_broadcast_subscriber(broadcast_filter, subscriber_identifier) + self._coordinator.add_broadcast_subscriber(broadcast_filter, subscriber_identifier) self._poll_process(node, functools.partial(inline_callback, event)) def get_process_future(self, pk: int) -> futures.ProcessFuture: @@ -349,7 +351,7 @@ def get_process_future(self, pk: int) -> futures.ProcessFuture: :return: A future representing the completion of the process node """ - return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._communicator) + return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._coordinator) def _poll_process(self, node, callback): """Check whether the process state of the node is terminated and call the callback or reschedule it. diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 651190454e..fe67e7102a 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -10,12 +10,12 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING, Any, Optional, Union -if TYPE_CHECKING: - import asyncio - from kiwipy.rmq import RmqThreadCommunicator +from plumpy.coordinator import Coordinator + +if TYPE_CHECKING: from plumpy.process_comms import RemoteProcessThreadController from aiida.brokers.broker import Broker @@ -59,8 +59,8 @@ class Manager: 3. A single storage backend object for the profile, to connect to data storage resources 5. A single daemon client object for the profile, to connect to the AiiDA daemon - 4. 
A single communicator object for the profile, to connect to the process control resources - 6. A single process controller object for the profile, which uses the communicator to control process tasks + 4. A single coordinator object for the profile, to connect to the process control resources + 6. A single process controller object for the profile, which uses the coordinator to control process tasks 7. A single runner object for the profile, which uses the process controller to start and stop processes 8. A single persister object for the profile, which can persist running processes to the profile storage @@ -167,7 +167,7 @@ def reset_profile_storage(self) -> None: self._profile_storage = None def reset_broker(self) -> None: - """Reset the communicator.""" + """Reset the broker.""" from concurrent import futures if self._broker is not None: @@ -285,7 +285,14 @@ def get_profile_storage(self) -> 'StorageBackend': return self._profile_storage - def get_broker(self) -> 'Broker' | None: + def get_broker(self) -> 'Broker | None': + if self._broker is not None: + return self._broker + + _default_loop = asyncio.get_event_loop() + return self._create_broker(_default_loop) + + def _create_broker(self, loop) -> 'Broker | None': """Return an instance of :class:`aiida.brokers.broker.Broker` if the profile defines a broker. :returns: The broker of the profile, or ``None`` if the profile doesn't define one. @@ -307,7 +314,7 @@ def get_broker(self) -> 'Broker' | None: entry_point = 'core.rabbitmq' broker_cls = BrokerFactory(entry_point) - self._broker = broker_cls(self._profile) + self._broker = broker_cls(self._profile, loop) return self._broker @@ -324,11 +331,10 @@ def get_persister(self) -> 'AiiDAPersister': return self._persister - def get_communicator(self) -> 'RmqThreadCommunicator': - """Return the communicator - - :return: a global communicator instance + def get_coordinator(self) -> 'Coordinator': + """Return the coordinator + :return: a global coordinator instance """ from aiida.common import ConfigurationError @@ -337,10 +343,10 @@ def get_communicator(self) -> 'RmqThreadCommunicator': if broker is None: assert self._profile is not None raise ConfigurationError( - f'profile `{self._profile.name}` does not provide a communicator because it does not define a broker' + f'profile `{self._profile.name}` does not provide a coordinator because it does not define a broker' ) - return broker.get_communicator() + return broker.get_coordinator() def get_daemon_client(self) -> 'DaemonClient': """Return the daemon client for the current profile. @@ -369,10 +375,10 @@ def get_process_controller(self) -> 'RemoteProcessThreadController': :return: the process controller instance """ - from plumpy.process_comms import RemoteProcessThreadController + from plumpy.rmq import RemoteProcessThreadController if self._process_controller is None: - self._process_controller = RemoteProcessThreadController(self.get_communicator()) + self._process_controller = RemoteProcessThreadController(self.get_coordinator()) return self._process_controller @@ -380,7 +386,6 @@ def get_runner(self, **kwargs) -> 'Runner': """Return a runner that is based on the current profile settings and can be used globally by the code. 
:return: the global runner - """ if self._runner is None: self._runner = self.create_runner(**kwargs) @@ -391,20 +396,25 @@ def set_runner(self, new_runner: 'Runner') -> None: """Set the currently used runner :param new_runner: the new runner to use - """ if self._runner is not None: self._runner.close() self._runner = new_runner - def create_runner(self, with_persistence: bool = True, **kwargs: Any) -> 'Runner': - """Create and return a new runner + def create_runner( + self, + poll_interval: Union[int, float] | None = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + broker: Broker | None = None, + broker_submit: bool = False, + persister: Optional[AiiDAPersister] = None, + ) -> 'Runner': + """Create and return a new runner, with default settings from the profile. :param poll_interval: interval in seconds between polling for status of active sub processes :param loop: an asyncio event loop, if none is supplied a new one will be created :param broker: the broker to use :param broker_submit: if True, processes will be submitted to the broker :param persister: the persister to use to persist processes :return: a new runner instance - """ from aiida.common import ConfigurationError from aiida.engine import runners @@ -414,53 +424,22 @@ def create_runner(self, with_persistence: bool = True, **kwargs: Any) -> 'Runner raise ConfigurationError( 'Could not determine the current profile. Consider loading a profile using `aiida.load_profile()`.' ) - poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') - - settings = {'broker_submit': False, 'poll_interval': poll_interval} - settings.update(kwargs) - - if 'communicator' not in settings: - # Only call get_communicator if we have to as it will lazily create - try: - settings['communicator'] = self.get_communicator() - except ConfigurationError: - # The currently loaded profile does not define a broker and so there is no communicator - pass - if with_persistence and 'persister' not in settings: - settings['persister'] = self.get_persister() + _default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') + _default_broker_submit = False + _default_persister = self.get_persister() + _default_loop = asyncio.get_event_loop() - return runners.Runner(**settings) # type: ignore[arg-type] + loop = loop or _default_loop + _default_broker = self._create_broker(loop) - def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner': - """Create and return a new daemon runner. - - This is used by workers when the daemon is running and in testing. 
- - :param loop: the (optional) asyncio event loop to use - - :return: a runner configured to work in the daemon configuration - - """ - from plumpy.persistence import LoadSaveContext - - from aiida.engine import persistence - from aiida.engine.processes.launcher import ProcessLauncher - - runner = self.create_runner(broker_submit=True, loop=loop) - runner_loop = runner.loop - - # Listen for incoming launch requests - task_receiver = ProcessLauncher( - loop=runner_loop, - persister=self.get_persister(), - load_context=LoadSaveContext(runner=runner), - loader=persistence.get_object_loader(), + runner = runners.Runner( + poll_interval=poll_interval or _default_poll_interval, + loop=loop, + broker=broker or _default_broker, + broker_submit=broker_submit or _default_broker_submit, + persister=persister or _default_persister, ) - - assert runner.communicator is not None, 'communicator not set for runner' - runner.communicator.add_task_subscriber(task_receiver) - return runner def check_version(self): diff --git a/src/aiida/tools/pytest_fixtures/daemon.py b/src/aiida/tools/pytest_fixtures/daemon.py index 89ef02d841..c1f57a5fbc 100644 --- a/src/aiida/tools/pytest_fixtures/daemon.py +++ b/src/aiida/tools/pytest_fixtures/daemon.py @@ -7,6 +7,8 @@ import pytest +from aiida.engine.daemon.client import DaemonClient + if t.TYPE_CHECKING: from aiida.engine import Process, ProcessBuilder from aiida.orm import ProcessNode @@ -47,7 +49,7 @@ def test(daemon_client): @pytest.fixture -def started_daemon_client(daemon_client): +def started_daemon_client(daemon_client: DaemonClient): """Ensure that the daemon is running for the test profile and return the associated client. Usage:: diff --git a/tests/brokers/test_rabbitmq.py b/tests/brokers/test_rabbitmq.py index 2417d27748..5fb5c2bc5c 100644 --- a/tests/brokers/test_rabbitmq.py +++ b/tests/brokers/test_rabbitmq.py @@ -32,7 +32,7 @@ def raise_connection_error(): broker = manager.get_broker() assert 'RabbitMQ v' in str(broker) - monkeypatch.setattr(broker, 'get_communicator', raise_connection_error) + monkeypatch.setattr(broker, 'get_coordinator', raise_connection_error) assert 'RabbitMQ @' in str(broker) @@ -92,14 +92,14 @@ def test_communicator(url): RmqThreadCommunicator.connect(connection_params={'url': url}) -def test_add_rpc_subscriber(communicator): +def test_add_rpc_subscriber(coordinator): """Test ``add_rpc_subscriber``.""" - communicator.add_rpc_subscriber(None) + coordinator.add_rpc_subscriber(lambda: None) -def test_add_broadcast_subscriber(communicator): +def test_add_broadcast_subscriber(coordinator): """Test ``add_broadcast_subscriber``.""" - communicator.add_broadcast_subscriber(None) + coordinator.add_broadcast_subscriber(lambda: None) @pytest.mark.usefixtures('aiida_profile_clean') diff --git a/tests/cmdline/commands/test_rabbitmq.py b/tests/cmdline/commands/test_rabbitmq.py index 78d2d7ce73..b0f304459d 100644 --- a/tests/cmdline/commands/test_rabbitmq.py +++ b/tests/cmdline/commands/test_rabbitmq.py @@ -9,7 +9,7 @@ """Tests for ``verdi devel rabbitmq``.""" import pytest -from plumpy.process_comms import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController from aiida.cmdline.commands import cmd_rabbitmq from aiida.engine import ProcessState, submit diff --git a/tests/conftest.py b/tests/conftest.py index 89b0a1bad7..acebc1cb7b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,6 +32,7 @@ from aiida.common.folders import Folder from aiida.common.links import LinkType from aiida.manage.configuration 
import Profile, get_config, load_profile +from aiida.manage.manager import Manager if t.TYPE_CHECKING: from aiida.manage.configuration.config import Config @@ -540,9 +541,9 @@ def backend(manager): @pytest.fixture -def communicator(manager): - """Get the ``Communicator`` instance of the currently loaded profile to communicate with RabbitMQ.""" - return manager.get_communicator() +def coordinator(manager: Manager): + """Get the ``Coordinator`` instance of the currently loaded profile to communicate with RabbitMQ.""" + return manager.get_coordinator() @pytest.fixture diff --git a/tests/engine/processes/test_control.py b/tests/engine/processes/test_control.py index 5bb9b8b7a6..637dbed897 100644 --- a/tests/engine/processes/test_control.py +++ b/tests/engine/processes/test_control.py @@ -1,7 +1,7 @@ """Tests for the :mod:`aiida.engine.processes.control` module.""" import pytest -from plumpy.process_comms import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController from aiida.engine import ProcessState from aiida.engine.launch import submit @@ -82,6 +82,7 @@ def test_kill_processes(submit_and_await): node = submit_and_await(WaitProcess, ProcessState.WAITING) control.kill_processes([node], wait=True) + # __import__('ipdb').set_trace() assert node.is_terminated assert node.is_killed assert node.process_status == 'Killed through `aiida.engine.processes.control.kill_processes`' diff --git a/tests/engine/test_futures.py b/tests/engine/test_futures.py index b8ba78aa8f..194bcf60c5 100644 --- a/tests/engine/test_futures.py +++ b/tests/engine/test_futures.py @@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self): # No polling future = processes.futures.ProcessFuture( - pk=process.pid, loop=runner.loop, communicator=manager.get_communicator() + pk=process.pid, loop=runner.loop, coordinator=manager.get_coordinator() ) run(process) diff --git a/tests/engine/test_rmq.py b/tests/engine/test_rmq.py index a2edc2fa41..3284ad4f7c 100644 --- a/tests/engine/test_rmq.py +++ b/tests/engine/test_rmq.py @@ -94,7 +94,7 @@ async def do_pause(): assert calc_node.paused kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -112,7 +112,7 @@ async def do_pause_play(): await asyncio.sleep(0.1) pause_message = 'Take a seat' - pause_future = controller.pause_process(calc_node.pk, msg=pause_message) + pause_future = controller.pause_process(calc_node.pk, msg_text=pause_message) future = await with_timeout(asyncio.wrap_future(pause_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert calc_node.paused @@ -127,7 +127,7 @@ async def do_pause_play(): assert calc_node.process_status is None kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -145,7 +145,7 @@ async def do_kill(): await asyncio.sleep(0.1) kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await 
diff --git a/tests/engine/test_runners.py b/tests/engine/test_runners.py
index db6e0e7493..3281cfc535 100644
--- a/tests/engine/test_runners.py
+++ b/tests/engine/test_runners.py
@@ -11,7 +11,6 @@
 import asyncio
 import threading

-import plumpy
 import pytest

 from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
@@ -51,7 +50,7 @@ def test_call_on_process_finish(runner):
     """Test call on calculation finish."""
     loop = runner.loop
     proc = Proc(runner=runner, inputs={'a': Str('input')})
-    future = plumpy.Future()
+    future = asyncio.Future()
     event = threading.Event()

     def calc_done():
diff --git a/tests/engine/test_work_chain.py b/tests/engine/test_work_chain.py
index 4fb8c70667..6480a45f81 100644
--- a/tests/engine/test_work_chain.py
+++ b/tests/engine/test_work_chain.py
@@ -28,7 +28,7 @@ def run_until_paused(proc):
     """Set up a future that will be resolved when process is paused"""
     listener = plumpy.ProcessListener()
-    paused = plumpy.Future()
+    paused = asyncio.Future()

     if proc.paused:
         paused.set_result(True)
@@ -49,7 +49,7 @@ def run_until_waiting(proc):
     from aiida.engine import ProcessState

     listener = plumpy.ProcessListener()
-    in_waiting = plumpy.Future()
+    in_waiting = asyncio.Future()

     if proc.state == ProcessState.WAITING:
         in_waiting.set_result(True)
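Note: ``plumpy.Future`` was a plain re-export of ``asyncio.Future``, and the alias appears to have been dropped, so the test helpers now construct the future directly. The pattern they rely on, as a sketch with a hypothetical helper name (the callback must fire on the thread of the loop that owns the future):

    import asyncio

    def make_done_future():
        """Hypothetical helper mirroring run_until_paused/run_until_waiting."""
        done = asyncio.Future()  # previously plumpy.Future()

        def on_event():
            # Resolve at most once, from a process-listener callback.
            if not done.done():
                done.set_result(True)

        return done, on_event
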
diff --git a/tests/manage/test_manager.py b/tests/manage/test_manager.py
index 3a8f4949cf..d0b82eb7a9 100644
--- a/tests/manage/test_manager.py
+++ b/tests/manage/test_manager.py
@@ -21,11 +21,16 @@ def test_disconnect():
     demonstrate the problematic behavior. Getting the communicator and then disconnecting it (through calling
     :meth:`aiida.manage.manager.Manager.reset_profile`) works fine. However, if a process is run before closing it,
     for example running a calcfunction, the closing of the communicator will raise a ``TimeoutError``.
+
+    The problem was solved by:
+    - https://github.com/aiidateam/aiida-core/pull/6672
+    - https://github.com/mosquito/aiormq/pull/208
+    # XXX: this may be wrong; with the new rmq-out combination the problem is solved without the change in aiormq
     """
     from aiida.manage import get_manager

     manager = get_manager()
-    manager.get_communicator()
+    _ = manager.get_coordinator()
     manager.reset_profile()

     # This returns just fine
     result, node = add_calcfunction.run_get_node(1)
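Note: the reproduction in ``test_disconnect`` is essentially the sequence below; before the linked fixes, resetting the profile after a process had used the connection raised a ``TimeoutError``. A sketch, assuming a loaded test profile and the test's ``add_calcfunction``:

    from aiida.manage import get_manager

    manager = get_manager()
    manager.get_coordinator()                        # opens the RMQ connection
    result, node = add_calcfunction.run_get_node(1)  # a process runs over that connection
    manager.reset_profile()                          # used to raise TimeoutError here
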
specifier = "==2.0.1" }, + { name = "mypy", marker = "extra == 'pre-commit'", specifier = "==1.13.0" }, + { name = "myst-nb", marker = "extra == 'docs'", specifier = "~=0.11.0" }, + { name = "nest-asyncio", specifier = "~=1.5,>=1.5.1" }, + { name = "pre-commit", marker = "extra == 'pre-commit'", specifier = "~=2.2" }, + { name = "pytest", marker = "extra == 'tests'", specifier = "~=7.0" }, + { name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "~=0.12,<0.17" }, + { name = "pytest-cov", marker = "extra == 'tests'", specifier = "~=4.1" }, + { name = "pytest-notebook", marker = "extra == 'tests'", specifier = ">=0.8.0" }, + { name = "pyyaml", specifier = "~=6.0" }, + { name = "shortuuid", marker = "extra == 'tests'", specifier = "==1.0.8" }, + { name = "sphinx", marker = "extra == 'docs'", specifier = "~=3.2.0" }, + { name = "sphinx-book-theme", marker = "extra == 'docs'", specifier = "~=0.0.39" }, + { name = "types-pyyaml", marker = "extra == 'pre-commit'" }, ] [[package]]