Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
Support MINID and LIMIT on XADD (redis/redis-py#1548)

Signed-off-by: Andrew-Chen-Wang <acwangpython@gmail.com>
  • Loading branch information
Andrew-Chen-Wang committed Oct 8, 2021
1 parent a83c83e commit 3ba0232
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
24 changes: 22 additions & 2 deletions aioredis/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1935,24 +1935,40 @@ def xadd(
maxlen: Optional[int] = None,
approximate: bool = True,
nomkstream: bool = False,
minid: Optional[StreamIdT] = None,
limit: Optional[int] = None,
) -> Awaitable:
"""
Add to a stream.
name: name of the stream
fields: dict of field/value pairs to insert into the stream
id: Location to insert this record. By default it is appended.
maxlen: truncate old stream members beyond this size
maxlen: truncate old stream members beyond this size.
Can't be specify with minid.
minid: the minimum id in the stream to query.
Can't be specify with maxlen.
approximate: actual stream length may be slightly more than maxlen
nomkstream: When set to true, do not make a stream
limit: specifies the maximum number of entries to retrieve
"""
pieces: List[EncodableT] = []
if maxlen is not None and minid is not None:
raise DataError("Only one of ```maxlen``` or ```minid``` may be specified")
if maxlen is not None:
if not isinstance(maxlen, int) or maxlen < 1:
raise DataError("XADD maxlen must be a positive integer")
pieces.append(b"MAXLEN")
if approximate:
pieces.append(b"~")
pieces.append(str(maxlen))
if minid is not None:
pieces.append(b'MINID')
if approximate:
pieces.append(b'~')
pieces.append(minid)
if limit is not None:
pieces.append(b"LIMIT")
pieces.append(limit)
if nomkstream:
pieces.append(b"NOMKSTREAM")
pieces.append(id)
Expand Down Expand Up @@ -2375,8 +2391,12 @@ def xtrim(
"""
Trims old messages from a stream.
name: name of the stream.
maxlen: truncate old stream messages beyond this size
maxlen: truncate old stream members beyond this size.
Can't be specified with minid.
minid: the minimum id in the stream to query.
Can't be specified with maxlen.
approximate: actual stream length may be slightly more than maxlen
limit: specifies the maximum number of entries to retrieve
"""
pieces: List[EncodableT] = [b"MAXLEN"]
if maxlen is not None and minid is not None:
Expand Down
50 changes: 50 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,56 @@ async def test_xadd_nomkstream(self, r: aioredis.Redis):
await r.xadd(stream, {"some": "other"}, nomkstream=True)
assert await r.xlen(stream) == 3

@skip_if_server_version_lt('6.2.0')
async def test_xadd_minlen_and_limit(self, r: aioredis.Redis):
stream = 'stream'

await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})

# Future self: No limits without approximate, according to the api
with pytest.raises(aioredis.ResponseError):
assert await r.xadd(stream, {'foo': 'bar'}, maxlen=3,
approximate=False, limit=2)

# limit can not be provided without maxlen or minid
with pytest.raises(aioredis.ResponseError):
assert await r.xadd(stream, {'foo': 'bar'}, limit=2)

# maxlen with a limit
assert await r.xadd(stream, {'foo': 'bar'}, maxlen=3,
approximate=True, limit=2)
await r.delete(stream)

# maxlen and minid can not be provided together
with pytest.raises(aioredis.DataError):
assert await r.xadd(stream, {'foo': 'bar'}, maxlen=3,
minid="sometestvalue")

# minid with a limit
m1 = await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
assert await r.xadd(stream, {'foo': 'bar'}, approximate=True,
minid=m1, limit=3)

# pure minid
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
m4 = await r.xadd(stream, {'foo': 'bar'})
assert await r.xadd(stream, {'foo': 'bar'}, approximate=False, minid=m4)

# minid approximate
await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
m3 = await r.xadd(stream, {'foo': 'bar'})
await r.xadd(stream, {'foo': 'bar'})
assert await r.xadd(stream, {'foo': 'bar'}, approximate=True, minid=m3)

@skip_if_server_version_lt("6.2.0")
async def test_xautoclaim(self, r: aioredis.Redis):
stream = "stream"
Expand Down

0 comments on commit 3ba0232

Please sign in to comment.