Skip to content

Commit

Permalink
fix(stream): Fix XPENDING serialization issue (#2566)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleks Lozovyuk <aleks.raiden@gmail.com>
Co-authored-by: Twice <twice.mliu@gmail.com>
  • Loading branch information
3 people authored Oct 4, 2024
1 parent 8365087 commit 290012e
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 12 deletions.
30 changes: 21 additions & 9 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -896,15 +896,27 @@ class CommandXPending : public Commander {

static Status SendResults([[maybe_unused]] Connection *conn, std::string *output,
StreamGetPendingEntryResult &results) {
output->append(redis::MultiLen(3 + results.consumer_infos.size()));
output->append(redis::Integer(results.pending_number));
output->append(redis::BulkString(results.first_entry_id.ToString()));
output->append(redis::BulkString(results.last_entry_id.ToString()));
output->append(redis::MultiLen(results.consumer_infos.size()));
for (const auto &entry : results.consumer_infos) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.first));
output->append(redis::BulkString(std::to_string(entry.second)));
// NOTE: In the case that our stream has no pending elements, Redis will
// return NilString for the first and last entry IDs, and a nil array
// for the consumer infos. Make this a special case to maintain consistency
// with Redis.
if (results.pending_number == 0) {
output->append(redis::MultiLen(4));
output->append(redis::Integer(0));
output->append(conn->NilString());
output->append(conn->NilString());
output->append(conn->NilArray());
} else {
output->append(redis::MultiLen(4));
output->append(redis::Integer(results.pending_number));
output->append(redis::BulkString(results.first_entry_id.ToString()));
output->append(redis::BulkString(results.last_entry_id.ToString()));
output->append(redis::MultiLen(results.consumer_infos.size()));
for (const auto &entry : results.consumer_infos) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.first));
output->append(redis::BulkString(std::to_string(entry.second)));
}
}

return Status::OK();
Expand Down
79 changes: 76 additions & 3 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2086,16 +2086,19 @@ func TestStreamOffset(t *testing.T) {
t.Run("XPending with different kinds of commands", func(t *testing.T) {
streamName := "mystream"
groupName := "mygroup"

require.NoError(t, rdb.Del(ctx, streamName).Err())
r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
require.NoError(t, err)
require.Equal(t, int64(0), r)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"field1", "data1"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())

consumerName := "myconsumer"
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Expand All @@ -2113,7 +2116,7 @@ func TestStreamOffset(t *testing.T) {
Count: 1,
Lower: "1-0",
Higher: "1-0",
Consumers: map[string]int64{"myconsumer": 1},
Consumers: map[string]int64{consumerName: 1},
}, r1)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Expand Down Expand Up @@ -2143,7 +2146,7 @@ func TestStreamOffset(t *testing.T) {
Count: 3,
Lower: "1-0",
Higher: "2-2",
Consumers: map[string]int64{"myconsumer": 3},
Consumers: map[string]int64{consumerName: 3},
}, r1)

require.NoError(t, rdb.XAck(ctx, streamName, groupName, "2-0").Err())
Expand All @@ -2155,9 +2158,79 @@ func TestStreamOffset(t *testing.T) {
Count: 2,
Lower: "1-0",
Higher: "2-2",
Consumers: map[string]int64{"myconsumer": 2},
Consumers: map[string]int64{consumerName: 2},
}, r1)

// Add a second consumer and check that XPENDING still works
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "3-0",
Values: []string{"field1", "data1"},
}).Err())

consumerName2 := "myconsumer2"
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName2,
Streams: []string{streamName, ">"},
Count: 1,
NoAck: false,
}).Err()
require.NoError(t, err)

r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
require.NoError(t, err1)

require.Equal(t, &redis.XPending{
Count: 3,
Lower: "1-0",
Higher: "3-0",
Consumers: map[string]int64{consumerName: 2, consumerName2: 1},
}, r1)
})

t.Run("XPENDING on a consumer group with no pending messages", func(t *testing.T) {
streamName := "stream"
groupName := "group"
consumerName := "consumer"
messageID := "1-0"

// Remove any existing data
require.NoError(t, rdb.Del(ctx, streamName).Err())
r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
require.NoError(t, err)
require.Equal(t, int64(0), r)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: messageID,
Values: []string{"key", "value"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())

// Have the consumer claim the message with ID [messageID]
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
NoAck: false,
}).Err()
require.NoError(t, err)

// Acknowledge the message, so no messages remain pending
require.NoError(t, rdb.XAck(ctx, streamName, groupName, messageID).Err())

// Check that XPENDING sets the min and max to nil, matching Redis' behavior
pending, err := rdb.XPending(ctx, streamName, groupName).Result()
require.NoError(t, err)
require.Equal(t, &redis.XPending{
Count: 0,
Lower: "",
Higher: "",
Consumers: map[string]int64{},
}, pending)
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down

0 comments on commit 290012e

Please sign in to comment.