diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 48fe7928c45..b03dc378ba9 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -250,7 +250,7 @@ StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::stri return consumer_metadata; } -StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) { +StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const { InternalKey ikey(key, storage_->IsSlotIdEncoded()); Slice subkey = ikey.GetSubKey(); const size_t entry_id_size = sizeof(StreamEntryID); @@ -618,7 +618,9 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op } for (; iter->Valid(); options.to_first ? iter->Prev() : iter->Next()) { - *size += 1; + if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamEntry) { + *size += 1; + } } return rocksdb::Status::OK(); @@ -674,6 +676,9 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m for (; iter->Valid() && (options.reverse ? iter->key().ToString() >= end_key : iter->key().ToString() <= end_key); options.reverse ? iter->Prev() : iter->Next()) { + if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry) { + continue; + } if (options.exclude_start && iter->key().ToString() == start_key) { continue; } diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h index 8ae5a14dbad..8f6367a7196 100644 --- a/src/types/redis_stream.h +++ b/src/types/redis_stream.h @@ -82,7 +82,7 @@ class Stream : public SubKeyScanner { std::string consumerNameFromInternalKey(rocksdb::Slice key) const; static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata); static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value); - StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key); + StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const; }; } // namespace redis diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 7dee10b6b3e..7bbce02b6c2 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -985,6 +985,21 @@ func TestStreamOffset(t *testing.T) { r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val() require.Equal(t, consumer3, r1[0].Name) }) + + t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { + streamName := "test-stream" + group := "group" + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "*", + Values: []string{"data1", "b"}, + }).Err()) + require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err()) + require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err()) + require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{ + Streams: []string{streamName, "0"}, + }).Err()) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {