Decouple broker and coordinator interface #6675
@@ -43,3 +43,6 @@ pplot_out/

# docker
docker-bake.override.json

# pyenv
.python-version

@@ -0,0 +1,38 @@
ERROR tests/brokers/test_rabbitmq.py::test_duplicate_subscriber_identifier - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_process.py::test_process_pause - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_process.py::test_process_play - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_daemon.py::test_daemon_restart[options0] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_process.py::test_process_play_all - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_daemon.py::test_daemon_restart[options1] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_process.py::test_process_kill - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/test_work_chain.py::TestWorkchain::test_member_calcfunction_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_process.py::test_process_kill_all - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_devel.py::test_launch_add_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/test_launch.py::test_submit_wait - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_process.py::test_process_repair_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/test_launch.py::test_await_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_daemon.py::test_daemon_status - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/daemon/test_worker.py::test_logging_configuration - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_rabbitmq.py::test_tasks_list_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_daemon.py::test_daemon_status_timeout - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_rabbitmq.py::test_tasks_analyze_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_processes_all_exclusivity[pause_processes] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_rabbitmq.py::test_revive - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/test_memory_leaks.py::test_leak_run_process - AssertionError: Memory leak: process instances remain in memory: {<tests.utils.processes.DummyProcess object at 0x76bfcf0d3a60>}
ERROR tests/engine/processes/test_control.py::test_processes_all_exclusivity[play_processes] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_processes_all_exclusivity[kill_processes] - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/test_memory_leaks.py::test_leak_local_calcjob - AssertionError: Memory leak: process instances remain in memory: {<aiida.calculations.arithmetic.add.ArithmeticAddCalculation object at 0x76bf815eddf0>}
ERROR tests/engine/processes/test_control.py::test_pause_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_pause_processes_all_entries - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_play_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_play_processes_all_entries - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_kill_processes - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_kill_processes_all_entries - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/test_control.py::test_revive - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/processes/calcjobs/test_calc_job.py::test_restart_after_daemon_reset - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/cmdline/commands/test_storage.py::tests_storage_migrate_running_daemon - aiida.engine.daemon.client.DaemonTimeoutException: The daemon failed to start or is unresponsive after 2 seconds.
ERROR tests/engine/test_memory_leaks.py::test_leak_ssh_calcjob - AssertionError: Memory leak: process instances remain in memory: {<aiida.calculations.arithmetic.add.ArithmeticAddCalculation object at 0x76bfcf2a75e0>}
FAILED tests/cmdline/commands/test_daemon.py::test_daemon_start - AssertionError: Traceback (most recent call last):
FAILED tests/cmdline/commands/test_daemon.py::test_daemon_start_number - AssertionError: Traceback (most recent call last):
FAILED tests/cmdline/commands/test_daemon.py::test_daemon_start_number_config - AssertionError: Traceback (most recent call last):
FAILED tests/cmdline/commands/test_devel.py::test_launch_multiply_add_daemon - RuntimeError: Timed out waiting for process with state `ProcessState.CREATED` to enter state `ProcessState.FINISHED`.

@@ -0,0 +1,93 @@
import concurrent.futures
from asyncio import AbstractEventLoop
from typing import Generic, TypeVar, final

import kiwipy
from plumpy.exceptions import CoordinatorConnectionError
from plumpy.rmq.communications import convert_to_comm

__all__ = ['RmqCoordinator']

U = TypeVar('U', bound=kiwipy.Communicator)
Review comment: Can we give it a more meaningful name?

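The reviewer's suggested change is not shown on this page. As one possible reading of the request, the type variable could be named after what it stands for. The sketch below is only an illustration: `CommT`, `Communicator`, and `Coordinator` are hypothetical stand-ins so the snippet runs without kiwipy, and they are not taken from the PR.

```python
from typing import Generic, TypeVar


class Communicator:
    """Stand-in for kiwipy.Communicator so the sketch runs without kiwipy."""

    def is_closed(self) -> bool:
        return False


# Hypothetical name: 'CommT' reads as "communicator type", unlike a bare 'U'.
CommT = TypeVar('CommT', bound=Communicator)


class Coordinator(Generic[CommT]):
    """Simplified coordinator that only stores and returns its communicator."""

    def __init__(self, comm: CommT) -> None:
        self._comm = comm

    @property
    def communicator(self) -> CommT:
        """The inner communicator, typed as the concrete class passed in."""
        return self._comm
```

With a descriptive name, a signature such as `communicator(self) -> CommT` tells the reader what is returned without looking up the `TypeVar` definition.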
@final
class RmqLoopCoordinator(Generic[U]):
    def __init__(self, comm: U, loop: AbstractEventLoop):
        self._comm = comm
        self._loop = loop

    @property
    def communicator(self) -> U:
        """The inner communicator."""
        return self._comm

    def add_rpc_subscriber(self, subscriber, identifier=None):
Review comment: We should probably expose the types in plumpy to external libraries (see code in plumpy) so that they can be used for type hints here.
        subscriber = convert_to_comm(subscriber, self._loop)
        return self._comm.add_rpc_subscriber(subscriber, identifier)
Review comment on lines +26 to +27: I think once we use type hints, it will be important for type checking to use a different name here, because the assignment changes the type of the variable.

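The two review comments above (typed subscribers, and a new name for the converted subscriber) can be sketched together. This is a hedged illustration, not the PR's code: `RpcSubscriber`, `WrappedSubscriber`, `wrap_subscriber`, and `FakeCommunicator` are hypothetical names, and `wrap_subscriber` only imitates the idea of `convert_to_comm` by returning the result through a future.

```python
import concurrent.futures
from typing import Any, Callable, Dict, Optional

# Hypothetical aliases standing in for types that plumpy could export.
RpcSubscriber = Callable[[Any, Any], Any]
WrappedSubscriber = Callable[[Any, Any], 'concurrent.futures.Future[Any]']


def wrap_subscriber(subscriber: RpcSubscriber) -> WrappedSubscriber:
    """Stand-in for convert_to_comm: deliver the result through a future."""

    def wrapped(comm: Any, msg: Any) -> 'concurrent.futures.Future[Any]':
        future: 'concurrent.futures.Future[Any]' = concurrent.futures.Future()
        future.set_result(subscriber(comm, msg))
        return future

    return wrapped


class FakeCommunicator:
    """Minimal in-memory communicator used only for this sketch."""

    def __init__(self) -> None:
        self.rpc_subscribers: Dict[str, WrappedSubscriber] = {}

    def add_rpc_subscriber(self, subscriber: WrappedSubscriber, identifier: Optional[str] = None) -> str:
        key = identifier or f'rpc-{len(self.rpc_subscribers)}'
        self.rpc_subscribers[key] = subscriber
        return key


def add_rpc_subscriber(comm: FakeCommunicator, subscriber: RpcSubscriber, identifier: Optional[str] = None) -> str:
    # A new name keeps each variable at a single type for the type checker.
    wrapped = wrap_subscriber(subscriber)
    return comm.add_rpc_subscriber(wrapped, identifier)
```

Reassigning `subscriber` would make one variable hold an `RpcSubscriber` and then a `WrappedSubscriber`, which a static type checker would typically report as an incompatible assignment once the parameter is annotated.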
    def add_broadcast_subscriber(
        self,
        subscriber,
        subject_filters=None,
        sender_filters=None,
        identifier=None,
    ):
        # XXX: this change behavior of create_task when decide whether the broadcast is_filtered.
        # Need to understand the BroadcastFilter and make the improvement.
        # To manifest the issue of run_task not await, run twice 'test_launch.py::test_submit_wait'.

        # subscriber = kiwipy.BroadcastFilter(subscriber)
        #
        # subject_filters = subject_filters or []
        # sender_filters = sender_filters or []
        #
        # for filter in subject_filters:
        #     subscriber.add_subject_filter(filter)
        # for filter in sender_filters:
        #     subscriber.add_sender_filter(filter)

        subscriber = convert_to_comm(subscriber, self._loop)
        return self._comm.add_broadcast_subscriber(subscriber, identifier)

    def add_task_subscriber(self, subscriber, identifier=None):
        subscriber = convert_to_comm(subscriber, self._loop)
        return self._comm.add_task_subscriber(subscriber, identifier)

    def remove_rpc_subscriber(self, identifier):
        return self._comm.remove_rpc_subscriber(identifier)

    def remove_broadcast_subscriber(self, identifier):
        return self._comm.remove_broadcast_subscriber(identifier)

    def remove_task_subscriber(self, identifier):
        return self._comm.remove_task_subscriber(identifier)

    def rpc_send(self, recipient_id, msg):
        return self._comm.rpc_send(recipient_id, msg)

    def broadcast_send(
        self,
        body,
        sender=None,
        subject=None,
        correlation_id=None,
    ):
        from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError

        try:
            rsp = self._comm.broadcast_send(body, sender, subject, correlation_id)
        except (ChannelInvalidStateError, AMQPConnectionError, concurrent.futures.TimeoutError) as exc:
            raise CoordinatorConnectionError from exc
        else:
            return rsp

    def task_send(self, task, no_reply=False):
        return self._comm.task_send(task, no_reply)

    def close(self):
        self._comm.close()

    def is_closed(self) -> bool:
        """Return `True` if the communicator was closed"""
        return self._comm.is_closed()
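
The `broadcast_send` method in this diff translates transport-level errors into a single coordinator-level exception, so callers do not need to know about the messaging backend. A minimal, self-contained sketch of that pattern follows. The exception and communicator classes here are stand-ins (assumptions for illustration), not the real plumpy or aio_pika classes.

```python
import concurrent.futures


class CoordinatorConnectionError(Exception):
    """Stand-in for plumpy.exceptions.CoordinatorConnectionError."""


class BackendConnectionError(Exception):
    """Stand-in for a transport error such as aio_pika's AMQPConnectionError."""


class FlakyCommunicator:
    """Fake communicator whose broadcast always fails, for demonstration."""

    def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
        raise BackendConnectionError('broker unreachable')


class Coordinator:
    def __init__(self, comm):
        self._comm = comm

    def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
        try:
            return self._comm.broadcast_send(body, sender, subject, correlation_id)
        except (BackendConnectionError, concurrent.futures.TimeoutError) as exc:
            # 'from exc' keeps the original error available as __cause__.
            raise CoordinatorConnectionError('broadcast failed') from exc
```

Callers then only catch `CoordinatorConnectionError`, while the backend-specific error remains reachable through `__cause__` for debugging.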
Review comment: Right now we keep two communicators. It could easily happen in the code that the two communicators get out of sync (so that each refers to a different one). If we expose the Communicator in the Coordinator, we could simplify this and avoid the issue of having two communicators. I am not sure how easy it is to replace the communicator in the coordinator, since no one is managing the old communicator anymore. So we would need to close the existing coordinator and raise an error if it cannot be closed.
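
The code snippets from this comment are not shown on this page. The idea of keeping a single communicator reference can still be sketched under assumptions: all names below (`FakeCommunicator`, `Coordinator.replace_communicator`, `Manager`) are hypothetical and only illustrate one way to read the comment. In this sketch it is the old communicator that gets closed before it is replaced.

```python
class FakeCommunicator:
    """In-memory stand-in for a communicator; 'closable' simulates a failing close."""

    def __init__(self, name, closable=True):
        self.name = name
        self._closable = closable
        self._closed = False

    def close(self):
        if self._closable:
            self._closed = True

    def is_closed(self):
        return self._closed


class Coordinator:
    def __init__(self, comm):
        self._comm = comm

    @property
    def communicator(self):
        return self._comm

    def replace_communicator(self, new_comm):
        """Hypothetical helper: swap communicators, refusing if the old one stays open."""
        self._comm.close()
        if not self._comm.is_closed():
            raise RuntimeError('existing communicator could not be closed')
        self._comm = new_comm


class Manager:
    """Holds only the coordinator, so there is a single communicator reference."""

    def __init__(self, coordinator):
        self._coordinator = coordinator

    def get_communicator(self):
        return self._coordinator.communicator
```

Because `Manager` never stores the communicator itself, a replacement made through the coordinator is visible everywhere, and the two references cannot drift apart.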