diff --git a/CHANGELOG.md b/CHANGELOG.md index d7ee75323c..35df0bd7a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,8 @@ * Java: Added SCAN command ([#1751](https://github.com/aws/glide-for-redis/pull/1751)) * Python: Type migration for entries_read ([#1768](https://github.com/aws/glide-for-redis/pull/1768)) * Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769)) +* Python: Added FUNCTION STATS command ([#1794](https://github.com/aws/glide-for-redis/pull/1794)) +* Python: Added XINFO STREAM command ([#1816](https://github.com/aws/glide-for-redis/pull/1816)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) @@ -208,7 +210,6 @@ * Node: Added LINDEX command ([#999](https://github.com/aws/glide-for-redis/pull/999)) * Python, Node: Added ZPOPMAX command ([#996](https://github.com/aws/glide-for-redis/pull/996), [#1009](https://github.com/aws/glide-for-redis/pull/1009)) * Python: Added DBSIZE command ([#1040](https://github.com/aws/glide-for-redis/pull/1040)) -* Python: Added FUNCTION STATS command ([#1794](https://github.com/aws/glide-for-redis/pull/1794)) #### Features * Python, Node: Added support in Lua Scripts ([#775](https://github.com/aws/glide-for-redis/pull/775), [#860](https://github.com/aws/glide-for-redis/pull/860)) diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 345c132b14..95e0540141 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -35,6 +35,7 @@ pub(crate) enum ExpectedReturnType<'a> { GeoSearchReturnType, SimpleString, XAutoClaimReturnType, + XInfoStreamFullReturnType, } pub(crate) fn convert_to_expected_type( @@ -673,6 +674,237 @@ pub(crate) fn convert_to_expected_type( ) .into()), }, + // `XINFO STREAM` returns nested maps with different types of data + /* RESP2 response example + + 1) "length" + 2) (integer) 2 + ... + 13) "recorded-first-entry-id" + 14) "1719710679916-0" + 15) "entries" + 16) 1) 1) "1719710679916-0" + 2) 1) "foo" + 2) "bar" + 3) "foo" + 4) "bar2" + 5) "some" + 6) "value" + 2) 1) "1719710688676-0" + 2) 1) "foo" + 2) "bar2" + 17) "groups" + 18) 1) 1) "name" + 2) "mygroup" + ... + 9) "pel-count" + 10) (integer) 2 + 11) "pending" + 12) 1) 1) "1719710679916-0" + 2) "Alice" + 3) (integer) 1719710707260 + 4) (integer) 1 + 2) 1) "1719710688676-0" + 2) "Alice" + 3) (integer) 1719710718373 + 4) (integer) 1 + 13) "consumers" + 14) 1) 1) "name" + 2) "Alice" + ... + 7) "pel-count" + 8) (integer) 2 + 9) "pending" + 10) 1) 1) "1719710679916-0" + 2) (integer) 1719710707260 + 3) (integer) 1 + 2) 1) "1719710688676-0" + 2) (integer) 1719710718373 + 3) (integer) 1 + + RESP3 response example + + 1# "length" => (integer) 2 + ... + 8# "entries" => + 1) 1) "1719710679916-0" + 2) 1) "foo" + 2) "bar" + 3) "foo" + 4) "bar2" + 5) "some" + 6) "value" + 2) 1) "1719710688676-0" + 2) 1) "foo" + 2) "bar2" + 9# "groups" => + 1) 1# "name" => "mygroup" + ... + 6# "pending" => + 1) 1) "1719710679916-0" + 2) "Alice" + 3) (integer) 1719710707260 + 4) (integer) 1 + 2) 1) "1719710688676-0" + 2) "Alice" + 3) (integer) 1719710718373 + 4) (integer) 1 + 7# "consumers" => + 1) 1# "name" => "Alice" + ... + 5# "pending" => + 1) 1) "1719710679916-0" + 2) (integer) 1719710707260 + 3) (integer) 1 + 2) 1) "1719710688676-0" + 2) (integer) 1719710718373 + 3) (integer) 1 + + Another RESP3 example on an empty stream + + 1# "length" => (integer) 0 + 2# "radix-tree-keys" => (integer) 0 + 3# "radix-tree-nodes" => (integer) 1 + 4# "last-generated-id" => "0-1" + 5# "max-deleted-entry-id" => "0-1" + 6# "entries-added" => (integer) 1 + 7# "recorded-first-entry-id" => "0-0" + 8# "entries" => (empty array) + 9# "groups" => (empty array) + + We want to convert the RESP2 format to RESP3, so we need to: + - convert any consumer in the consumer array to a map, if there are any consumers + - convert any group in the group array to a map, if there are any groups + - convert the root of the response into a map + */ + ExpectedReturnType::XInfoStreamFullReturnType => match value { + Value::Map(_) => Ok(value), // Response is already in RESP3 format - no conversion needed + Value::Array(mut array) => { + // Response is in RESP2 format. We need to convert to RESP3 format. + let groups_key = Value::SimpleString("groups".into()); + let opt_groups_key_index = array + .iter() + .position( + |key| { + let res = convert_to_expected_type(key.clone(), Some(ExpectedReturnType::SimpleString)); + match res { + Ok(converted_key) => { + converted_key == groups_key + }, + Err(_) => { + false + } + } + } + ); + + let Some(groups_key_index) = opt_groups_key_index else { + return Err((ErrorKind::TypeError, "No groups key found").into()); + }; + + let groups_value_index = groups_key_index + 1; + if array.get(groups_value_index).is_none() { + return Err((ErrorKind::TypeError, "No groups value found.").into()); + } + + let Value::Array(groups) = array[groups_value_index].clone() else { + return Err((ErrorKind::TypeError, "Incorrect value type received. Wanted an Array.").into()); + }; + + if groups.is_empty() { + let converted_response = convert_to_expected_type(Value::Array(array), Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None, + }))?; + + let Value::Map(map) = converted_response else { + return Err((ErrorKind::TypeError, "Incorrect value type received. Wanted a Map.").into()); + }; + + return Ok(Value::Map(map)); + } + + let mut groups_as_maps = Vec::new(); + for group_value in &groups { + let Value::Array(mut group) = group_value.clone() else { + return Err((ErrorKind::TypeError, "Incorrect value type received for group value. Wanted an Array").into()); + }; + + let consumers_key = Value::SimpleString("consumers".into()); + let opt_consumers_key_index = group + .iter() + .position( + |key| { + let res = convert_to_expected_type(key.clone(), Some(ExpectedReturnType::SimpleString)); + match res { + Ok(converted_key) => { + converted_key == consumers_key + }, + Err(_) => { + false + } + } + } + ); + + let Some(consumers_key_index) = opt_consumers_key_index else { + return Err((ErrorKind::TypeError, "No consumers key found").into()); + }; + + let consumers_value_index = consumers_key_index + 1; + if group.get(consumers_value_index).is_none() { + return Err((ErrorKind::TypeError, "No consumers value found.").into()); + } + + let Value::Array(ref consumers) = group[consumers_value_index] else { + return Err((ErrorKind::TypeError, "Incorrect value type received for consumers. Wanted an Array.").into()); + }; + + if consumers.is_empty() { + groups_as_maps.push( + convert_to_expected_type(Value::Array(group.clone()), Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None, + }))? + ); + continue; + } + + let mut consumers_as_maps = Vec::new(); + for consumer in consumers { + consumers_as_maps.push(convert_to_expected_type(consumer.clone(), Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None, + }))?); + } + + group[consumers_value_index] = Value::Array(consumers_as_maps); + let group_map = convert_to_expected_type(Value::Array(group), Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None, + }))?; + groups_as_maps.push(group_map); + } + + array[groups_value_index] = Value::Array(groups_as_maps); + let converted_response = convert_to_expected_type(Value::Array(array.to_vec()), Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None, + }))?; + + let Value::Map(map) = converted_response else { + return Err((ErrorKind::TypeError, "Incorrect value type received for response. Wanted a Map.").into()); + }; + + Ok(Value::Map(map)) + } + _ => Err(( + ErrorKind::TypeError, + "Response couldn't be converted to XInfoStreamFullReturnType", + format!("(response was {:?})", get_value_type(&value)), + ) + .into()), + } } } @@ -1023,6 +1255,16 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { None } } + b"XINFO STREAM" => { + if cmd.position(b"FULL").is_some() { + Some(ExpectedReturnType::XInfoStreamFullReturnType) + } else { + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None, + }) + } + } _ => None, } } @@ -1052,6 +1294,174 @@ pub(crate) fn get_value_type<'a>(value: &Value) -> &'a str { mod tests { use super::*; + #[test] + fn xinfo_stream_expected_return_type() { + assert!(matches!( + expected_type_for_cmd(redis::cmd("XINFO").arg("STREAM").arg("key")), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &None + }) + )); + + assert!(matches!( + expected_type_for_cmd(redis::cmd("XINFO").arg("STREAM").arg("key").arg("FULL")), + Some(ExpectedReturnType::XInfoStreamFullReturnType) + )); + } + + #[test] + fn convert_xinfo_stream() { + // Only a partial response is represented here for brevity - the rest of the response follows the same format. + let groups_resp2_response = Value::Array(vec![ + Value::BulkString("length".to_string().into_bytes()), + Value::Int(2), + Value::BulkString("entries".to_string().into_bytes()), + Value::Array(vec![Value::Array(vec![ + Value::BulkString("1-0".to_string().into_bytes()), + Value::Array(vec![ + Value::BulkString("a".to_string().into_bytes()), + Value::BulkString("b".to_string().into_bytes()), + Value::BulkString("c".to_string().into_bytes()), + Value::BulkString("d".to_string().into_bytes()), + ]), + ])]), + Value::BulkString("groups".to_string().into_bytes()), + Value::Array(vec![ + Value::Array(vec![ + Value::BulkString("name".to_string().into_bytes()), + Value::BulkString("group1".to_string().into_bytes()), + Value::BulkString("consumers".to_string().into_bytes()), + Value::Array(vec![ + Value::Array(vec![ + Value::BulkString("name".to_string().into_bytes()), + Value::BulkString("consumer1".to_string().into_bytes()), + Value::BulkString("pending".to_string().into_bytes()), + Value::Array(vec![Value::Array(vec![ + Value::BulkString("1-0".to_string().into_bytes()), + Value::Int(1), + ])]), + ]), + Value::Array(vec![ + Value::BulkString("pending".to_string().into_bytes()), + Value::Array(vec![]), + ]), + ]), + ]), + Value::Array(vec![ + Value::BulkString("consumers".to_string().into_bytes()), + Value::Array(vec![]), + ]), + ]), + ]); + + let groups_resp3_response = Value::Map(vec![ + ( + Value::BulkString("length".to_string().into_bytes()), + Value::Int(2), + ), + ( + Value::BulkString("entries".to_string().into_bytes()), + Value::Array(vec![Value::Array(vec![ + Value::BulkString("1-0".to_string().into_bytes()), + Value::Array(vec![ + Value::BulkString("a".to_string().into_bytes()), + Value::BulkString("b".to_string().into_bytes()), + Value::BulkString("c".to_string().into_bytes()), + Value::BulkString("d".to_string().into_bytes()), + ]), + ])]), + ), + ( + Value::BulkString("groups".to_string().into_bytes()), + Value::Array(vec![ + Value::Map(vec![ + ( + Value::BulkString("name".to_string().into_bytes()), + Value::BulkString("group1".to_string().into_bytes()), + ), + ( + Value::BulkString("consumers".to_string().into_bytes()), + Value::Array(vec![ + Value::Map(vec![ + ( + Value::BulkString("name".to_string().into_bytes()), + Value::BulkString("consumer1".to_string().into_bytes()), + ), + ( + Value::BulkString("pending".to_string().into_bytes()), + Value::Array(vec![Value::Array(vec![ + Value::BulkString("1-0".to_string().into_bytes()), + Value::Int(1), + ])]), + ), + ]), + Value::Map(vec![( + Value::BulkString("pending".to_string().into_bytes()), + Value::Array(vec![]), + )]), + ]), + ), + ]), + Value::Map(vec![( + Value::BulkString("consumers".to_string().into_bytes()), + Value::Array(vec![]), + )]), + ]), + ), + ]); + + // We want the RESP2 response to be converted into RESP3 format. + assert_eq!( + convert_to_expected_type( + groups_resp2_response.clone(), + Some(ExpectedReturnType::XInfoStreamFullReturnType) + ) + .unwrap(), + groups_resp3_response.clone() + ); + + // RESP3 responses are already in the correct format and should not change format. + assert_eq!( + convert_to_expected_type( + groups_resp3_response.clone(), + Some(ExpectedReturnType::XInfoStreamFullReturnType) + ) + .unwrap(), + groups_resp3_response.clone() + ); + + let resp2_empty_groups = Value::Array(vec![ + Value::BulkString("groups".to_string().into_bytes()), + Value::Array(vec![]), + ]); + + let resp3_empty_groups = Value::Map(vec![( + Value::BulkString("groups".to_string().into_bytes()), + Value::Array(vec![]), + )]); + + // We want the RESP2 response to be converted into RESP3 format. + assert_eq!( + convert_to_expected_type( + resp2_empty_groups.clone(), + Some(ExpectedReturnType::XInfoStreamFullReturnType) + ) + .unwrap(), + resp3_empty_groups.clone() + ); + + // RESP3 responses are already in the correct format and should not change format. + assert_eq!( + convert_to_expected_type( + resp3_empty_groups.clone(), + Some(ExpectedReturnType::XInfoStreamFullReturnType) + ) + .unwrap(), + resp3_empty_groups.clone() + ); + } + #[test] fn xinfo_groups_xinfo_consumers_expected_return_type() { assert!(matches!( diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index 146872cc79..3648469f3f 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -244,6 +244,7 @@ enum RequestType { XAutoClaim = 203; XInfoGroups = 204; XInfoConsumers = 205; + XInfoStream = 207; Scan = 206; Wait = 208; XClaim = 209; diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index 8ea86eb9c3..960bcbb8ff 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -214,6 +214,7 @@ pub enum RequestType { XAutoClaim = 203, XInfoGroups = 204, XInfoConsumers = 205, + XInfoStream = 207, Scan = 206, Wait = 208, XClaim = 209, @@ -434,6 +435,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim, ProtobufRequestType::XInfoGroups => RequestType::XInfoGroups, ProtobufRequestType::XInfoConsumers => RequestType::XInfoConsumers, + ProtobufRequestType::XInfoStream => RequestType::XInfoStream, ProtobufRequestType::Wait => RequestType::Wait, ProtobufRequestType::XClaim => RequestType::XClaim, ProtobufRequestType::Scan => RequestType::Scan, @@ -652,6 +654,7 @@ impl RequestType { RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")), RequestType::XInfoGroups => Some(get_two_word_command("XINFO", "GROUPS")), RequestType::XInfoConsumers => Some(get_two_word_command("XINFO", "CONSUMERS")), + RequestType::XInfoStream => Some(get_two_word_command("XINFO", "STREAM")), RequestType::Wait => Some(cmd("WAIT")), RequestType::XClaim => Some(cmd("XCLAIM")), RequestType::Scan => Some(cmd("SCAN")), diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index e357bc2302..454c487e71 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -55,7 +55,13 @@ StreamTrimOptions, _create_xpending_range_args, ) -from glide.constants import TOK, TEncodable, TResult +from glide.constants import ( + TOK, + TEncodable, + TResult, + TXInfoStreamFullResponse, + TXInfoStreamResponse, +) from glide.protobuf.redis_request_pb2 import RequestType from glide.routes import Route @@ -3545,6 +3551,146 @@ async def xinfo_consumers( await self._execute_command(RequestType.XInfoConsumers, [key, group_name]), ) + async def xinfo_stream( + self, + key: TEncodable, + ) -> TXInfoStreamResponse: + """ + Returns information about the stream stored at `key`. To get more detailed information, use `xinfo_stream_full`. + + See https://valkey.io/commands/xinfo-stream for more details. + + Args: + key (TEncodable): The key of the stream. + + Returns: + TXInfoStreamResponse: A mapping of stream information for the given `key`. See the example for a sample + response. + + Examples: + >>> await client.xinfo_stream("my_stream") + { + b"length": 4, + b"radix-tree-keys": 1L, + b"radix-tree-nodes": 2L, + b"last-generated-id": b"1719877599564-0", + b"max-deleted-entry-id": b"0-0", # This field was added in Redis version 7.0.0. + b"entries-added": 4L, # This field was added in Redis version 7.0.0. + b"recorded-first-entry-id": b"1719710679916-0", # This field was added in Redis version 7.0.0. + b"groups": 1L, + b"first-entry": [ + b"1719710679916-0", + [b"foo1", b"bar1", b"foo2", b"bar2"], + ], + b"last-entry": [ + b"1719877599564-0", + [b"field1", b"value1"], + ], + } + # Stream information for "my_stream". Note that "first-entry" and "last-entry" could both be `None` if + # the stream is empty. + """ + return cast( + TXInfoStreamResponse, + await self._execute_command(RequestType.XInfoStream, [key]), + ) + + async def xinfo_stream_full( + self, + key: TEncodable, + count: Optional[int] = None, + ) -> TXInfoStreamFullResponse: + """ + Returns verbose information about the stream stored at `key`. + + See https://valkey.io/commands/xinfo-stream for more details. + + Args: + key (TEncodable): The key of the stream. + count (Optional[int]): The number of stream and PEL entries that are returned. A value of `0` means that all + entries will be returned. If not provided, defaults to `10`. + + Returns: + TXInfoStreamFullResponse: A mapping of detailed stream information for the given `key`. See the example for + a sample response. + + Examples: + >>> await client.xinfo_stream_full("my_stream") + { + b"length": 4, + b"radix-tree-keys": 1L, + b"radix-tree-nodes": 2L, + b"last-generated-id": b"1719877599564-0", + b"max-deleted-entry-id": b"0-0", # This field was added in Redis version 7.0.0. + b"entries-added": 4L, # This field was added in Redis version 7.0.0. + b"recorded-first-entry-id": b"1719710679916-0", # This field was added in Redis version 7.0.0. + b"entries": [ + [ + b"1719710679916-0", + [b"foo1", b"bar1", b"foo2", b"bar2"], + ], + [ + b"1719877599564-0": + [b"field1", b"value1"], + ] + ], + b"groups": [ + { + b"name": b"mygroup", + b"last-delivered-id": b"1719710688676-0", + b"entries-read": 2, # This field was added in Redis version 7.0.0. + b"lag": 0, # This field was added in Redis version 7.0.0. + b"pel-count": 2, + b"pending": [ + [ + b"1719710679916-0", + b"Alice", + 1719710707260, + 1, + ], + [ + b"1719710688676-0", + b"Alice", + 1719710718373, + 1, + ], + ], + b"consumers": [ + { + b"name": b"Alice", + b"seen-time": 1719710718373, + b"active-time": 1719710718373, # This field was added in Redis version 7.2.0. + b"pel-count": 2, + b"pending": [ + [ + b"1719710679916-0", + 1719710707260, + 1 + ], + [ + b"1719710688676-0", + 1719710718373, + 1 + ] + ] + } + ] + } + ] + } + # Detailed stream information for "my_stream". + + Since: Redis version 6.0.0. + """ + args = [key, "FULL"] + if count is not None: + args.extend(["COUNT", str(count)]) + + return cast( + TXInfoStreamFullResponse, + await self._execute_command(RequestType.XInfoStream, args), + ) + async def geoadd( self, key: TEncodable, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index ccf25b0759..0db7dfe6ad 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -2591,6 +2591,47 @@ def xinfo_consumers( """ return self.append_command(RequestType.XInfoConsumers, [key, group_name]) + def xinfo_stream( + self: TTransaction, + key: TEncodable, + ) -> TTransaction: + """ + Returns information about the stream stored at `key`. To get more detailed information, use `xinfo_stream_full`. + + See https://valkey.io/commands/xinfo-stream for more details. + + Args: + key (TEncodable): The key of the stream. + + Command response: + TXInfoStreamResponse: A mapping of stream information for the given `key`. + """ + return self.append_command(RequestType.XInfoStream, [key]) + + def xinfo_stream_full( + self: TTransaction, + key: TEncodable, + count: Optional[int] = None, + ) -> TTransaction: + """ + Returns verbose information about the stream stored at `key`. + + See https://valkey.io/commands/xinfo-stream for more details. + + Args: + key (TEncodable): The key of the stream. + count (Optional[int]): The number of stream and PEL entries that are returned. A value of `0` means that all + entries will be returned. If not provided, defaults to `10`. + + Command response: + TXInfoStreamFullResponse: A mapping of detailed stream information for the given `key`. + """ + args = [key, "FULL"] + if count is not None: + args.extend(["COUNT", str(count)]) + + return self.append_command(RequestType.XInfoStream, args) + def geoadd( self: TTransaction, key: TEncodable, diff --git a/python/python/glide/constants.py b/python/python/glide/constants.py index c1c9479c96..33a9c7830a 100644 --- a/python/python/glide/constants.py +++ b/python/python/glide/constants.py @@ -51,3 +51,16 @@ ], ], ] + +TXInfoStreamResponse = Mapping[ + bytes, Union[bytes, int, Mapping[bytes, Optional[List[List[bytes]]]]] +] +TXInfoStreamFullResponse = Mapping[ + bytes, + Union[ + bytes, + int, + Mapping[bytes, List[List[bytes]]], + List[Mapping[bytes, Union[bytes, int, List[List[Union[bytes, int]]]]]], + ], +] diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 00ea71358c..3e2a892be8 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -6814,6 +6814,123 @@ async def test_xinfo_groups_xinfo_consumers_edge_cases_and_failures( with pytest.raises(RequestError): await glide_client.xinfo_consumers(string_key, group_name) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xinfo_stream( + self, glide_client: TGlideClient, cluster_mode, protocol + ): + key = get_random_string(10) + group_name = get_random_string(10) + consumer = get_random_string(10) + stream_id0_0 = "0-0" + stream_id1_0 = "1-0" + stream_id1_1 = "1-1" + + # setup: add stream entry, create consumer group and consumer, read from stream with consumer + assert ( + await glide_client.xadd( + key, [("a", "b"), ("c", "d")], StreamAddOptions(stream_id1_0) + ) + == stream_id1_0.encode() + ) + assert await glide_client.xgroup_create(key, group_name, stream_id0_0) == OK + assert await glide_client.xreadgroup({key: ">"}, group_name, consumer) == { + key.encode(): {stream_id1_0.encode(): [[b"a", b"b"], [b"c", b"d"]]} + } + + result = await glide_client.xinfo_stream(key) + assert result.get(b"length") == 1 + expected_first_entry = [stream_id1_0.encode(), [b"a", b"b", b"c", b"d"]] + assert result.get(b"first-entry") == expected_first_entry + + # only one entry exists, so first and last entry should be the same + assert result.get(b"last-entry") == expected_first_entry + + # call XINFO STREAM with a byte string arg + result2 = await glide_client.xinfo_stream(key.encode()) + assert result2 == result + + # add one more entry + assert ( + await glide_client.xadd( + key, [("foo", "bar")], StreamAddOptions(stream_id1_1) + ) + == stream_id1_1.encode() + ) + + result_full = await glide_client.xinfo_stream_full(key, count=1) + print(result_full) + assert result_full.get(b"length") == 2 + entries = cast(list, result_full.get(b"entries")) + # only the first entry will be returned since we passed count=1 + assert len(entries) == 1 + assert entries[0] == expected_first_entry + + groups = cast(list, result_full.get(b"groups")) + assert len(groups) == 1 + group_info = groups[0] + assert group_info.get(b"name") == group_name.encode() + pending = group_info.get(b"pending") + assert len(pending) == 1 + assert stream_id1_0.encode() in pending[0] + + consumers = group_info.get(b"consumers") + assert len(consumers) == 1 + consumer_info = consumers[0] + assert consumer_info.get(b"name") == consumer.encode() + consumer_pending = consumer_info.get(b"pending") + assert len(consumer_pending) == 1 + assert stream_id1_0.encode() in consumer_pending[0] + + # call XINFO STREAM FULL with byte arg + result_full2 = await glide_client.xinfo_stream_full(key.encode()) + # 2 entries should be returned, since we didn't pass the COUNT arg this time + assert len(cast(list, result_full2.get(b"entries"))) == 2 + + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xinfo_stream_edge_cases_and_failures( + self, glide_client: TGlideClient, cluster_mode, protocol + ): + key = get_random_string(10) + string_key = get_random_string(10) + non_existing_key = get_random_string(10) + stream_id1_0 = "1-0" + + # setup: create empty stream + assert ( + await glide_client.xadd( + key, [("field", "value")], StreamAddOptions(stream_id1_0) + ) + == stream_id1_0.encode() + ) + assert await glide_client.xdel(key, [stream_id1_0]) == 1 + + # XINFO STREAM called against empty stream + result = await glide_client.xinfo_stream(key) + assert result.get(b"length") == 0 + assert result.get(b"first-entry") is None + assert result.get(b"last-entry") is None + + # XINFO STREAM FULL called against empty stream. Negative count values are ignored. + result_full = await glide_client.xinfo_stream_full(key, count=-3) + assert result_full.get(b"length") == 0 + assert result_full.get(b"entries") == [] + assert result_full.get(b"groups") == [] + + # calling XINFO STREAM with a non-existing key raises an error + with pytest.raises(RequestError): + await glide_client.xinfo_stream(non_existing_key) + with pytest.raises(RequestError): + await glide_client.xinfo_stream_full(non_existing_key) + + # key exists, but it is not a stream + assert await glide_client.set(string_key, "foo") + with pytest.raises(RequestError): + await glide_client.xinfo_stream(string_key) + with pytest.raises(RequestError): + await glide_client.xinfo_stream_full(string_key) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_xgroup_set_id( diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 457dc9f893..14e6f096c4 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -1043,6 +1043,35 @@ async def test_transaction_object_commands( finally: await glide_client.config_set({maxmemory_policy_key: maxmemory_policy}) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_transaction_xinfo_stream( + self, glide_client: TGlideClient, cluster_mode: bool, protocol + ): + key = get_random_string(10) + stream_id1_0 = "1-0" + transaction = ClusterTransaction() if cluster_mode else Transaction() + transaction.xadd(key, [("foo", "bar")], StreamAddOptions(stream_id1_0)) + transaction.xinfo_stream(key) + transaction.xinfo_stream_full(key) + + response = await glide_client.exec(transaction) + assert response is not None + # transaction.xadd(key, [("foo", "bar")], StreamAddOptions(stream_id1_0)) + assert response[0] == stream_id1_0.encode() + # transaction.xinfo_stream(key) + info = cast(dict, response[1]) + assert info.get(b"length") == 1 + assert info.get(b"groups") == 0 + assert info.get(b"first-entry") == [stream_id1_0.encode(), [b"foo", b"bar"]] + assert info.get(b"first-entry") == info.get(b"last-entry") + + # transaction.xinfo_stream_full(key) + info_full = cast(dict, response[2]) + assert info_full.get(b"length") == 1 + assert info_full.get(b"entries") == [[stream_id1_0.encode(), [b"foo", b"bar"]]] + assert info_full.get(b"groups") == [] + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_transaction_lastsave(