diff --git a/src/plumpy/__init__.py b/src/plumpy/__init__.py index cc65ba23..864d2226 100644 --- a/src/plumpy/__init__.py +++ b/src/plumpy/__init__.py @@ -18,9 +18,9 @@ from .process_listener import * from .process_states import * from .processes import * +from .rmq import * from .utils import * from .workchains import * -from .rmq import * __all__ = ( events.__all__ diff --git a/src/plumpy/futures.py b/src/plumpy/futures.py index b67c0e80..ed43389e 100644 --- a/src/plumpy/futures.py +++ b/src/plumpy/futures.py @@ -88,6 +88,4 @@ def create_task(coro: Callable[[], Awaitable[Any]], loop: Optional[asyncio.Abstr # asyncio.run_coroutine_threadsafe(run_task(), loop) # return future - return asyncio.wrap_future( - asyncio.run_coroutine_threadsafe(coro(), loop) - ) + return asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro(), loop)) diff --git a/src/plumpy/message.py b/src/plumpy/message.py index 14a4e251..009f1b26 100644 --- a/src/plumpy/message.py +++ b/src/plumpy/message.py @@ -10,8 +10,6 @@ from plumpy.coordinator import Coordinator from plumpy.exceptions import PersistenceError, TaskRejectedError -from plumpy.exceptions import PersistenceError, TaskRejectedError - from . import loaders, persistence from .utils import PID_TYPE diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index c40347b2..7e82a9c3 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- """The main Process module""" +from __future__ import annotations + import abc import asyncio import concurrent.futures diff --git a/src/plumpy/rmq/communications.py b/src/plumpy/rmq/communications.py index b27b65a1..50927557 100644 --- a/src/plumpy/rmq/communications.py +++ b/src/plumpy/rmq/communications.py @@ -78,11 +78,11 @@ def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> k return converted + T = TypeVar('T', bound=kiwipy.Communicator) -def wrap_communicator( - communicator: T, loop: Optional[asyncio.AbstractEventLoop] = None -) -> 'LoopCommunicator[T]': + +def wrap_communicator(communicator: T, loop: Optional[asyncio.AbstractEventLoop] = None) -> 'LoopCommunicator[T]': """ Wrap a communicator such that all callbacks made to any subscribers are scheduled on the given event loop. diff --git a/src/plumpy/rmq/coordinator.py b/src/plumpy/rmq/coordinator.py deleted file mode 100644 index c529b61c..00000000 --- a/src/plumpy/rmq/coordinator.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -from typing import Generic, TypeVar, final -import kiwipy -import concurrent.futures - -from plumpy.exceptions import CoordinatorConnectionError - -__all__ = ['RmqCoordinator'] - -U = TypeVar("U", bound=kiwipy.Communicator) - -@final -class RmqCoordinator(Generic[U]): - def __init__(self, comm: U): - self._comm = comm - - @property - def communicator(self) -> U: - """The inner communicator.""" - return self._comm - - # XXX: naming - `add_receiver_rpc` - def add_rpc_subscriber(self, subscriber, identifier=None): - return self._comm.add_rpc_subscriber(subscriber, identifier) - - # XXX: naming - `add_receiver_broadcast` - def add_broadcast_subscriber( - self, - subscriber, - subject_filters=None, - sender_filters=None, - identifier=None, - ): - subscriber = kiwipy.BroadcastFilter(subscriber) - - subject_filters = subject_filters or [] - sender_filters = sender_filters or [] - - for filter in subject_filters: - subscriber.add_subject_filter(filter) - for filter in sender_filters: - subscriber.add_sender_filter(filter) - - return self._comm.add_broadcast_subscriber(subscriber, identifier) - - # XXX: naming - `add_reciver_task` (can be combined with two above maybe??) - def add_task_subscriber(self, subscriber, identifier=None): - return self._comm.add_task_subscriber(subscriber, identifier) - - def remove_rpc_subscriber(self, identifier): - return self._comm.remove_rpc_subscriber(identifier) - - def remove_broadcast_subscriber(self, identifier): - return self._comm.remove_broadcast_subscriber(identifier) - - def remove_task_subscriber(self, identifier): - return self._comm.remove_task_subscriber(identifier) - - # XXX: naming - `send_to` - def rpc_send(self, recipient_id, msg): - return self._comm.rpc_send(recipient_id, msg) - - # XXX: naming - `broadcast` - def broadcast_send( - self, - body, - sender=None, - subject=None, - correlation_id=None, - ): - from aio_pika.exceptions import ChannelInvalidStateError, AMQPConnectionError - - try: - rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) - except (ChannelInvalidStateError, AMQPConnectionError, concurrent.futures.TimeoutError) as exc: - raise CoordinatorConnectionError from exc - else: - return rsp - - # XXX: naming - `assign_task` (this may able to be combined with send_to) - def task_send(self, task, no_reply=False): - return self._comm.task_send(task, no_reply) - - def close(self): - self._comm.close() diff --git a/src/plumpy/rmq/futures.py b/src/plumpy/rmq/futures.py index 0ebe0d45..b0da02db 100644 --- a/src/plumpy/rmq/futures.py +++ b/src/plumpy/rmq/futures.py @@ -10,7 +10,7 @@ import kiwipy -__all__ = ['wrap_to_concurrent_future', 'unwrap_kiwi_future'] +__all__ = ['unwrap_kiwi_future', 'wrap_to_concurrent_future'] def _convert_future_exc(exc): @@ -112,6 +112,7 @@ def wrap_to_concurrent_future(future: asyncio.Future[Any]) -> kiwipy.Future: _chain_future(future, new_future) return new_future + # XXX: this required in aiida-core, see if really need this unwrap. def unwrap_kiwi_future(future: kiwipy.Future) -> kiwipy.Future: """