From 1f7120889cb43b3e679fef5ed59d142024349338 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Mon, 12 Aug 2024 15:13:44 -0700 Subject: [PATCH 1/5] Add `XINFO GROUPS` command. Signed-off-by: Yury-Fridlyand --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 50 ++++++++-- node/src/Commands.ts | 5 + node/src/Transaction.ts | 15 +++ node/tests/SharedTests.ts | 191 ++++++++++++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 2 + 6 files changed, 257 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cca05bfb2..4224ef4c2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122)) * Node: Added XAUTOCLAIM command ([#2108](https://github.com/valkey-io/valkey-glide/pull/2108)) * Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085)) * Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index d7b5af1a7d..ea65cd9bec 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -21,22 +21,18 @@ import { BitOffsetMultiplier, // eslint-disable-line @typescript-eslint/no-unused-vars BitOffsetOptions, BitmapIndexType, - BitwiseOperation, - CoordOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars + BitwiseOperation, // eslint-disable-line @typescript-eslint/no-unused-vars ExpireOptions, GeoAddOptions, - GeoBoxShape, // eslint-disable-line @typescript-eslint/no-unused-vars - GeoCircleShape, // eslint-disable-line @typescript-eslint/no-unused-vars GeoSearchResultOptions, GeoSearchShape, GeoSearchStoreResultOptions, GeoUnit, GeospatialData, InsertPosition, - KeyWeight, // eslint-disable-line @typescript-eslint/no-unused-vars + KeyWeight, LPosOptions, - ListDirection, - MemberOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars + ListDirection, // eslint-disable-line @typescript-eslint/no-unused-vars RangeByIndex, RangeByLex, RangeByScore, @@ -170,6 +166,7 @@ import { createXGroupDelConsumer, createXGroupDestroy, createXInfoConsumers, + createXInfoGroups, createXInfoStream, createXLen, createXPending, @@ -4118,6 +4115,45 @@ export class BaseClient { return this.createWritePromise(createXInfoConsumers(key, group)); } + /** + * Returns the list of all consumer groups and their attributes for the stream stored at `key`. + * + * See https://valkey.io/commands/xinfo-groups/ for more details. + * + * @param key - The key of the stream. + * @returns An `Array` of `Records`, where each mapping represents the + * attributes of a consumer group for the stream at `key`. + * @example + * ```typescript + *
{@code
+     * const result = await client.xinfoGroups("my_stream");
+     * console.log(result); // Output:
+     * // [
+     * //     {
+     * //         "name": "mygroup",
+     * //         "consumers": 2,
+     * //         "pending": 2,
+     * //         "last-delivered-id": "1638126030001-0",
+     * //         "entries-read": 2,                       // Added in version 7.0.0
+     * //         "lag": 0                                 // Added in version 7.0.0
+     * //     },
+     * //     {
+     * //         "name": "some-other-group",
+     * //         "consumers": 1,
+     * //         "pending": 0,
+     * //         "last-delivered-id": "0-0",
+     * //         "entries-read": null,                    // Added in version 7.0.0
+     * //         "lag": 1                                 // Added in version 7.0.0
+     * //     }
+     * // ]
+     * ```
+     */
+    public async xinfoGroups(
+        key: string,
+    ): Promise[]> {
+        return this.createWritePromise(createXInfoGroups(key));
+    }
+
     /**
      * Changes the ownership of a pending message.
      *
diff --git a/node/src/Commands.ts b/node/src/Commands.ts
index c5c8578c21..2fbb10b249 100644
--- a/node/src/Commands.ts
+++ b/node/src/Commands.ts
@@ -2431,6 +2431,11 @@ export function createXInfoStream(
     return createCommand(RequestType.XInfoStream, args);
 }
 
+/** @internal */
+export function createXInfoGroups(key: string): command_request.Command {
+    return createCommand(RequestType.XInfoGroups, [key]);
+}
+
 /**
  * @internal
  */
diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts
index c044f9eeff..923af0abc6 100644
--- a/node/src/Transaction.ts
+++ b/node/src/Transaction.ts
@@ -206,6 +206,7 @@ import {
     createXGroupDelConsumer,
     createXGroupDestroy,
     createXInfoConsumers,
+    createXInfoGroups,
     createXInfoStream,
     createXLen,
     createXPending,
@@ -2275,6 +2276,20 @@ export class BaseTransaction> {
         return this.addAndReturn(createXInfoStream(key, fullOptions ?? false));
     }
 
+    /**
+     * Returns the list of all consumer groups and their attributes for the stream stored at `key`.
+     *
+     * See https://valkey.io/commands/xinfo-groups/ for more details.
+     *
+     * @param key - The key of the stream.
+     *
+     * Command Response -  An `Array` of `Records`, where each mapping represents the
+     *     attributes of a consumer group for the stream at `key`.
+     */
+    public xinfoGroups(key: string): T {
+        return this.addAndReturn(createXInfoGroups(key));
+    }
+
     /** Returns the server time.
      * See https://valkey.io/commands/time/ for details.
      *
diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts
index ab7ae4fef0..e78521b608 100644
--- a/node/tests/SharedTests.ts
+++ b/node/tests/SharedTests.ts
@@ -7706,6 +7706,197 @@ export function runBaseTests(config: {
         config.timeout,
     );
 
+    it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
+        `xinfogroups xinfo groups %p`,
+        async (protocol) => {
+            await runTest(async (client: BaseClient, cluster) => {
+                const key = uuidv4();
+                const stringKey = uuidv4();
+                const groupName1 = uuidv4();
+                const consumer1 = uuidv4();
+                const streamId1 = "0-1";
+                const streamId2 = "0-2";
+                const streamId3 = "0-3";
+
+                expect(
+                    await client.xgroupCreate(key, groupName1, "0-0", {
+                        mkStream: true,
+                    }),
+                ).toEqual("OK");
+
+                // one empty group exists
+                expect(await client.xinfoGroups(key)).toEqual(
+                    cluster.checkIfServerVersionLessThan("7.0.0")
+                        ? [
+                              {
+                                  name: groupName1,
+                                  consumers: 0,
+                                  pending: 0,
+                                  "last-delivered-id": "0-0",
+                              },
+                          ]
+                        : [
+                              {
+                                  name: groupName1,
+                                  consumers: 0,
+                                  pending: 0,
+                                  "last-delivered-id": "0-0",
+                                  "entries-read": null,
+                                  lag: 0,
+                              },
+                          ],
+                );
+
+                expect(
+                    await client.xadd(
+                        key,
+                        [
+                            ["entry1_field1", "entry1_value1"],
+                            ["entry1_field2", "entry1_value2"],
+                        ],
+                        { id: streamId1 },
+                    ),
+                ).toEqual(streamId1);
+
+                expect(
+                    await client.xadd(
+                        key,
+                        [
+                            ["entry2_field1", "entry2_value1"],
+                            ["entry2_field2", "entry2_value2"],
+                        ],
+                        { id: streamId2 },
+                    ),
+                ).toEqual(streamId2);
+
+                expect(
+                    await client.xadd(
+                        key,
+                        [["entry3_field1", "entry3_value1"]],
+                        { id: streamId3 },
+                    ),
+                ).toEqual(streamId3);
+
+                // same as previous check, bug lag = 3, there are 3 messages unread
+                expect(await client.xinfoGroups(key)).toEqual(
+                    cluster.checkIfServerVersionLessThan("7.0.0")
+                        ? [
+                              {
+                                  name: groupName1,
+                                  consumers: 0,
+                                  pending: 0,
+                                  "last-delivered-id": "0-0",
+                              },
+                          ]
+                        : [
+                              {
+                                  name: groupName1,
+                                  consumers: 0,
+                                  pending: 0,
+                                  "last-delivered-id": "0-0",
+                                  "entries-read": null,
+                                  lag: 3,
+                              },
+                          ],
+                );
+
+                expect(
+                    await client.customCommand([
+                        "XREADGROUP",
+                        "GROUP",
+                        groupName1,
+                        consumer1,
+                        "STREAMS",
+                        key,
+                        ">",
+                    ]),
+                ).toEqual({
+                    [key]: {
+                        [streamId1]: [
+                            ["entry1_field1", "entry1_value1"],
+                            ["entry1_field2", "entry1_value2"],
+                        ],
+                        [streamId2]: [
+                            ["entry2_field1", "entry2_value1"],
+                            ["entry2_field2", "entry2_value2"],
+                        ],
+                        [streamId3]: [["entry3_field1", "entry3_value1"]],
+                    },
+                });
+                // after reading, `lag` is reset, and `pending`, consumer count and last ID are set
+                expect(await client.xinfoGroups(key)).toEqual(
+                    cluster.checkIfServerVersionLessThan("7.0.0")
+                        ? [
+                              {
+                                  name: groupName1,
+                                  consumers: 1,
+                                  pending: 3,
+                                  "last-delivered-id": streamId3,
+                              },
+                          ]
+                        : [
+                              {
+                                  name: groupName1,
+                                  consumers: 1,
+                                  pending: 3,
+                                  "last-delivered-id": streamId3,
+                                  "entries-read": 3,
+                                  lag: 0,
+                              },
+                          ],
+                );
+
+                expect(
+                    await client.customCommand([
+                        "XACK",
+                        key,
+                        groupName1,
+                        streamId1,
+                    ]),
+                ).toEqual(1);
+                // once message ack'ed, pending counter decreased
+                expect(await client.xinfoGroups(key)).toEqual(
+                    cluster.checkIfServerVersionLessThan("7.0.0")
+                        ? [
+                              {
+                                  name: groupName1,
+                                  consumers: 1,
+                                  pending: 2,
+                                  "last-delivered-id": streamId3,
+                              },
+                          ]
+                        : [
+                              {
+                                  name: groupName1,
+                                  consumers: 1,
+                                  pending: 2,
+                                  "last-delivered-id": streamId3,
+                                  "entries-read": 3,
+                                  lag: 0,
+                              },
+                          ],
+                );
+
+                // key exists, but it is not a stream
+                expect(await client.set(stringKey, "foo")).toEqual("OK");
+                await expect(client.xinfoGroups(stringKey)).rejects.toThrow(
+                    RequestError,
+                );
+
+                // Passing a non-existing key raises an error
+                const key2 = uuidv4();
+                await expect(client.xinfoGroups(key2)).rejects.toThrow(
+                    RequestError,
+                );
+                // create a second stream
+                await client.xadd(key2, [["a", "b"]]);
+                // no group yet exists
+                expect(await client.xinfoGroups(key2)).toEqual([]);
+            }, protocol);
+        },
+        config.timeout,
+    );
+
     it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
         `xpending test_%p`,
         async (protocol) => {
diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts
index e081fb0ab8..5186389217 100644
--- a/node/tests/TestUtilities.ts
+++ b/node/tests/TestUtilities.ts
@@ -1021,6 +1021,8 @@ export async function transactionTest(
         'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }',
         1,
     ]);
+    baseTransaction.xinfoGroups(key9);
+    responseData.push(["xinfoGroups(key9)", []]);
     baseTransaction.xgroupCreate(key9, groupName1, "0-0");
     responseData.push(['xgroupCreate(key9, groupName1, "0-0")', "OK"]);
     baseTransaction.xgroupCreate(key9, groupName2, "0-0", { mkStream: true });

From 7c8f44e359e9e49222c9361321d832fba67a5887 Mon Sep 17 00:00:00 2001
From: Yury-Fridlyand 
Date: Mon, 12 Aug 2024 15:27:54 -0700
Subject: [PATCH 2/5] Signed-off-by: Yury-Fridlyand
 

---
 node/tests/SharedTests.ts | 1 -
 1 file changed, 1 deletion(-)

diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts
index e78521b608..c6bba86057 100644
--- a/node/tests/SharedTests.ts
+++ b/node/tests/SharedTests.ts
@@ -5076,7 +5076,6 @@ export function runBaseTests(config: {
                     "last-entry": (string | number | string[])[];
                     groups: number;
                 };
-                console.log(result);
 
                 // verify result:
                 expect(result.length).toEqual(1);

From ac5ed8e73ed4bfa15fc43dfd043d2716c6493b0c Mon Sep 17 00:00:00 2001
From: Yury-Fridlyand 
Date: Tue, 13 Aug 2024 13:14:28 -0700
Subject: [PATCH 3/5] Signed-off-by: Yury-Fridlyand
 

---
 node/src/BaseClient.ts | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts
index ea65cd9bec..0c83605650 100644
--- a/node/src/BaseClient.ts
+++ b/node/src/BaseClient.ts
@@ -21,9 +21,12 @@ import {
     BitOffsetMultiplier, // eslint-disable-line @typescript-eslint/no-unused-vars
     BitOffsetOptions,
     BitmapIndexType,
-    BitwiseOperation, // eslint-disable-line @typescript-eslint/no-unused-vars
+    BitwiseOperation,
+    CoordOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars
     ExpireOptions,
     GeoAddOptions,
+    GeoBoxShape, // eslint-disable-line @typescript-eslint/no-unused-vars
+    GeoCircleShape, // eslint-disable-line @typescript-eslint/no-unused-vars
     GeoSearchResultOptions,
     GeoSearchShape,
     GeoSearchStoreResultOptions,
@@ -32,7 +35,8 @@ import {
     InsertPosition,
     KeyWeight,
     LPosOptions,
-    ListDirection, // eslint-disable-line @typescript-eslint/no-unused-vars
+    ListDirection,
+    MemberOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars
     RangeByIndex,
     RangeByLex,
     RangeByScore,

From 9e60b8b93d7cc5050c6a9192eab6d680af38c26e Mon Sep 17 00:00:00 2001
From: Yury-Fridlyand 
Date: Wed, 14 Aug 2024 17:25:10 -0700
Subject: [PATCH 4/5] Apply suggestions from code review

Co-authored-by: Andrew Carbonetto 
Signed-off-by: Yury-Fridlyand 
---
 node/src/BaseClient.ts  | 2 +-
 node/src/Transaction.ts | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts
index 0c83605650..4c3f1becd3 100644
--- a/node/src/BaseClient.ts
+++ b/node/src/BaseClient.ts
@@ -4122,7 +4122,7 @@ export class BaseClient {
     /**
      * Returns the list of all consumer groups and their attributes for the stream stored at `key`.
      *
-     * See https://valkey.io/commands/xinfo-groups/ for more details.
+     * @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details.
      *
      * @param key - The key of the stream.
      * @returns An `Array` of `Records`, where each mapping represents the
diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts
index 923af0abc6..2fce0b7e3c 100644
--- a/node/src/Transaction.ts
+++ b/node/src/Transaction.ts
@@ -2279,7 +2279,7 @@ export class BaseTransaction> {
     /**
      * Returns the list of all consumer groups and their attributes for the stream stored at `key`.
      *
-     * See https://valkey.io/commands/xinfo-groups/ for more details.
+     * @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details.
      *
      * @param key - The key of the stream.
      *

From ebdbf77e4cf1b4c71e6bc01c27775d0f8a17738d Mon Sep 17 00:00:00 2001
From: Yury-Fridlyand 
Date: Fri, 16 Aug 2024 11:13:32 -0700
Subject: [PATCH 5/5] Update node/src/BaseClient.ts

Co-authored-by: Andrew Carbonetto 
Signed-off-by: Yury-Fridlyand 
---
 node/src/BaseClient.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts
index fcc26c5440..9c78351a34 100644
--- a/node/src/BaseClient.ts
+++ b/node/src/BaseClient.ts
@@ -4412,7 +4412,7 @@ export class BaseClient {
      * @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details.
      *
      * @param key - The key of the stream.
-     * @returns An `Array` of `Records`, where each mapping represents the
+     * @returns An array of maps, where each mapping represents the
      *     attributes of a consumer group for the stream at `key`.
      * @example
      * ```typescript