Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
fix pubsub Receiver missing iter() method (fixes #203)
Browse files Browse the repository at this point in the history
  • Loading branch information
popravich committed Mar 27, 2017
1 parent b4f54fc commit 3b779a8
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 7 deletions.
34 changes: 27 additions & 7 deletions aioredis/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ def iter(self, *, encoding=None, decoder=None):
>>> async for msg in ch.iter():
... print(msg)
"""
return _ChannelIter(self, encoding=encoding,
decoder=decoder)
return _IterHelper(self,
is_active=lambda ch: ch.is_active,
encoding=encoding,
decoder=decoder)

@asyncio.coroutine
def wait_message(self):
Expand Down Expand Up @@ -156,12 +158,13 @@ def close(self):


if PY_35:
class _ChannelIter:
class _IterHelper:

__slots__ = ('_ch', '_args', '_kw')
__slots__ = ('_ch', '_is_active', '_args', '_kw')

def __init__(self, ch, *args, **kw):
def __init__(self, ch, is_active, *args, **kw):
self._ch = ch
self._is_active = is_active
self._args = args
self._kw = kw

Expand All @@ -171,7 +174,7 @@ def __aiter__(self):

@asyncio.coroutine
def __anext__(self):
if not self._ch.is_active:
if not self._is_active(self._ch):
raise StopAsyncIteration # noqa
msg = yield from self._ch.get(*self._args, **self._kw)
if msg is None:
Expand Down Expand Up @@ -272,16 +275,19 @@ def get(self, *, encoding=None, decoder=None):
"""Wait for and return pub/sub message from one of channels.
Return value is either:
* tuple of two elements: channel & message;
* tuple of three elements: pattern channel, (target channel & message);
* or None in case Receiver is stopped.
:raises aioredis.ChannelClosedError: If listener is stopped
and all messages have been received.
"""
assert decoder is None or callable(decoder), decoder
if not self.is_active:
if not self._running:
if not self._running: # inactive but running
raise ChannelClosedError()
return
ch, msg = yield from self._queue.get()
Expand Down Expand Up @@ -323,6 +329,20 @@ def stop(self):
"""
self._running = False

if PY_35:
def iter(self, *, encoding=None, decoder=None):
"""Returns async iterator.
Usage example:
>>> async for ch, msg in mpsc.iter():
... print(ch, msg)
"""
return _IterHelper(self,
is_active=lambda r: r.is_active or r._running,
encoding=encoding,
decoder=decoder)

# internal methods

def _put_nowait(self, data, *, sender):
Expand Down
35 changes: 35 additions & 0 deletions tests/py35_pubsub_receiver_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import pytest

from aioredis.pubsub import Receiver


@pytest.mark.run_loop
async def test_pubsub_receiver_iter(create_redis, server, loop):
sub = await create_redis(server.tcp_address, loop=loop)
pub = await create_redis(server.tcp_address, loop=loop)

mpsc = Receiver(loop=loop)

async def coro(mpsc):
lst = []
async for msg in mpsc.iter():
lst.append(msg)
return lst

tsk = asyncio.ensure_future(coro(mpsc), loop=loop)
snd1, = await sub.subscribe(mpsc.channel('chan:1'))
snd2, = await sub.subscribe(mpsc.channel('chan:2'))
snd3, = await sub.psubscribe(mpsc.pattern('chan:*'))

await pub.publish_json('chan:1', {'Hello': 'World'})
await pub.publish_json('chan:2', ['message'])
mpsc.stop()
await asyncio.sleep(0, loop=loop)
assert await tsk == [
(snd1, b'{"Hello": "World"}'),
(snd3, (b'chan:1', b'{"Hello": "World"}')),
(snd2, b'["message"]'),
(snd3, (b'chan:2', b'["message"]')),
]
assert not mpsc.is_active

0 comments on commit 3b779a8

Please sign in to comment.