Skip to content

Commit

Permalink
Merge branch 'develop' into process_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjsewell authored Mar 9, 2021
2 parents 7c08e11 + 77e3820 commit 69494c7
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ repos:
- id: pylint
additional_dependencies: [
"pyyaml~=5.1.2", "nest_asyncio~=1.4.0", "aio-pika~=6.6",
"aiocontextvars~=0.2.2; python_version<'3.7'", "kiwipy[rmq]~=0.7.1"
"aiocontextvars~=0.2.2; python_version<'3.7'", "kiwipy[rmq]~=0.7.4"
]
args:
[
Expand Down
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog

## v0.19.0 - 2021-03-09

- ‼️ DEPRECATE: `Process.done` method:
This method is a duplicate of `Process.has_terminated`, and is not used anywhere in plumpy (or aiida-core).

- 🐛 FIX: `Task.cancel` should not set state as `EXCEPTED`
`asyncio.CancelledError` are generated when an async task is cancelled.
In python 3.7 this exception class inherits from `Exception`, whereas in python 3.8+ it inherits from `BaseException`.
This meant it python 3.7 it was being caught by `except Exception`, and setting the process state to `EXCEPTED`,
whereas in python 3.8+ it was being re-raised to the caller.
We now ensure in both versions it is re-raised (particularly because aiida-core currently relies on this behaviour).

- 👌 IMPROVE: Process broadcast subscriber
Filter out `state_changed` broadcasts, and allow these to pass-through without generating a (costly) asynchronous task.
Note this also required an update in the minimal kiwipy version, to `0.7.4`

## v0.18.6 - 2021-02-24

👌 IMPROVE: Catch state change broadcast timeout
Expand Down
23 changes: 22 additions & 1 deletion plumpy/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,34 @@ def convert_to_comm(callback: 'Subscriber',
on the given even loop and return a kiwi future representing the future outcome
of the original method.
:param loop: the even loop to schedule the callback in
:param callback: the function to convert
:param loop: the even loop to schedule the callback in
:return: a new callback function that returns a future
"""
if isinstance(callback, kiwipy.BroadcastFilter):

# if the broadcast is filtered for this callback,
# we don't want to go through the (costly) process
# of setting up async tasks and callbacks

def _passthrough(*args: Any, **kwargs: Any) -> bool:
sender = kwargs.get('sender', args[1])
subject = kwargs.get('subject', args[2])
return callback.is_filtered(sender, subject) # type: ignore[attr-defined]
else:

def _passthrough(*args: Any, **kwargs: Any) -> bool: # pylint: disable=unused-argument
return False

coro = ensure_coroutine(callback)

def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> kiwipy.Future:

if _passthrough(*args, **kwargs):
kiwi_future = kiwipy.Future()
kiwi_future.set_result(None)
return kiwi_future

msg_fn = functools.partial(coro, communicator, *args, **kwargs)
task_future = futures.create_task(msg_fn, loop)
return plum_to_kiwi_future(task_future)
Expand Down
7 changes: 4 additions & 3 deletions plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import enum
import functools
import logging
import re
import sys
import time
from types import TracebackType
Expand Down Expand Up @@ -300,9 +301,9 @@ def init(self) -> None:
self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

try:
identifier = self._communicator.add_broadcast_subscriber(
self.broadcast_receive, identifier=str(self.pid)
)
# filter out state change broadcasts
subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*'))
identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid))
self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
except kiwipy.TimeoutError:
self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)
Expand Down
2 changes: 1 addition & 1 deletion plumpy/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
__version__: str = '0.18.6'
__version__: str = '0.19.0'

__all__ = ['__version__']
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
python_requires='>=3.6',
install_requires=[
'pyyaml~=5.1.2', 'nest_asyncio~=1.4.0', 'aio-pika~=6.6', 'aiocontextvars~=0.2.2; python_version<"3.7"',
'kiwipy[rmq]~=0.7.1'
'kiwipy[rmq]~=0.7.4'
],
extras_require={
'docs': ['sphinx~=3.2.0', 'myst-nb~=0.11.0', 'sphinx-book-theme~=0.0.39', 'ipython~=7.0'],
Expand Down
29 changes: 28 additions & 1 deletion test/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import asyncio
import shortuuid

from kiwipy import BroadcastFilter, RemoteException, rmq
import pytest
from kiwipy import RemoteException, rmq

import plumpy
from plumpy import communications, process_comms
Expand Down Expand Up @@ -78,6 +78,33 @@ def get_broadcast(_comm, body, sender, subject, correlation_id):
result = await broadcast_future
assert result == BROADCAST

@pytest.mark.asyncio
async def test_broadcast_filter(self, loop_communicator):

broadcast_future = plumpy.Future()

loop = asyncio.get_event_loop()

def ignore_broadcast(_comm, body, sender, subject, correlation_id):
broadcast_future.set_exception(AssertionError('broadcast received'))

def get_broadcast(_comm, body, sender, subject, correlation_id):
broadcast_future.set_result(True)

loop_communicator.add_broadcast_subscriber(BroadcastFilter(ignore_broadcast, subject='other'))
loop_communicator.add_broadcast_subscriber(get_broadcast)
loop_communicator.broadcast_send(
**{
'body': 'present',
'sender': 'Martin',
'subject': 'sup',
'correlation_id': 420
}
)

result = await broadcast_future
assert result is True

@pytest.mark.asyncio
async def test_rpc(self, loop_communicator):
MSG = 'rpc this'
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ filterwarnings =


[mypy]
show_error_codes = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
Expand Down

0 comments on commit 69494c7

Please sign in to comment.