Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add idle to xpending #1523

Merged
merged 7 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2724,7 +2724,7 @@ def xpending(self, name, groupname):
return self.execute_command('XPENDING', name, groupname)

def xpending_range(self, name, groupname, min, max, count,
consumername=None):
consumername=None, idle=None):
"""
Returns information about pending messages, in a range.
name: name of the stream.
Expand All @@ -2733,21 +2733,35 @@ def xpending_range(self, name, groupname, min, max, count,
max: maximum stream ID.
count: number of messages to return
consumername: name of a consumer to filter by (optional).
idle: available from version 6.2. filter entries by their
idle-time, given in milliseconds (optional).
"""
if {min, max, count} == {None}:
if idle is not None or consumername is not None:
raise DataError("if XPENDING is provided with idle time"
" or consumername, it must be provided"
" with min, max and count parameters")
return self.xpending(name, groupname)

pieces = [name, groupname]
if min is not None or max is not None or count is not None:
if min is None or max is None or count is None:
raise DataError("XPENDING must be provided with min, max "
"and count parameters, or none of them. ")
if not isinstance(count, int) or count < -1:
raise DataError("XPENDING count must be a integer >= -1")
pieces.extend((min, max, str(count)))
if consumername is not None:
if min is None or max is None or count is None:
raise DataError("if XPENDING is provided with consumername,"
" it must be provided with min, max and"
" count parameters")
pieces.append(consumername)
if min is None or max is None or count is None:
raise DataError("XPENDING must be provided with min, max "
"and count parameters, or none of them.")
# idle
try:
if int(idle) < 0:
raise DataError("XPENDING idle must be a integer >= 0")
pieces.extend(['IDLE', idle])
except TypeError:
AvitalFineRedis marked this conversation as resolved.
Show resolved Hide resolved
pass
# count
try:
if int(count) < 0:
raise DataError("XPENDING count must be a integer >= 0")
pieces.extend([min, max, count])
except TypeError:
pass

return self.execute_command('XPENDING', *pieces, parse_detail=True)

def xrange(self, name, min='-', max='+', count=None):
Expand Down
43 changes: 43 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2562,6 +2562,49 @@ def test_xpending_range(self, r):
assert response[1]['message_id'] == m2
assert response[1]['consumer'] == consumer2.encode()

@skip_if_server_version_lt('6.2.0')
def test_xpending_range_idle(self, r):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
r.xgroup_create(stream, group, 0)

# read 1 message from the group with each consumer
r.xreadgroup(group, consumer1, streams={stream: '>'}, count=1)
r.xreadgroup(group, consumer2, streams={stream: '>'}, count=1)

response = r.xpending_range(stream, group,
min='-', max='+', count=5)
assert len(response) == 2
response = r.xpending_range(stream, group,
min='-', max='+', count=5, idle=1000)
AvitalFineRedis marked this conversation as resolved.
Show resolved Hide resolved
assert len(response) == 0

def test_xpending_range_negative(self, r):
stream = 'stream'
group = 'group'
with pytest.raises(redis.DataError):
r.xpending_range(stream, group, min='-', max='+', count=None)
with pytest.raises(ValueError):
r.xpending_range(stream, group, min='-', max='+', count="one")
with pytest.raises(redis.DataError):
r.xpending_range(stream, group, min='-', max='+', count=-1)
with pytest.raises(ValueError):
r.xpending_range(stream, group, min='-', max='+', count=5,
idle="one")
with pytest.raises(redis.DataError):
r.xpending_range(stream, group, min='-', max='+', count=5,
idle=-1)
with pytest.raises(redis.DataError):
r.xpending_range(stream, group, min=None, max=None, count=None,
idle=0)
with pytest.raises(redis.DataError):
r.xpending_range(stream, group, min=None, max=None, count=None,
consumername=0)

@skip_if_server_version_lt('5.0.0')
def test_xrange(self, r):
stream = 'stream'
Expand Down