diff --git a/CHANGELOG.md b/CHANGELOG.md index 33e6b49599..13fa2176c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ * Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084)) * Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077)) * Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088)) +* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 31c2d65544..550cbe48a6 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -203,6 +203,7 @@ import { createZRevRankWithScore, createZScan, createZScore, + createXAck, } from "./Commands"; import { ClosingError, @@ -4532,6 +4533,36 @@ export class BaseClient { preferReplica: connection_request.ReadFrom.PreferReplica, }; + /** + * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. + * This command should be called on a pending message so that such message does not get processed again. + * + * See https://valkey.io/commands/xack/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param ids - An array of entry ids. + * @returns The number of messages that were successfully acknowledged. + * + * @example + * ```typescript + *
{@code
+     * const entryId = await client.xadd("mystream", ["myfield", "mydata"]);
+     * // read messages from streamId
+     * const readResult = await client.xreadgroup(["myfield", "mydata"], "mygroup", "my0consumer");
+     * // acknowledge messages on stream
+     * console.log(await client.xack("mystream", "mygroup", [entryId])); // Output: 1L
+     * 
+ * ``` + */ + public async xack( + key: string, + group: string, + ids: string[], + ): Promise { + return this.createWritePromise(createXAck(key, group, ids)); + } + /** Returns the element at index `index` in the list stored at `key`. * The index is zero-based, so 0 means the first element, 1 the second element and so on. * Negative indices can be used to designate elements starting at the tail of the list. diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 30dab3bcd3..a40a3af20f 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3628,3 +3628,14 @@ export function createBZPopMin( ): command_request.Command { return createCommand(RequestType.BZPopMin, [...keys, timeout.toString()]); } + +/** + * @internal + */ +export function createXAck( + key: string, + group: string, + ids: string[], +): command_request.Command { + return createCommand(RequestType.XAck, [key, group, ...ids]); +} diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 2933e5bac7..4e39d74da2 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -198,6 +198,7 @@ import { createTouch, createType, createUnlink, + createXAck, createXAdd, createXAutoClaim, createXClaim, @@ -2604,6 +2605,22 @@ export class BaseTransaction> { ); } + /** + * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. + * This command should be called on a pending message so that such message does not get processed again. + * + * See https://valkey.io/commands/xack/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param ids - An array of entry ids. + * + * Command Response - The number of messages that were successfully acknowledged. + */ + public xack(key: string, group: string, ids: string[]): T { + return this.addAndReturn(createXAck(key, group, ids)); + } + /** * Renames `key` to `newkey`. * If `newkey` already exists it is overwritten. diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 151ac96d3e..cc66086bf8 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -8245,6 +8245,41 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xack test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient, cluster: RedisCluster) => { + const key = "{testKey}:1-" + uuidv4(); + const nonExistingKey = "{testKey}:2-" + uuidv4(); + const string_key = "{testKey}:3-" + uuidv4(); + const groupName = uuidv4(); + const consumerName = uuidv4(); + const stream_id0 = "0"; + const stream_id1_0 = "1-0"; + const stream_id1_1 = "1-1"; + const stream_id1_2 = "1-2"; + + // setup: add 2 entries to the stream, create consumer group and read to mark them as pending + expect( + await client.xadd(key, [["f0", "v0"]], { + id: stream_id1_0, + }), + ).toEqual(stream_id1_0); + expect( + await client.xadd(key, [["f1", "v1"]], { + id: stream_id1_1, + }), + ).toEqual(stream_id1_1); + expect( + await client.xgroupCreate(key, groupName, stream_id0), + ).toBe("OK"); + //expect(await client.xread) + //TODO: finish test with xreadGroup + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `lmpop test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index dba0f69f17..abb5c58e87 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -1118,6 +1118,13 @@ export async function transactionTest( ]); } + baseTransaction.xgroupCreateConsumer(key9, groupName1, consumer); + responseData.push([ + "xgroupCreateConsumer(key9, groupName1, consumer)", + true, + ]); + baseTransaction.xack(key9, groupName1, ["0-3"]); + responseData.push(["xack(key9, groupName1, ['0-3'])", 0]); baseTransaction.xgroupDelConsumer(key9, groupName1, consumer); responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]); baseTransaction.xgroupDestroy(key9, groupName1);