Skip to content

Commit

Permalink
Node: add command XACK
Browse files Browse the repository at this point in the history
Signed-off-by: TJ Zhang <tj.zhang@improving.com>
  • Loading branch information
TJ Zhang committed Aug 13, 2024
1 parent dcd31b9 commit be23892
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))
* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
31 changes: 31 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
createXAck,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -4532,6 +4533,36 @@ export class BaseClient {
preferReplica: connection_request.ReadFrom.PreferReplica,
};

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
* This command should be called on a pending message so that such message does not get processed again.
*
* See https://valkey.io/commands/xack/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param ids - An array of entry ids.
* @returns The number of messages that were successfully acknowledged.
*
* @example
* ```typescript
* <pre>{@code
* const entryId = await client.xadd("mystream", ["myfield", "mydata"]);
* // read messages from streamId
* const readResult = await client.xreadgroup(["myfield", "mydata"], "mygroup", "my0consumer");
* // acknowledge messages on stream
* console.log(await client.xack("mystream", "mygroup", [entryId])); // Output: 1L
* </pre>
* ```
*/
public async xack(
key: string,
group: string,
ids: string[],
): Promise<number> {
return this.createWritePromise(createXAck(key, group, ids));
}

/** Returns the element at index `index` in the list stored at `key`.
* The index is zero-based, so 0 means the first element, 1 the second element and so on.
* Negative indices can be used to designate elements starting at the tail of the list.
Expand Down
11 changes: 11 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3628,3 +3628,14 @@ export function createBZPopMin(
): command_request.Command {
return createCommand(RequestType.BZPopMin, [...keys, timeout.toString()]);
}

/**
* @internal
*/
export function createXAck(
key: string,
group: string,
ids: string[],
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}
17 changes: 17 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ import {
createTouch,
createType,
createUnlink,
createXAck,
createXAdd,
createXAutoClaim,
createXClaim,
Expand Down Expand Up @@ -2604,6 +2605,22 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
);
}

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
* This command should be called on a pending message so that such message does not get processed again.
*
* See https://valkey.io/commands/xack/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param ids - An array of entry ids.
*
* Command Response - The number of messages that were successfully acknowledged.
*/
public xack(key: string, group: string, ids: string[]): T {
return this.addAndReturn(createXAck(key, group, ids));
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
35 changes: 35 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8245,6 +8245,41 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xack test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
const key = "{testKey}:1-" + uuidv4();
const nonExistingKey = "{testKey}:2-" + uuidv4();
const string_key = "{testKey}:3-" + uuidv4();
const groupName = uuidv4();
const consumerName = uuidv4();
const stream_id0 = "0";
const stream_id1_0 = "1-0";
const stream_id1_1 = "1-1";
const stream_id1_2 = "1-2";

// setup: add 2 entries to the stream, create consumer group and read to mark them as pending
expect(
await client.xadd(key, [["f0", "v0"]], {
id: stream_id1_0,
}),
).toEqual(stream_id1_0);
expect(
await client.xadd(key, [["f1", "v1"]], {
id: stream_id1_1,
}),
).toEqual(stream_id1_1);
expect(
await client.xgroupCreate(key, groupName, stream_id0),
).toBe("OK");
//expect(await client.xread)
//TODO: finish test with xreadGroup
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lmpop test_%p`,
async (protocol) => {
Expand Down
7 changes: 7 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,13 @@ export async function transactionTest(
]);
}

baseTransaction.xgroupCreateConsumer(key9, groupName1, consumer);
responseData.push([
"xgroupCreateConsumer(key9, groupName1, consumer)",
true,
]);
baseTransaction.xack(key9, groupName1, ["0-3"]);
responseData.push(["xack(key9, groupName1, ['0-3'])", 0]);
baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]);
baseTransaction.xgroupDestroy(key9, groupName1);
Expand Down

0 comments on commit be23892

Please sign in to comment.