diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index c9616db5e11..9a9c02a64c7 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -896,15 +896,27 @@ class CommandXPending : public Commander { static Status SendResults([[maybe_unused]] Connection *conn, std::string *output, StreamGetPendingEntryResult &results) { - output->append(redis::MultiLen(3 + results.consumer_infos.size())); - output->append(redis::Integer(results.pending_number)); - output->append(redis::BulkString(results.first_entry_id.ToString())); - output->append(redis::BulkString(results.last_entry_id.ToString())); - output->append(redis::MultiLen(results.consumer_infos.size())); - for (const auto &entry : results.consumer_infos) { - output->append(redis::MultiLen(2)); - output->append(redis::BulkString(entry.first)); - output->append(redis::BulkString(std::to_string(entry.second))); + // NOTE: In the case that our stream has no pending elements, Redis will + // return NilString for the first and last entry IDs, and a nil array + // for the consumer infos. Make this a special case to maintain consistency + // with Redis. + if (results.pending_number == 0) { + output->append(redis::MultiLen(4)); + output->append(redis::Integer(0)); + output->append(conn->NilString()); + output->append(conn->NilString()); + output->append(conn->NilArray()); + } else { + output->append(redis::MultiLen(4)); + output->append(redis::Integer(results.pending_number)); + output->append(redis::BulkString(results.first_entry_id.ToString())); + output->append(redis::BulkString(results.last_entry_id.ToString())); + output->append(redis::MultiLen(results.consumer_infos.size())); + for (const auto &entry : results.consumer_infos) { + output->append(redis::MultiLen(2)); + output->append(redis::BulkString(entry.first)); + output->append(redis::BulkString(std::to_string(entry.second))); + } } return Status::OK(); diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index d852bbcf0e2..2bae5c54963 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2086,16 +2086,19 @@ func TestStreamOffset(t *testing.T) { t.Run("XPending with different kinds of commands", func(t *testing.T) { streamName := "mystream" groupName := "mygroup" + require.NoError(t, rdb.Del(ctx, streamName).Err()) r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() require.NoError(t, err) require.Equal(t, int64(0), r) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ Stream: streamName, ID: "1-0", Values: []string{"field1", "data1"}, }).Err()) require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + consumerName := "myconsumer" err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: groupName, @@ -2113,7 +2116,7 @@ func TestStreamOffset(t *testing.T) { Count: 1, Lower: "1-0", Higher: "1-0", - Consumers: map[string]int64{"myconsumer": 1}, + Consumers: map[string]int64{consumerName: 1}, }, r1) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ @@ -2143,7 +2146,7 @@ func TestStreamOffset(t *testing.T) { Count: 3, Lower: "1-0", Higher: "2-2", - Consumers: map[string]int64{"myconsumer": 3}, + Consumers: map[string]int64{consumerName: 3}, }, r1) require.NoError(t, rdb.XAck(ctx, streamName, groupName, "2-0").Err()) @@ -2155,9 +2158,79 @@ func TestStreamOffset(t *testing.T) { Count: 2, Lower: "1-0", Higher: "2-2", - Consumers: map[string]int64{"myconsumer": 2}, + Consumers: map[string]int64{consumerName: 2}, + }, r1) + + // Add a second consumer and check that XPENDING still works + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "3-0", + Values: []string{"field1", "data1"}, + }).Err()) + + consumerName2 := "myconsumer2" + err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName2, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }).Err() + require.NoError(t, err) + + r1, err1 = rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err1) + + require.Equal(t, &redis.XPending{ + Count: 3, + Lower: "1-0", + Higher: "3-0", + Consumers: map[string]int64{consumerName: 2, consumerName2: 1}, }, r1) }) + + t.Run("XPENDING on a consumer group with no pending messages", func(t *testing.T) { + streamName := "stream" + groupName := "group" + consumerName := "consumer" + messageID := "1-0" + + // Remove any existing data + require.NoError(t, rdb.Del(ctx, streamName).Err()) + r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() + require.NoError(t, err) + require.Equal(t, int64(0), r) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: messageID, + Values: []string{"key", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + + // Have the consumer claim the message with ID [messageID] + err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }).Err() + require.NoError(t, err) + + // Acknowledge the message, so no messages remain pending + require.NoError(t, rdb.XAck(ctx, streamName, groupName, messageID).Err()) + + // Check that XPENDING sets the min and max to nil, matching Redis' behavior + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, &redis.XPending{ + Count: 0, + Lower: "", + Higher: "", + Consumers: map[string]int64{}, + }, pending) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {