Skip to content

Commit

Permalink
Support MINID and LIMIT on XADD (#1548)
Browse files Browse the repository at this point in the history
* MINID and LIMIT
  • Loading branch information
AvitalFineRedis authored Aug 29, 2021
1 parent 7c77883 commit 295b547
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
22 changes: 19 additions & 3 deletions redis/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1640,24 +1640,40 @@ def xack(self, name, groupname, *ids):
return self.execute_command('XACK', name, groupname, *ids)

def xadd(self, name, fields, id='*', maxlen=None, approximate=True,
nomkstream=False):
nomkstream=False, minid=None, limit=None):
"""
Add to a stream.
name: name of the stream
fields: dict of field/value pairs to insert into the stream
id: Location to insert this record. By default it is appended.
maxlen: truncate old stream members beyond this size
maxlen: truncate old stream members beyond this size.
Can't be specify with minid.
minid: the minimum id in the stream to query.
Can't be specify with maxlen.
approximate: actual stream length may be slightly more than maxlen
nomkstream: When set to true, do not make a stream
limit: specifies the maximum number of entries to retrieve
"""
pieces = []
if maxlen is not None and minid is not None:
raise DataError("Only one of ```maxlen``` or ```minid```",
"may be specified")

if maxlen is not None:
if not isinstance(maxlen, int) or maxlen < 1:
raise DataError('XADD maxlen must be a positive integer')
pieces.append(b'MAXLEN')
if approximate:
pieces.append(b'~')
pieces.append(str(maxlen))
if minid is not None:
pieces.append(b'MINID')
if approximate:
pieces.append(b'~')
pieces.append(minid)
if limit is not None:
pieces.append(b"LIMIT")
pieces.append(limit)
if nomkstream:
pieces.append(b'NOMKSTREAM')
pieces.append(id)
Expand Down Expand Up @@ -2002,7 +2018,7 @@ def xtrim(self, name, maxlen=None, approximate=True, minid=None,
name: name of the stream.
maxlen: truncate old stream messages beyond this size
approximate: actual stream length may be slightly more than maxlen
minin: the minimum id in the stream to query
minid: the minimum id in the stream to query
limit: specifies the maximum number of entries to retrieve
"""
pieces = []
Expand Down
50 changes: 50 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,56 @@ def test_xadd_nomkstream(self, r):
r.xadd(stream, {'some': 'other'}, nomkstream=True)
assert r.xlen(stream) == 3

@skip_if_server_version_lt('6.2.0')
def test_xadd_minlen_and_limit(self, r):
stream = 'stream'

r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})

# Future self: No limits without approximate, according to the api
with pytest.raises(redis.ResponseError):
assert r.xadd(stream, {'foo': 'bar'}, maxlen=3,
approximate=False, limit=2)

# limit can not be provided without maxlen or minid
with pytest.raises(redis.ResponseError):
assert r.xadd(stream, {'foo': 'bar'}, limit=2)

# maxlen with a limit
assert r.xadd(stream, {'foo': 'bar'}, maxlen=3,
approximate=True, limit=2)
r.delete(stream)

# maxlen and minid can not be provided together
with pytest.raises(redis.DataError):
assert r.xadd(stream, {'foo': 'bar'}, maxlen=3,
minid="sometestvalue")

# minid with a limit
m1 = r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
assert r.xadd(stream, {'foo': 'bar'}, approximate=True,
minid=m1, limit=3)

# pure minid
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
m4 = r.xadd(stream, {'foo': 'bar'})
assert r.xadd(stream, {'foo': 'bar'}, approximate=False, minid=m4)

# minid approximate
r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
m3 = r.xadd(stream, {'foo': 'bar'})
r.xadd(stream, {'foo': 'bar'})
assert r.xadd(stream, {'foo': 'bar'}, approximate=True, minid=m3)

@skip_if_server_version_lt('6.2.0')
def test_xautoclaim(self, r):
stream = 'stream'
Expand Down

0 comments on commit 295b547

Please sign in to comment.