diff --git a/CHANGELOG.md b/CHANGELOG.md index a149637d19..7575a797c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Node: Added XCLAIM command ([#2092](https://github.com/valkey-io/valkey-glide/pull/2092)) * Node: Added EXPIRETIME and PEXPIRETIME commands ([#2063](https://github.com/valkey-io/valkey-glide/pull/2063)) * Node: Added SORT commands ([#2028](https://github.com/valkey-io/valkey-glide/pull/2028)) * Node: Added LASTSAVE command ([#2059](https://github.com/valkey-io/valkey-glide/pull/2059)) diff --git a/node/npm/glide/index.ts b/node/npm/glide/index.ts index b92c6dd01e..ef1b324437 100644 --- a/node/npm/glide/index.ts +++ b/node/npm/glide/index.ts @@ -138,6 +138,7 @@ function initialize() { StreamTrimOptions, StreamAddOptions, StreamReadOptions, + StreamClaimOptions, ScriptOptions, ClosingError, ConfigurationError, @@ -226,6 +227,7 @@ function initialize() { StreamTrimOptions, StreamAddOptions, StreamReadOptions, + StreamClaimOptions, ScriptOptions, ClosingError, ConfigurationError, diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 1b3d85be63..5dcf0fed8b 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -186,6 +186,8 @@ import { createZRevRankWithScore, createZScan, createZScore, + StreamClaimOptions, + createXClaim, } from "./Commands"; import { ClosingError, @@ -3638,21 +3640,23 @@ export class BaseClient { * @example * ```typescript * const streamResults = await client.xread({"my_stream": "0-0", "writers": "0-0"}); - * console.log(result); // Output: { - * // "my_stream": { - * // "1526984818136-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]], - * // "1526999352406-0": [["duration", "812"], ["event-id", "9"], ["user-id", "388234"]], - * // }, "writers": { - * // "1526985676425-0": [["name", "Virginia"], ["surname", "Woolf"]], - * // "1526985685298-0": [["name", "Jane"], ["surname", "Austen"]], - * // } - * // } + * console.log(result); // Output: + * // { + * // "my_stream": { + * // "1526984818136-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]], + * // "1526999352406-0": [["duration", "812"], ["event-id", "9"], ["user-id", "388234"]], + * // }, + * // "writers": { + * // "1526985676425-0": [["name", "Virginia"], ["surname", "Woolf"]], + * // "1526985685298-0": [["name", "Jane"], ["surname", "Austen"]], + * // } + * // } * ``` */ public xread( keys_and_ids: Record, options?: StreamReadOptions, - ): Promise>> { + ): Promise>> { return this.createWritePromise(createXRead(keys_and_ids, options)); } @@ -3674,6 +3678,76 @@ export class BaseClient { return this.createWritePromise(createXLen(key)); } + /** + * Changes the ownership of a pending message. + * + * See https://valkey.io/commands/xclaim/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param consumer - The group consumer. + * @param minIdleTime - The minimum idle time for the message to be claimed. + * @param ids - An array of entry ids. + * @param options - (Optional) Stream claim options {@link StreamClaimOptions}. + * @returns A `Record` of message entries that are claimed by the consumer. + * + * @example + * ```typescript + * const result = await client.xclaim("myStream", "myGroup", "myConsumer", 42, + * ["1-0", "2-0", "3-0"], { idle: 500, retryCount: 3, isForce: true }); + * console.log(result); // Output: + * // { + * // "2-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]] + * // } + * ``` + */ + public async xclaim( + key: string, + group: string, + consumer: string, + minIdleTime: number, + ids: string[], + options?: StreamClaimOptions, + ): Promise> { + return this.createWritePromise( + createXClaim(key, group, consumer, minIdleTime, ids, options), + ); + } + + /** + * Changes the ownership of a pending message. This function returns an `array` with + * only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API. + * + * See https://valkey.io/commands/xclaim/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param consumer - The group consumer. + * @param minIdleTime - The minimum idle time for the message to be claimed. + * @param ids - An array of entry ids. + * @param options - (Optional) Stream claim options {@link StreamClaimOptions}. + * @returns An `array` of message ids claimed by the consumer. + * + * @example + * ```typescript + * const result = await client.xclaimJustId("my_stream", "my_group", "my_consumer", 42, + * ["1-0", "2-0", "3-0"], { idle: 500, retryCount: 3, isForce: true }); + * console.log(result); // Output: [ "2-0", "3-0" ] + * ``` + */ + public async xclaimJustId( + key: string, + group: string, + consumer: string, + minIdleTime: number, + ids: string[], + options?: StreamClaimOptions, + ): Promise { + return this.createWritePromise( + createXClaim(key, group, consumer, minIdleTime, ids, options, true), + ); + } + /** * Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. * diff --git a/node/src/Commands.ts b/node/src/Commands.ts index f807d8234e..8353f79c29 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2312,6 +2312,69 @@ export function createXLen(key: string): command_request.Command { return createCommand(RequestType.XLen, [key]); } +/** Optional parameters for {@link BaseClient.xclaim|xclaim} command. */ +export type StreamClaimOptions = { + /** + * Set the idle time (last time it was delivered) of the message in milliseconds. If `idle` + * is not specified, an `idle` of `0` is assumed, that is, the time count is reset + * because the message now has a new owner trying to process it. + */ + idle?: number; // in milliseconds + + /** + * This is the same as {@link idle} but instead of a relative amount of milliseconds, it sets the + * idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF + * file generating `XCLAIM` commands. + */ + idleUnixTime?: number; // in unix-time milliseconds + + /** + * Set the retry counter to the specified value. This counter is incremented every time a message + * is delivered again. Normally {@link BaseClient.xclaim|xclaim} does not alter this counter, + * which is just served to clients when the {@link BaseClient.xpending|xpending} command is called: + * this way clients can detect anomalies, like messages that are never processed for some reason + * after a big number of delivery attempts. + */ + retryCount?: number; + + /** + * Creates the pending message entry in the PEL even if certain specified IDs are not already in + * the PEL assigned to a different client. However, the message must exist in the stream, + * otherwise the IDs of non-existing messages are ignored. + */ + isForce?: boolean; + + /** The last ID of the entry which should be claimed. */ + lastId?: string; +}; + +/** @internal */ +export function createXClaim( + key: string, + group: string, + consumer: string, + minIdleTime: number, + ids: string[], + options?: StreamClaimOptions, + justId?: boolean, +): command_request.Command { + const args = [key, group, consumer, minIdleTime.toString(), ...ids]; + + if (options) { + if (options.idle !== undefined) + args.push("IDLE", options.idle.toString()); + if (options.idleUnixTime !== undefined) + args.push("TIME", options.idleUnixTime.toString()); + if (options.retryCount !== undefined) + args.push("RETRYCOUNT", options.retryCount.toString()); + if (options.isForce) args.push("FORCE"); + if (options.lastId) args.push("LASTID", options.lastId); + } + + if (justId) args.push("JUSTID"); + return createCommand(RequestType.XClaim, args); +} + /** * Optional arguments for {@link BaseClient.xgroupCreate|xgroupCreate}. * diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index e2babf630f..20e35b49ee 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -48,6 +48,7 @@ import { SortClusterOptions, SortOptions, StreamAddOptions, + StreamClaimOptions, StreamGroupOptions, StreamReadOptions, StreamTrimOptions, @@ -183,6 +184,7 @@ import { createType, createUnlink, createXAdd, + createXClaim, createXDel, createXLen, createXRead, @@ -2130,6 +2132,61 @@ export class BaseTransaction> { return this.addAndReturn(createXLen(key)); } + /** + * Changes the ownership of a pending message. + * + * See https://valkey.io/commands/xclaim/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param consumer - The group consumer. + * @param minIdleTime - The minimum idle time for the message to be claimed. + * @param ids - An array of entry ids. + * @param options - (Optional) Stream claim options {@link StreamClaimOptions}. + * + * Command Response - A `Record` of message entries that are claimed by the consumer. + */ + public xclaim( + key: string, + group: string, + consumer: string, + minIdleTime: number, + ids: string[], + options?: StreamClaimOptions, + ): T { + return this.addAndReturn( + createXClaim(key, group, consumer, minIdleTime, ids, options), + ); + } + + /** + * Changes the ownership of a pending message. This function returns an `array` with + * only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API. + * + * See https://valkey.io/commands/xclaim/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param consumer - The group consumer. + * @param minIdleTime - The minimum idle time for the message to be claimed. + * @param ids - An array of entry ids. + * @param options - (Optional) Stream claim options {@link StreamClaimOptions}. + * + * Command Response - An `array` of message ids claimed by the consumer. + */ + public xclaimJustId( + key: string, + group: string, + consumer: string, + minIdleTime: number, + ids: string[], + options?: StreamClaimOptions, + ): T { + return this.addAndReturn( + createXClaim(key, group, consumer, minIdleTime, ids, options, true), + ); + } + /** * Creates a new consumer group uniquely identified by `groupname` for the stream * stored at `key`. diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 1ef69cc727..f41cd4a968 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -6873,6 +6873,126 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xclaim test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const group = uuidv4(); + + expect( + await client.xgroupCreate(key, group, "0", { + mkStream: true, + }), + ).toEqual("OK"); + expect( + await client.customCommand([ + "xgroup", + "createconsumer", + key, + group, + "consumer", + ]), + ).toEqual(true); + + expect( + await client.xadd( + key, + [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + { id: "0-1" }, + ), + ).toEqual("0-1"); + expect( + await client.xadd( + key, + [["entry2_field1", "entry2_value1"]], + { id: "0-2" }, + ), + ).toEqual("0-2"); + + expect( + await client.customCommand([ + "xreadgroup", + "group", + group, + "consumer", + "STREAMS", + key, + ">", + ]), + ).toEqual({ + [key]: { + "0-1": [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + "0-2": [["entry2_field1", "entry2_value1"]], + }, + }); + + expect( + await client.xclaim(key, group, "consumer", 0, ["0-1"]), + ).toEqual({ + "0-1": [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + }); + expect( + await client.xclaimJustId(key, group, "consumer", 0, [ + "0-2", + ]), + ).toEqual(["0-2"]); + + // add one more entry + expect( + await client.xadd( + key, + [["entry3_field1", "entry3_value1"]], + { id: "0-3" }, + ), + ).toEqual("0-3"); + // using force, we can xclaim the message without reading it + expect( + await client.xclaimJustId( + key, + group, + "consumer", + 0, + ["0-3"], + { isForce: true, retryCount: 99 }, + ), + ).toEqual(["0-3"]); + + // incorrect IDs - response is empty + expect( + await client.xclaim(key, group, "consumer", 0, ["000"]), + ).toEqual({}); + expect( + await client.xclaimJustId(key, group, "consumer", 0, [ + "000", + ]), + ).toEqual([]); + + // empty ID array + await expect( + client.xclaim(key, group, "consumer", 0, []), + ).rejects.toThrow(RequestError); + + // key exists, but it is not a stream + const stringKey = uuidv4(); + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect( + client.xclaim(stringKey, "_", "_", 0, ["_"]), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `lmpop test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index f8def33988..22283738f7 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -887,12 +887,68 @@ export async function transactionTest( 'xgroupCreate(key9, groupName2, "0-0", { mkStream: true })', "OK", ]); + baseTransaction.xdel(key9, ["0-3", "0-5"]); + responseData.push(["xdel(key9, [['0-3', '0-5']])", 1]); + + // key9 has one entry here: {"0-2":[["field","value2"]]} + + baseTransaction.customCommand([ + "xgroup", + "createconsumer", + key9, + groupName1, + "consumer1", + ]); + responseData.push([ + 'xgroupCreateConsumer(key9, groupName1, "consumer1")', + true, + ]); + baseTransaction.customCommand([ + "xreadgroup", + "group", + groupName1, + "consumer1", + "STREAMS", + key9, + ">", + ]); + responseData.push([ + 'xreadgroup(groupName1, "consumer1", key9, >)', + { [key9]: { "0-2": [["field", "value2"]] } }, + ]); + baseTransaction.xclaim(key9, groupName1, "consumer1", 0, ["0-2"]); + responseData.push([ + 'xclaim(key9, groupName1, "consumer1", 0, ["0-2"])', + { "0-2": [["field", "value2"]] }, + ]); + baseTransaction.xclaim(key9, groupName1, "consumer1", 0, ["0-2"], { + isForce: true, + retryCount: 0, + idle: 0, + }); + responseData.push([ + 'xclaim(key9, groupName1, "consumer1", 0, ["0-2"], { isForce: true, retryCount: 0, idle: 0})', + { "0-2": [["field", "value2"]] }, + ]); + baseTransaction.xclaimJustId(key9, groupName1, "consumer1", 0, ["0-2"]); + responseData.push([ + 'xclaimJustId(key9, groupName1, "consumer1", 0, ["0-2"])', + ["0-2"], + ]); + baseTransaction.xclaimJustId(key9, groupName1, "consumer1", 0, ["0-2"], { + isForce: true, + retryCount: 0, + idle: 0, + }); + responseData.push([ + 'xclaimJustId(key9, groupName1, "consumer1", 0, ["0-2"], { isForce: true, retryCount: 0, idle: 0})', + ["0-2"], + ]); baseTransaction.xgroupDestroy(key9, groupName1); responseData.push(["xgroupDestroy(key9, groupName1)", true]); baseTransaction.xgroupDestroy(key9, groupName2); responseData.push(["xgroupDestroy(key9, groupName2)", true]); - baseTransaction.xdel(key9, ["0-3", "0-5"]); - responseData.push(["xdel(key9, [['0-3', '0-5']])", 1]); + baseTransaction.rename(key9, key10); responseData.push(["rename(key9, key10)", "OK"]); baseTransaction.exists([key10]);