Skip to content

Commit

Permalink
Python: add XINFO STREAM command (valkey-io#1816)
Browse files Browse the repository at this point in the history
* Grab value conversion from Java draft PR

* wip

* wip

* refactor value conversion wip

* Make value_conversion build

* Add value conversion test

* Verified test values

* Fix errors in test

* RESP3 tests passing

* Tests passing for RESP3, mypy passing

* Update CHANGELOG, minor doc fix

* Fix failing value conversion test

* Clean up value_conversion.rs a bit

* minor cleanup

* Modify test to cover failing scenario

* Python tests passing

* Fix test function signatures

* Fix failing pytest, cleanup value conversion comments

* PR suggestions

---------

Co-authored-by: Jonathan Louie <jonathanl@bitquilltech.com>
  • Loading branch information
2 people authored and cyip10 committed Jul 16, 2024
1 parent 055957a commit 85935a5
Show file tree
Hide file tree
Showing 9 changed files with 763 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
* Python: Added FUNCTION KILL command ([#1797](https://github.com/aws/glide-for-redis/pull/1797))
* Python: Type migration for entries_read ([#1768](https://github.com/aws/glide-for-redis/pull/1768))
* Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769))
* Python: Added FUNCTION STATS command ([#1794](https://github.com/aws/glide-for-redis/pull/1794))
* Python: Added XINFO STREAM command ([#1816](https://github.com/aws/glide-for-redis/pull/1816))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down Expand Up @@ -209,7 +211,6 @@
* Node: Added LINDEX command ([#999](https://github.com/aws/glide-for-redis/pull/999))
* Python, Node: Added ZPOPMAX command ([#996](https://github.com/aws/glide-for-redis/pull/996), [#1009](https://github.com/aws/glide-for-redis/pull/1009))
* Python: Added DBSIZE command ([#1040](https://github.com/aws/glide-for-redis/pull/1040))
* Python: Added FUNCTION STATS command ([#1794](https://github.com/aws/glide-for-redis/pull/1794))

#### Features
* Python, Node: Added support in Lua Scripts ([#775](https://github.com/aws/glide-for-redis/pull/775), [#860](https://github.com/aws/glide-for-redis/pull/860))
Expand Down
410 changes: 410 additions & 0 deletions glide-core/src/client/value_conversion.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ enum RequestType {
XAutoClaim = 203;
XInfoGroups = 204;
XInfoConsumers = 205;
XInfoStream = 207;
Scan = 206;
Wait = 208;
XClaim = 209;
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub enum RequestType {
XAutoClaim = 203,
XInfoGroups = 204,
XInfoConsumers = 205,
XInfoStream = 207,
Scan = 206,
Wait = 208,
XClaim = 209,
Expand Down Expand Up @@ -434,6 +435,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim,
ProtobufRequestType::XInfoGroups => RequestType::XInfoGroups,
ProtobufRequestType::XInfoConsumers => RequestType::XInfoConsumers,
ProtobufRequestType::XInfoStream => RequestType::XInfoStream,
ProtobufRequestType::Wait => RequestType::Wait,
ProtobufRequestType::XClaim => RequestType::XClaim,
ProtobufRequestType::Scan => RequestType::Scan,
Expand Down Expand Up @@ -652,6 +654,7 @@ impl RequestType {
RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")),
RequestType::XInfoGroups => Some(get_two_word_command("XINFO", "GROUPS")),
RequestType::XInfoConsumers => Some(get_two_word_command("XINFO", "CONSUMERS")),
RequestType::XInfoStream => Some(get_two_word_command("XINFO", "STREAM")),
RequestType::Wait => Some(cmd("WAIT")),
RequestType::XClaim => Some(cmd("XCLAIM")),
RequestType::Scan => Some(cmd("SCAN")),
Expand Down
148 changes: 147 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@
StreamTrimOptions,
_create_xpending_range_args,
)
from glide.constants import TOK, TEncodable, TResult
from glide.constants import (
TOK,
TEncodable,
TResult,
TXInfoStreamFullResponse,
TXInfoStreamResponse,
)
from glide.protobuf.redis_request_pb2 import RequestType
from glide.routes import Route

Expand Down Expand Up @@ -3545,6 +3551,146 @@ async def xinfo_consumers(
await self._execute_command(RequestType.XInfoConsumers, [key, group_name]),
)

async def xinfo_stream(
self,
key: TEncodable,
) -> TXInfoStreamResponse:
"""
Returns information about the stream stored at `key`. To get more detailed information, use `xinfo_stream_full`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
Returns:
TXInfoStreamResponse: A mapping of stream information for the given `key`. See the example for a sample
response.
Examples:
>>> await client.xinfo_stream("my_stream")
{
b"length": 4,
b"radix-tree-keys": 1L,
b"radix-tree-nodes": 2L,
b"last-generated-id": b"1719877599564-0",
b"max-deleted-entry-id": b"0-0", # This field was added in Redis version 7.0.0.
b"entries-added": 4L, # This field was added in Redis version 7.0.0.
b"recorded-first-entry-id": b"1719710679916-0", # This field was added in Redis version 7.0.0.
b"groups": 1L,
b"first-entry": [
b"1719710679916-0",
[b"foo1", b"bar1", b"foo2", b"bar2"],
],
b"last-entry": [
b"1719877599564-0",
[b"field1", b"value1"],
],
}
# Stream information for "my_stream". Note that "first-entry" and "last-entry" could both be `None` if
# the stream is empty.
"""
return cast(
TXInfoStreamResponse,
await self._execute_command(RequestType.XInfoStream, [key]),
)

async def xinfo_stream_full(
self,
key: TEncodable,
count: Optional[int] = None,
) -> TXInfoStreamFullResponse:
"""
Returns verbose information about the stream stored at `key`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
count (Optional[int]): The number of stream and PEL entries that are returned. A value of `0` means that all
entries will be returned. If not provided, defaults to `10`.
Returns:
TXInfoStreamFullResponse: A mapping of detailed stream information for the given `key`. See the example for
a sample response.
Examples:
>>> await client.xinfo_stream_full("my_stream")
{
b"length": 4,
b"radix-tree-keys": 1L,
b"radix-tree-nodes": 2L,
b"last-generated-id": b"1719877599564-0",
b"max-deleted-entry-id": b"0-0", # This field was added in Redis version 7.0.0.
b"entries-added": 4L, # This field was added in Redis version 7.0.0.
b"recorded-first-entry-id": b"1719710679916-0", # This field was added in Redis version 7.0.0.
b"entries": [
[
b"1719710679916-0",
[b"foo1", b"bar1", b"foo2", b"bar2"],
],
[
b"1719877599564-0":
[b"field1", b"value1"],
]
],
b"groups": [
{
b"name": b"mygroup",
b"last-delivered-id": b"1719710688676-0",
b"entries-read": 2, # This field was added in Redis version 7.0.0.
b"lag": 0, # This field was added in Redis version 7.0.0.
b"pel-count": 2,
b"pending": [
[
b"1719710679916-0",
b"Alice",
1719710707260,
1,
],
[
b"1719710688676-0",
b"Alice",
1719710718373,
1,
],
],
b"consumers": [
{
b"name": b"Alice",
b"seen-time": 1719710718373,
b"active-time": 1719710718373, # This field was added in Redis version 7.2.0.
b"pel-count": 2,
b"pending": [
[
b"1719710679916-0",
1719710707260,
1
],
[
b"1719710688676-0",
1719710718373,
1
]
]
}
]
}
]
}
# Detailed stream information for "my_stream".
Since: Redis version 6.0.0.
"""
args = [key, "FULL"]
if count is not None:
args.extend(["COUNT", str(count)])

return cast(
TXInfoStreamFullResponse,
await self._execute_command(RequestType.XInfoStream, args),
)

async def geoadd(
self,
key: TEncodable,
Expand Down
41 changes: 41 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2591,6 +2591,47 @@ def xinfo_consumers(
"""
return self.append_command(RequestType.XInfoConsumers, [key, group_name])

def xinfo_stream(
self: TTransaction,
key: TEncodable,
) -> TTransaction:
"""
Returns information about the stream stored at `key`. To get more detailed information, use `xinfo_stream_full`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
Command response:
TXInfoStreamResponse: A mapping of stream information for the given `key`.
"""
return self.append_command(RequestType.XInfoStream, [key])

def xinfo_stream_full(
self: TTransaction,
key: TEncodable,
count: Optional[int] = None,
) -> TTransaction:
"""
Returns verbose information about the stream stored at `key`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
count (Optional[int]): The number of stream and PEL entries that are returned. A value of `0` means that all
entries will be returned. If not provided, defaults to `10`.
Command response:
TXInfoStreamFullResponse: A mapping of detailed stream information for the given `key`.
"""
args = [key, "FULL"]
if count is not None:
args.extend(["COUNT", str(count)])

return self.append_command(RequestType.XInfoStream, args)

def geoadd(
self: TTransaction,
key: TEncodable,
Expand Down
13 changes: 13 additions & 0 deletions python/python/glide/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@
],
],
]

TXInfoStreamResponse = Mapping[
bytes, Union[bytes, int, Mapping[bytes, Optional[List[List[bytes]]]]]
]
TXInfoStreamFullResponse = Mapping[
bytes,
Union[
bytes,
int,
Mapping[bytes, List[List[bytes]]],
List[Mapping[bytes, Union[bytes, int, List[List[Union[bytes, int]]]]]],
],
]
117 changes: 117 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6816,6 +6816,123 @@ async def test_xinfo_groups_xinfo_consumers_edge_cases_and_failures(
with pytest.raises(RequestError):
await glide_client.xinfo_consumers(string_key, group_name)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xinfo_stream(
self, glide_client: TGlideClient, cluster_mode, protocol
):
key = get_random_string(10)
group_name = get_random_string(10)
consumer = get_random_string(10)
stream_id0_0 = "0-0"
stream_id1_0 = "1-0"
stream_id1_1 = "1-1"

# setup: add stream entry, create consumer group and consumer, read from stream with consumer
assert (
await glide_client.xadd(
key, [("a", "b"), ("c", "d")], StreamAddOptions(stream_id1_0)
)
== stream_id1_0.encode()
)
assert await glide_client.xgroup_create(key, group_name, stream_id0_0) == OK
assert await glide_client.xreadgroup({key: ">"}, group_name, consumer) == {
key.encode(): {stream_id1_0.encode(): [[b"a", b"b"], [b"c", b"d"]]}
}

result = await glide_client.xinfo_stream(key)
assert result.get(b"length") == 1
expected_first_entry = [stream_id1_0.encode(), [b"a", b"b", b"c", b"d"]]
assert result.get(b"first-entry") == expected_first_entry

# only one entry exists, so first and last entry should be the same
assert result.get(b"last-entry") == expected_first_entry

# call XINFO STREAM with a byte string arg
result2 = await glide_client.xinfo_stream(key.encode())
assert result2 == result

# add one more entry
assert (
await glide_client.xadd(
key, [("foo", "bar")], StreamAddOptions(stream_id1_1)
)
== stream_id1_1.encode()
)

result_full = await glide_client.xinfo_stream_full(key, count=1)
print(result_full)
assert result_full.get(b"length") == 2
entries = cast(list, result_full.get(b"entries"))
# only the first entry will be returned since we passed count=1
assert len(entries) == 1
assert entries[0] == expected_first_entry

groups = cast(list, result_full.get(b"groups"))
assert len(groups) == 1
group_info = groups[0]
assert group_info.get(b"name") == group_name.encode()
pending = group_info.get(b"pending")
assert len(pending) == 1
assert stream_id1_0.encode() in pending[0]

consumers = group_info.get(b"consumers")
assert len(consumers) == 1
consumer_info = consumers[0]
assert consumer_info.get(b"name") == consumer.encode()
consumer_pending = consumer_info.get(b"pending")
assert len(consumer_pending) == 1
assert stream_id1_0.encode() in consumer_pending[0]

# call XINFO STREAM FULL with byte arg
result_full2 = await glide_client.xinfo_stream_full(key.encode())
# 2 entries should be returned, since we didn't pass the COUNT arg this time
assert len(cast(list, result_full2.get(b"entries"))) == 2

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xinfo_stream_edge_cases_and_failures(
self, glide_client: TGlideClient, cluster_mode, protocol
):
key = get_random_string(10)
string_key = get_random_string(10)
non_existing_key = get_random_string(10)
stream_id1_0 = "1-0"

# setup: create empty stream
assert (
await glide_client.xadd(
key, [("field", "value")], StreamAddOptions(stream_id1_0)
)
== stream_id1_0.encode()
)
assert await glide_client.xdel(key, [stream_id1_0]) == 1

# XINFO STREAM called against empty stream
result = await glide_client.xinfo_stream(key)
assert result.get(b"length") == 0
assert result.get(b"first-entry") is None
assert result.get(b"last-entry") is None

# XINFO STREAM FULL called against empty stream. Negative count values are ignored.
result_full = await glide_client.xinfo_stream_full(key, count=-3)
assert result_full.get(b"length") == 0
assert result_full.get(b"entries") == []
assert result_full.get(b"groups") == []

# calling XINFO STREAM with a non-existing key raises an error
with pytest.raises(RequestError):
await glide_client.xinfo_stream(non_existing_key)
with pytest.raises(RequestError):
await glide_client.xinfo_stream_full(non_existing_key)

# key exists, but it is not a stream
assert await glide_client.set(string_key, "foo")
with pytest.raises(RequestError):
await glide_client.xinfo_stream(string_key)
with pytest.raises(RequestError):
await glide_client.xinfo_stream_full(string_key)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_set_id(
Expand Down
Loading

0 comments on commit 85935a5

Please sign in to comment.