Skip to content

Commit

Permalink
Node: Add XAUTOCLAIM command. (#2108)
Browse files Browse the repository at this point in the history
* Add `XAUTOCLAIM` command.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand authored Aug 12, 2024
1 parent e4e39dd commit 73a9a78
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XAUTOCLAIM command ([#2108](https://github.com/valkey-io/valkey-glide/pull/2108))
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
* Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093))
* Node: Added HRANDFIELD command ([#2096](https://github.com/valkey-io/valkey-glide/pull/2096))
Expand Down
119 changes: 119 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ import {
createUnlink,
createWatch,
createXAdd,
createXAutoClaim,
createXClaim,
createXDel,
createXGroupCreate,
Expand Down Expand Up @@ -4153,6 +4154,124 @@ export class BaseClient {
);
}

/**
* Transfers ownership of pending stream entries that match the specified criteria.
*
* See https://valkey.io/commands/xautoclaim/ for more details.
*
* since Valkey version 6.2.0.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param start - Filters the claimed entries to those that have an ID equal or greater than the
* specified value.
* @param count - (Optional) Limits the number of claimed entries to the specified value.
* @returns A `tuple` containing the following elements:
* - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
* equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
* the entire stream was scanned.
* - A `Record` of the claimed entries.
* - If you are using Valkey 7.0.0 or above, the response list will also include a list containing
* the message IDs that were in the Pending Entries List but no longer exist in the stream.
* These IDs are deleted from the Pending Entries List.
*
* @example
* ```typescript
* const result = await client.xautoclaim("myStream", "myGroup", "myConsumer", 42, "0-0", 25);
* console.log(result); // Output:
* // [
* // "1609338788321-0", // value to be used as `start` argument
* // // for the next `xautoclaim` call
* // {
* // "1609338752495-0": [ // claimed entries
* // ["field 1", "value 1"],
* // ["field 2", "value 2"]
* // ]
* // },
* // [
* // "1594324506465-0", // array of IDs of deleted messages,
* // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
* // ]
* // ]
* ```
*/
public async xautoclaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
count?: number,
): Promise<[string, Record<string, [string, string][]>, string[]?]> {
return this.createWritePromise(
createXAutoClaim(key, group, consumer, minIdleTime, start, count),
);
}

/**
* Transfers ownership of pending stream entries that match the specified criteria.
*
* See https://valkey.io/commands/xautoclaim/ for more details.
*
* since Valkey version 6.2.0.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param start - Filters the claimed entries to those that have an ID equal or greater than the
* specified value.
* @param count - (Optional) Limits the number of claimed entries to the specified value.
* @returns An `array` containing the following elements:
* - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
* equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
* the entire stream was scanned.
* - A list of the IDs for the claimed entries.
* - If you are using Valkey 7.0.0 or above, the response list will also include a list containing
* the message IDs that were in the Pending Entries List but no longer exist in the stream.
* These IDs are deleted from the Pending Entries List.
*
* @example
* ```typescript
* const result = await client.xautoclaim("myStream", "myGroup", "myConsumer", 42, "0-0", 25);
* console.log(result); // Output:
* // [
* // "1609338788321-0", // value to be used as `start` argument
* // // for the next `xautoclaim` call
* // [
* // "1609338752495-0", // claimed entries
* // "1609338752495-1",
* // ],
* // [
* // "1594324506465-0", // array of IDs of deleted messages,
* // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
* // ]
* // ]
* ```
*/
public async xautoclaimJustId(
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
count?: number,
): Promise<[string, string[], string[]?]> {
return this.createWritePromise(
createXAutoClaim(
key,
group,
consumer,
minIdleTime,
start,
count,
true,
),
);
}

/**
* Changes the ownership of a pending message. This function returns an `array` with
* only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
Expand Down
22 changes: 22 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,28 @@ export function createXClaim(
return createCommand(RequestType.XClaim, args);
}

/** @internal */
export function createXAutoClaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
count?: number,
justId?: boolean,
): command_request.Command {
const args = [
key,
group,
consumer,
minIdleTime.toString(),
start.toString(),
];
if (count !== undefined) args.push("COUNT", count.toString());
if (justId) args.push("JUSTID");
return createCommand(RequestType.XAutoClaim, args);
}

/**
* Optional arguments for {@link BaseClient.xgroupCreate|xgroupCreate}.
*
Expand Down
83 changes: 83 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ import {
createType,
createUnlink,
createXAdd,
createXAutoClaim,
createXClaim,
createXDel,
createXGroupCreate,
Expand Down Expand Up @@ -2421,6 +2422,88 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
);
}

/**
* Transfers ownership of pending stream entries that match the specified criteria.
*
* See https://valkey.io/commands/xautoclaim/ for more details.
*
* since Valkey version 6.2.0.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param start - Filters the claimed entries to those that have an ID equal or greater than the
* specified value.
* @param count - (Optional) Limits the number of claimed entries to the specified value.
*
* Command Response - An `array` containing the following elements:
* - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
* equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
* the entire stream was scanned.
* - A mapping of the claimed entries.
* - If you are using Valkey 7.0.0 or above, the response list will also include a list containing
* the message IDs that were in the Pending Entries List but no longer exist in the stream.
* These IDs are deleted from the Pending Entries List.
*/
public xautoclaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
count?: number,
): T {
return this.addAndReturn(
createXAutoClaim(key, group, consumer, minIdleTime, start, count),
);
}

/**
* Transfers ownership of pending stream entries that match the specified criteria.
*
* See https://valkey.io/commands/xautoclaim/ for more details.
*
* since Valkey version 6.2.0.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param start - Filters the claimed entries to those that have an ID equal or greater than the
* specified value.
* @param count - (Optional) Limits the number of claimed entries to the specified value.
*
* Command Response - An `array` containing the following elements:
* - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
* equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
* the entire stream was scanned.
* - A list of the IDs for the claimed entries.
* - If you are using Valkey 7.0.0 or above, the response list will also include a list containing
* the message IDs that were in the Pending Entries List but no longer exist in the stream.
* These IDs are deleted from the Pending Entries List.
*/
public xautoclaimJustId(
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
count?: number,
): T {
return this.addAndReturn(
createXAutoClaim(
key,
group,
consumer,
minIdleTime,
start,
count,
true,
),
);
}

/**
* Creates a new consumer group uniquely identified by `groupname` for the stream
* stored at `key`.
Expand Down
Loading

0 comments on commit 73a9a78

Please sign in to comment.