Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
add idle to xpending (redis/redis-py#1523)

Signed-off-by: Andrew-Chen-Wang <acwangpython@gmail.com>
  • Loading branch information
Andrew-Chen-Wang committed Oct 5, 2021
1 parent 961da8c commit af9f1c7
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
47 changes: 31 additions & 16 deletions aioredis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3012,6 +3012,7 @@ def xpending_range(
max: StreamIdT,
count: int,
consumername: Optional[ConsumerT] = None,
idle: Optional[bool] = None,
) -> Awaitable:
"""
Returns information about pending messages, in a range.
Expand All @@ -3021,25 +3022,39 @@ def xpending_range(
max: maximum stream ID.
count: number of messages to return
consumername: name of a consumer to filter by (optional).
idle: available from version 6.2. Filter entries by their
idle-time, given in milliseconds (optional).
"""
pieces: List[EncodableT] = [name, groupname]
if min is not None or max is not None or count is not None:
if min is None or max is None or count is None:
raise DataError(
"XPENDING must be provided with min, max "
"and count parameters, or none of them. "
)
if not isinstance(count, int) or count < -1:
raise DataError("XPENDING count must be a integer >= -1")
pieces.extend((min, max, str(count)))
if consumername is not None:
if min is None or max is None or count is None:

if {min, max, count} == {None}:
if idle is not None or consumername is not None:
raise DataError(
"if XPENDING is provided with consumername,"
" it must be provided with min, max and"
" count parameters"
"if XPENDING is provided with idle time"
" or consumername, it must be provided"
" with min, max and count parameters"
)
pieces.append(consumername)
return self.xpending(name, groupname)

pieces: List[EncodableT] = [name, groupname]
if min is None or max is None or count is None:
raise DataError(
"XPENDING must be provided with min, max "
"and count parameters, or none of them."
)
# idle
try:
if int(idle) < 0:
raise DataError("XPENDING idle must be a integer >= 0")
pieces.extend(['IDLE', idle])
except TypeError:
pass
# count
try:
if int(count) < 0:
raise DataError("XPENDING count must be a integer >= 0")
pieces.extend([min, max, count])
except TypeError:
pass
return self.execute_command("XPENDING", *pieces, parse_detail=True)

def xrange(
Expand Down
47 changes: 47 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2857,6 +2857,53 @@ async def test_xpending_range(self, r: aioredis.Redis):
assert response[1]["message_id"] == m2
assert response[1]["consumer"] == consumer2.encode()

@skip_if_server_version_lt('6.2.0')
async def test_xpending_range_idle(self, r: aioredis.Redis):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
await r.xgroup_create(stream, group, 0)

# read 1 message from the group with each consumer
await r.xreadgroup(group, consumer1, streams={stream: '>'}, count=1)
await r.xreadgroup(group, consumer2, streams={stream: '>'}, count=1)

response = await r.xpending_range(stream, group,
min='-', max='+', count=5)
assert len(response) == 2
response = await r.xpending_range(stream, group,
min='-', max='+', count=5, idle=1000)
assert len(response) == 0

@skip_if_server_version_lt('6.2.0')
def test_xpending_range_negative(self, r: aioredis.Redis):
stream = 'stream'
group = 'group'
with pytest.raises(aioredis.DataError):
await r.xpending_range(stream, group, min='-', max='+', count=None)
with pytest.raises(ValueError):
await r.xpending_range(stream, group, min='-', max='+', count="one")
with pytest.raises(aioredis.DataError):
await r.xpending_range(stream, group, min='-', max='+', count=-1)
with pytest.raises(ValueError):
await r.xpending_range(stream, group, min='-', max='+', count=5,
idle="one")
with pytest.raises(aioredis.exceptions.ResponseError):
await r.xpending_range(stream, group, min='-', max='+', count=5,
idle=1.5)
with pytest.raises(aioredis.DataError):
await r.xpending_range(stream, group, min='-', max='+', count=5,
idle=-1)
with pytest.raises(aioredis.DataError):
await r.xpending_range(stream, group, min=None, max=None, count=None,
idle=0)
with pytest.raises(aioredis.DataError):
await r.xpending_range(stream, group, min=None, max=None, count=None,
consumername=0)

@skip_if_server_version_lt("5.0.0")
async def test_xrange(self, r: aioredis.Redis):
stream = "stream"
Expand Down

0 comments on commit af9f1c7

Please sign in to comment.