diff --git a/CHANGELOG.md b/CHANGELOG.md index 817b975922..6ae18dab00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ * Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088)) * Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107)) * Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146)) +* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 450eb9f774..cd4f4dc66b 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -171,6 +171,7 @@ import { createUnlink, createWait, createWatch, + createXAck, createXAdd, createXAutoClaim, createXClaim, @@ -5165,6 +5166,36 @@ export class BaseClient { preferReplica: connection_request.ReadFrom.PreferReplica, }; + /** + * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. + * This command should be called on a pending message so that such message does not get processed again. + * + * @see {@link https://valkey.io/commands/xack/|valkey.io} for details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param ids - An array of entry ids. + * @returns The number of messages that were successfully acknowledged. + * + * @example + * ```typescript + *
{@code
+     * const entryId = await client.xadd("mystream", ["myfield", "mydata"]);
+     * // read messages from streamId
+     * const readResult = await client.xreadgroup(["myfield", "mydata"], "mygroup", "my0consumer");
+     * // acknowledge messages on stream
+     * console.log(await client.xack("mystream", "mygroup", [entryId])); // Output: 1L
+     * 
+ * ``` + */ + public async xack( + key: string, + group: string, + ids: string[], + ): Promise { + return this.createWritePromise(createXAck(key, group, ids)); + } + /** Returns the element at index `index` in the list stored at `key`. * The index is zero-based, so 0 means the first element, 1 the second element and so on. * Negative indices can be used to designate elements starting at the tail of the list. diff --git a/node/src/Commands.ts b/node/src/Commands.ts index ba07f24a41..ff337a5bdf 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3929,3 +3929,14 @@ export function createGetEx( return createCommand(RequestType.GetEx, args); } + +/** + * @internal + */ +export function createXAck( + key: string, + group: string, + ids: string[], +): command_request.Command { + return createCommand(RequestType.XAck, [key, group, ...ids]); +} diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index df89635de6..49b58d9f04 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -207,6 +207,7 @@ import { createType, createUnlink, createWait, + createXAck, createXAdd, createXAutoClaim, createXClaim, @@ -2892,6 +2893,22 @@ export class BaseTransaction> { ); } + /** + * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. + * This command should be called on a pending message so that such message does not get processed again. + * + * @see {@link https://valkey.io/commands/xack/|valkey.io} for details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param ids - An array of entry ids. + * + * Command Response - The number of messages that were successfully acknowledged. + */ + public xack(key: string, group: string, ids: string[]): T { + return this.addAndReturn(createXAck(key, group, ids)); + } + /** * Renames `key` to `newkey`. * If `newkey` already exists it is overwritten. diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 0443ccb837..035f5f34f0 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -9792,6 +9792,119 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xack test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = "{testKey}:1-" + uuidv4(); + const nonExistingKey = "{testKey}:2-" + uuidv4(); + const string_key = "{testKey}:3-" + uuidv4(); + const groupName = uuidv4(); + const consumerName = uuidv4(); + const stream_id0 = "0"; + const stream_id1_0 = "1-0"; + const stream_id1_1 = "1-1"; + const stream_id1_2 = "1-2"; + + // setup: add 2 entries to the stream, create consumer group and read to mark them as pending + expect( + await client.xadd(key, [["f0", "v0"]], { + id: stream_id1_0, + }), + ).toEqual(stream_id1_0); + expect( + await client.xadd(key, [["f1", "v1"]], { + id: stream_id1_1, + }), + ).toEqual(stream_id1_1); + expect( + await client.xgroupCreate(key, groupName, stream_id0), + ).toBe("OK"); + expect( + await client.xreadgroup(groupName, consumerName, { + [key]: ">", + }), + ).toEqual({ + [key]: { + [stream_id1_0]: [["f0", "v0"]], + [stream_id1_1]: [["f1", "v1"]], + }, + }); + + // add one more entry + expect( + await client.xadd(key, [["f2", "v2"]], { + id: stream_id1_2, + }), + ).toEqual(stream_id1_2); + + // acknowledge the first 2 entries + expect( + await client.xack(key, groupName, [ + stream_id1_0, + stream_id1_1, + ]), + ).toBe(2); + + // attempt to acknowledge the first 2 entries again, returns 0 since they were already acknowledged + expect( + await client.xack(key, groupName, [ + stream_id1_0, + stream_id1_1, + ]), + ).toBe(0); + + // read the last unacknowledged entry + expect( + await client.xreadgroup(groupName, consumerName, { + [key]: ">", + }), + ).toEqual({ [key]: { [stream_id1_2]: [["f2", "v2"]] } }); + + // deleting the consumer, returns 1 since the last entry still hasn't been acknowledged + expect( + await client.xgroupDelConsumer( + key, + groupName, + consumerName, + ), + ).toBe(1); + + // attempt to acknowledge a non-existing key, returns 0 + expect( + await client.xack(nonExistingKey, groupName, [ + stream_id1_0, + ]), + ).toBe(0); + + // attempt to acknowledge a non-existing group name, returns 0 + expect( + await client.xack(key, "nonExistingGroup", [stream_id1_0]), + ).toBe(0); + + // attempt to acknowledge a non-existing ID, returns 0 + expect(await client.xack(key, groupName, ["99-99"])).toBe(0); + + // invalid argument - ID list must not be empty + await expect(client.xack(key, groupName, [])).rejects.toThrow( + RequestError, + ); + + // invalid argument - invalid stream ID format + await expect( + client.xack(key, groupName, ["invalid stream ID format"]), + ).rejects.toThrow(RequestError); + + // key exists, but is not a stream + expect(await client.set(string_key, "xack")).toBe("OK"); + await expect( + client.xack(string_key, groupName, [stream_id1_0]), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `lmpop test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 1e6ada1a34..da6641426a 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -1215,6 +1215,8 @@ export async function transactionTest( ]); } + baseTransaction.xack(key9, groupName1, ["0-3"]); + responseData.push(["xack(key9, groupName1, ['0-3'])", 0]); baseTransaction.xgroupDelConsumer(key9, groupName1, consumer); responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]); baseTransaction.xgroupDestroy(key9, groupName1);