From 1c47319fcd1317cfd8a278888186eee16441f1f3 Mon Sep 17 00:00:00 2001 From: Davide Rizzo Date: Thu, 3 Jan 2019 17:28:01 +0100 Subject: [PATCH 1/4] use trio.BrokenResourceError --- trio_amqp/protocol.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/trio_amqp/protocol.py b/trio_amqp/protocol.py index 7070bf9..6bd624f 100644 --- a/trio_amqp/protocol.py +++ b/trio_amqp/protocol.py @@ -224,7 +224,7 @@ async def _writer_loop(self, task_status=trio.TASK_STATUS_IGNORED): f = frame.get_frame(encoder) try: await self._stream.send_all(f) - except (trio.BrokenStreamError,trio.ClosedStreamError): + except trio.BrokenResourceError: # raise exceptions.AmqpClosedConnection(self) from None # the reader will raise the error also return @@ -258,7 +258,7 @@ async def aclose(self, no_wait=False): encoder.write_short(0) try: await self._write_frame(frame, encoder) - except trio.ClosedStreamError: + except trio.BrokenResourceError: pass except Exception: logger.exception("Error while closing") @@ -423,7 +423,7 @@ async def get_frame(self): frame = amqp_frame.AmqpResponse(self._stream) try: await frame.read_frame() - except trio.BrokenStreamError: + except trio.BrokenResourceError: raise exceptions.AmqpClosedConnection(self) from None return frame @@ -511,7 +511,7 @@ async def _reader_loop(self, task_status=trio.TASK_STATUS_IGNORED): with trio.fail_after(timeout): try: frame = await self.get_frame() - except trio.ClosedStreamError: + except trio.BrokenResourceError: # the stream is now *really* closed … return try: From 46cd7403758c9e9402c67e7b15c158156cbcf420 Mon Sep 17 00:00:00 2001 From: Davide Rizzo Date: Thu, 3 Jan 2019 17:31:33 +0100 Subject: [PATCH 2/4] require trio >= 0.9.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index df338bc..43a6e94 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ 'pytest-trio >= 0.3', ], install_requires=[ - 'trio', + 'trio >= 0.9.0', ], packages=[ 'trio_amqp', From 3eece4dbd62f626ea31c73e357f28341ef84b75e Mon Sep 17 00:00:00 2001 From: Davide Rizzo Date: Thu, 3 Jan 2019 17:28:26 +0100 Subject: [PATCH 3/4] use trio memory channel instead of Queue --- trio_amqp/channel.py | 35 ++++++++++++++++++++--------------- trio_amqp/protocol.py | 6 +++--- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/trio_amqp/channel.py b/trio_amqp/channel.py index faca23c..81261c1 100644 --- a/trio_amqp/channel.py +++ b/trio_amqp/channel.py @@ -32,9 +32,9 @@ def __init__(self, channel, consumer_tag, **kwargs): async def _data(self, channel, msg, env, prop): if msg is None: - await self._q.put(None) + await self._chan_send.send(None) else: - await self._q.put((msg, env, prop)) + await self._chan_send.send((msg, env, prop)) if sys.version_info >= (3,5,3): def __aiter__(self): @@ -44,14 +44,15 @@ async def __aiter__(self): return self async def __anext__(self): - res = await self._q.get() + res = await self._chan_receive.receive() if res is None: raise StopAsyncIteration return res async def __aenter__(self): await self.channel.basic_consume(self._data, consumer_tag=self.consumer_tag, **self.kwargs) - self._q = trio.Queue(30) # TODO: 2 + possible prefetch + # TODO: 2 + possible prefetch + self._chan_send, self._chan_receive = trio.open_memory_channel(30) return self async def __aexit__(self, *tb): @@ -60,7 +61,8 @@ async def __aexit__(self, *tb): await self.channel.basic_cancel(self.consumer_tag) except AmqpClosedConnection: pass - del self._q + del self._chan_send + del self._chan_receive # these messages are not acknowledged, thus deleting the queue will # not lose them @@ -75,7 +77,6 @@ def __iter__(self): class Channel: - _q = None # for returned messages def __init__(self, protocol, channel_id): self.protocol = protocol @@ -97,9 +98,13 @@ def __init__(self, protocol, channel_id): self._futures = {} self._ctag_events = {} + self._chan_send = None + self._chan_receive = None + def __aiter__(self): - if self._q is None: - self._q = trio.Queue(30) # TODO: 2 + possible prefetch + if self._chan_send is None: + # TODO: 2 + possible prefetch + self._chan_send, self._chan_receive = trio.open_memory_channel(30) return self if sys.version_info < (3,5,3): @@ -108,7 +113,7 @@ async def __aiter__(self): return self._aiter() async def __anext__(self): - res = await self._q.get() + res = await self._chan_receive.receive() if res is None: raise StopAsyncIteration return res @@ -149,8 +154,8 @@ def connection_closed(self, server_code=None, server_reason=None, exception=None self.protocol.release_channel_id(self.channel_id) self.close_event.set() - if self._q is not None: - self._q.put_nowait(None) + if self._chan_send is not None: + self._chan_send.send_nowait(None) async def dispatch_frame(self, frame): methods = { @@ -271,8 +276,8 @@ async def close(self, reply_code=0, reply_text="Normal Shutdown"): if not self.is_open: raise exceptions.ChannelClosed("channel already closed or closing") self.close_event.set() - if self._q is not None: - self._q.put_nowait(None) + if self._chan_send is not None: + self._chan_send.send_nowait(None) frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) frame.declare_method(amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE) request = amqp_frame.AmqpEncoder() @@ -946,11 +951,11 @@ async def basic_return(self, frame): envelope = ReturnEnvelope(reply_code, reply_text, exchange_name, routing_key) properties = content_header_frame.properties - if self._q is None: + if self._chan_send is None: # they have set mandatory bit, but havent added a callback logger.warning("You don't iterate the channel for returned messages!") else: - await self._q.put((body, envelope, properties)) + await self._chan_send.send((body, envelope, properties)) async def basic_get(self, queue_name='', no_ack=False): frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) diff --git a/trio_amqp/protocol.py b/trio_amqp/protocol.py index 6bd624f..72a3715 100644 --- a/trio_amqp/protocol.py +++ b/trio_amqp/protocol.py @@ -202,7 +202,7 @@ async def _drain(self): async def _write_frame(self, frame, encoder, drain=True): # Doesn't actually write frame, pushes it for _writer_loop task to # pick it up. - await self._send_queue.put((frame, encoder)) + await self._send_send_channel.send((frame, encoder)) @trio.hazmat.enable_ki_protection async def _writer_loop(self, task_status=trio.TASK_STATUS_IGNORED): @@ -216,7 +216,7 @@ async def _writer_loop(self, task_status=trio.TASK_STATUS_IGNORED): timeout = inf with trio.move_on_after(timeout) as timeout_scope: - frame, encoder = await self._send_queue.get() + frame, encoder = await self._send_receive_channel.receive() if timeout_scope.cancelled_caught: await self.send_heartbeat() continue @@ -315,7 +315,7 @@ async def __aenter__(self): self.server_channel_max = None self.channels_ids_ceil = 0 self.channels_ids_free = set() - self._send_queue = trio.Queue(1) + self._send_send_channel, self._send_receive_channel = trio.open_memory_channel(1) if self._ssl: if self._ssl is True: From 900009bc1dd5154da25d487ea04e3235a79f48dd Mon Sep 17 00:00:00 2001 From: Davide Rizzo Date: Tue, 8 Jan 2019 16:37:52 +0100 Subject: [PATCH 4/4] catch trio.ClosedResourceError --- trio_amqp/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trio_amqp/protocol.py b/trio_amqp/protocol.py index 72a3715..f383ba8 100644 --- a/trio_amqp/protocol.py +++ b/trio_amqp/protocol.py @@ -224,7 +224,7 @@ async def _writer_loop(self, task_status=trio.TASK_STATUS_IGNORED): f = frame.get_frame(encoder) try: await self._stream.send_all(f) - except trio.BrokenResourceError: + except (trio.BrokenResourceError, trio.ClosedResourceError): # raise exceptions.AmqpClosedConnection(self) from None # the reader will raise the error also return @@ -511,7 +511,7 @@ async def _reader_loop(self, task_status=trio.TASK_STATUS_IGNORED): with trio.fail_after(timeout): try: frame = await self.get_frame() - except trio.BrokenResourceError: + except (trio.BrokenResourceError, trio.ClosedResourceError): # the stream is now *really* closed … return try: