Skip to content

Commit

Permalink
Node: Add XCLAIM command. (valkey-io#2092)
Browse files Browse the repository at this point in the history
* Add `XCLAIM` command.

---------

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Co-authored-by: Guian Gumpac <guian.gumpac@improving.com>
Signed-off-by: Chloe Yip <chloe.yip@improving.com>
  • Loading branch information
2 people authored and cyip10 committed Aug 12, 2024
1 parent 77864c7 commit 62899f3
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XCLAIM command ([#2092](https://github.com/valkey-io/valkey-glide/pull/2092))
* Node: Added EXPIRETIME and PEXPIRETIME commands ([#2063](https://github.com/valkey-io/valkey-glide/pull/2063))
* Node: Added SORT commands ([#2028](https://github.com/valkey-io/valkey-glide/pull/2028))
* Node: Added LASTSAVE command ([#2059](https://github.com/valkey-io/valkey-glide/pull/2059))
Expand Down
2 changes: 2 additions & 0 deletions node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ function initialize() {
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down Expand Up @@ -226,6 +227,7 @@ function initialize() {
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down
94 changes: 84 additions & 10 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
StreamClaimOptions,
createXClaim,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -3688,21 +3690,23 @@ export class BaseClient {
* @example
* ```typescript
* const streamResults = await client.xread({"my_stream": "0-0", "writers": "0-0"});
* console.log(result); // Output: {
* // "my_stream": {
* // "1526984818136-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]],
* // "1526999352406-0": [["duration", "812"], ["event-id", "9"], ["user-id", "388234"]],
* // }, "writers": {
* // "1526985676425-0": [["name", "Virginia"], ["surname", "Woolf"]],
* // "1526985685298-0": [["name", "Jane"], ["surname", "Austen"]],
* // }
* // }
* console.log(result); // Output:
* // {
* // "my_stream": {
* // "1526984818136-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]],
* // "1526999352406-0": [["duration", "812"], ["event-id", "9"], ["user-id", "388234"]],
* // },
* // "writers": {
* // "1526985676425-0": [["name", "Virginia"], ["surname", "Woolf"]],
* // "1526985685298-0": [["name", "Jane"], ["surname", "Austen"]],
* // }
* // }
* ```
*/
public xread(
keys_and_ids: Record<string, string>,
options?: StreamReadOptions,
): Promise<Record<string, Record<string, string[][]>>> {
): Promise<Record<string, Record<string, [string, string][]>>> {
return this.createWritePromise(createXRead(keys_and_ids, options));
}

Expand All @@ -3724,6 +3728,76 @@ export class BaseClient {
return this.createWritePromise(createXLen(key));
}

/**
* Changes the ownership of a pending message.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @returns A `Record` of message entries that are claimed by the consumer.
*
* @example
* ```typescript
* const result = await client.xclaim("myStream", "myGroup", "myConsumer", 42,
* ["1-0", "2-0", "3-0"], { idle: 500, retryCount: 3, isForce: true });
* console.log(result); // Output:
* // {
* // "2-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]]
* // }
* ```
*/
public async xclaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): Promise<Record<string, [string, string][]>> {
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
}

/**
* Changes the ownership of a pending message. This function returns an `array` with
* only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @returns An `array` of message ids claimed by the consumer.
*
* @example
* ```typescript
* const result = await client.xclaimJustId("my_stream", "my_group", "my_consumer", 42,
* ["1-0", "2-0", "3-0"], { idle: 500, retryCount: 3, isForce: true });
* console.log(result); // Output: [ "2-0", "3-0" ]
* ```
*/
public async xclaimJustId(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): Promise<string[]> {
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options, true),
);
}

/**
* Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
*
Expand Down
63 changes: 63 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,69 @@ export function createXLen(key: string): command_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/** Optional parameters for {@link BaseClient.xclaim|xclaim} command. */
export type StreamClaimOptions = {
/**
* Set the idle time (last time it was delivered) of the message in milliseconds. If `idle`
* is not specified, an `idle` of `0` is assumed, that is, the time count is reset
* because the message now has a new owner trying to process it.
*/
idle?: number; // in milliseconds

/**
* This is the same as {@link idle} but instead of a relative amount of milliseconds, it sets the
* idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF
* file generating `XCLAIM` commands.
*/
idleUnixTime?: number; // in unix-time milliseconds

/**
* Set the retry counter to the specified value. This counter is incremented every time a message
* is delivered again. Normally {@link BaseClient.xclaim|xclaim} does not alter this counter,
* which is just served to clients when the {@link BaseClient.xpending|xpending} command is called:
* this way clients can detect anomalies, like messages that are never processed for some reason
* after a big number of delivery attempts.
*/
retryCount?: number;

/**
* Creates the pending message entry in the PEL even if certain specified IDs are not already in
* the PEL assigned to a different client. However, the message must exist in the stream,
* otherwise the IDs of non-existing messages are ignored.
*/
isForce?: boolean;

/** The last ID of the entry which should be claimed. */
lastId?: string;
};

/** @internal */
export function createXClaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
justId?: boolean,
): command_request.Command {
const args = [key, group, consumer, minIdleTime.toString(), ...ids];

if (options) {
if (options.idle !== undefined)
args.push("IDLE", options.idle.toString());
if (options.idleUnixTime !== undefined)
args.push("TIME", options.idleUnixTime.toString());
if (options.retryCount !== undefined)
args.push("RETRYCOUNT", options.retryCount.toString());
if (options.isForce) args.push("FORCE");
if (options.lastId) args.push("LASTID", options.lastId);
}

if (justId) args.push("JUSTID");
return createCommand(RequestType.XClaim, args);
}

/**
* Optional arguments for {@link BaseClient.xgroupCreate|xgroupCreate}.
*
Expand Down
57 changes: 57 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
SortClusterOptions,
SortOptions,
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -184,6 +185,7 @@ import {
createType,
createUnlink,
createXAdd,
createXClaim,
createXDel,
createXLen,
createXRead,
Expand Down Expand Up @@ -2150,6 +2152,61 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXLen(key));
}

/**
* Changes the ownership of a pending message.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
*
* Command Response - A `Record` of message entries that are claimed by the consumer.
*/
public xclaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): T {
return this.addAndReturn(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
}

/**
* Changes the ownership of a pending message. This function returns an `array` with
* only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
*
* Command Response - An `array` of message ids claimed by the consumer.
*/
public xclaimJustId(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): T {
return this.addAndReturn(
createXClaim(key, group, consumer, minIdleTime, ids, options, true),
);
}

/**
* Creates a new consumer group uniquely identified by `groupname` for the stream
* stored at `key`.
Expand Down
Loading

0 comments on commit 62899f3

Please sign in to comment.