Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add XCLAIM command. #2092

Merged
merged 5 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XCLAIM command ([#2092](https://github.com/valkey-io/valkey-glide/pull/2092))
* Node: Added EXPIRETIME and PEXPIRETIME commands ([#2063](https://github.com/valkey-io/valkey-glide/pull/2063))
* Node: Added SORT commands ([#2028](https://github.com/valkey-io/valkey-glide/pull/2028))
* Node: Added LASTSAVE command ([#2059](https://github.com/valkey-io/valkey-glide/pull/2059))
Expand Down
2 changes: 2 additions & 0 deletions node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ function initialize() {
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down Expand Up @@ -226,6 +227,7 @@ function initialize() {
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down
94 changes: 84 additions & 10 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
StreamClaimOptions,
createXClaim,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -3636,21 +3638,23 @@ export class BaseClient {
* @example
* ```typescript
* const streamResults = await client.xread({"my_stream": "0-0", "writers": "0-0"});
* console.log(result); // Output: {
* // "my_stream": {
* // "1526984818136-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]],
* // "1526999352406-0": [["duration", "812"], ["event-id", "9"], ["user-id", "388234"]],
* // }, "writers": {
* // "1526985676425-0": [["name", "Virginia"], ["surname", "Woolf"]],
* // "1526985685298-0": [["name", "Jane"], ["surname", "Austen"]],
* // }
* // }
* console.log(result); // Output:
* // {
* // "my_stream": {
* // "1526984818136-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]],
* // "1526999352406-0": [["duration", "812"], ["event-id", "9"], ["user-id", "388234"]],
* // },
* // "writers": {
* // "1526985676425-0": [["name", "Virginia"], ["surname", "Woolf"]],
* // "1526985685298-0": [["name", "Jane"], ["surname", "Austen"]],
* // }
* // }
* ```
*/
public xread(
keys_and_ids: Record<string, string>,
options?: StreamReadOptions,
): Promise<Record<string, Record<string, string[][]>>> {
): Promise<Record<string, Record<string, [string, string][]>>> {
return this.createWritePromise(createXRead(keys_and_ids, options));
}

Expand All @@ -3672,6 +3676,76 @@ export class BaseClient {
return this.createWritePromise(createXLen(key));
}

/**
* Changes the ownership of a pending message.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @returns A `Record` of message entries that are claimed by the consumer.
*
* @example
* ```typescript
* const result = await client.xclaim("myStream", "myGroup", "myConsumer", 42,
* ["1-0", "2-0", "3-0"], { idle: 500, retryCount: 3, isForce: true });
* console.log(result); // Output:
* // {
* // "2-0": [["duration", "1532"], ["event-id", "5"], ["user-id", "7782813"]]
* // }
* ```
*/
public async xclaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): Promise<Record<string, [string, string][]>> {
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
}

/**
* Changes the ownership of a pending message. This function returns an `array` with
* only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @returns An `array` of message ids claimed by the consumer.
*
* @example
* ```typescript
* const result = await client.xclaimJustId("my_stream", "my_group", "my_consumer", 42,
* ["1-0", "2-0", "3-0"], { idle: 500, retryCount: 3, isForce: true });
* console.log(result); // Output: [ "2-0", "3-0" ]
* ```
*/
public async xclaimJustId(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): Promise<string[]> {
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options, true),
);
}

/**
* Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
*
Expand Down
63 changes: 63 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,69 @@ export function createXLen(key: string): command_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/** Optional parameters for {@link BaseClient.xclaim|xclaim} command. */
export type StreamClaimOptions = {
/**
* Set the idle time (last time it was delivered) of the message in milliseconds. If `idle`
* is not specified, an `idle` of `0` is assumed, that is, the time count is reset
* because the message now has a new owner trying to process it.
*/
idle?: number; // in milliseconds

/**
* This is the same as {@link idle} but instead of a relative amount of milliseconds, it sets the
* idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF
* file generating `XCLAIM` commands.
*/
idleUnixTime?: number; // in unix-time milliseconds

/**
* Set the retry counter to the specified value. This counter is incremented every time a message
* is delivered again. Normally {@link BaseClient.xclaim|xclaim} does not alter this counter,
* which is just served to clients when the {@link BaseClient.xpending|xpending} command is called:
* this way clients can detect anomalies, like messages that are never processed for some reason
* after a big number of delivery attempts.
*/
retryCount?: number;

/**
* Creates the pending message entry in the PEL even if certain specified IDs are not already in
* the PEL assigned to a different client. However, the message must exist in the stream,
* otherwise the IDs of non-existing messages are ignored.
*/
isForce?: boolean;

/** The last ID of the entry which should be claimed. */
lastId?: string;
};

/** @internal */
export function createXClaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
justId?: boolean,
): command_request.Command {
const args = [key, group, consumer, minIdleTime.toString(), ...ids];

if (options) {
if (options.idle !== undefined)
args.push("IDLE", options.idle.toString());
if (options.idleUnixTime !== undefined)
args.push("TIME", options.idleUnixTime.toString());
if (options.retryCount !== undefined)
args.push("RETRYCOUNT", options.retryCount.toString());
if (options.isForce) args.push("FORCE");
if (options.lastId) args.push("LASTID", options.lastId);
}

if (justId) args.push("JUSTID");
return createCommand(RequestType.XClaim, args);
}

/**
* Optional arguments for {@link BaseClient.xgroupCreate|xgroupCreate}.
*
Expand Down
57 changes: 57 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
SortClusterOptions,
SortOptions,
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -183,6 +184,7 @@ import {
createType,
createUnlink,
createXAdd,
createXClaim,
createXDel,
createXLen,
createXRead,
Expand Down Expand Up @@ -2128,6 +2130,61 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXLen(key));
}

/**
* Changes the ownership of a pending message.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
*
* Command Response - A `Record` of message entries that are claimed by the consumer.
*/
public xclaim(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): T {
return this.addAndReturn(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
}

/**
* Changes the ownership of a pending message. This function returns an `array` with
* only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
*
* See https://valkey.io/commands/xclaim/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
*
* Command Response - An `array` of message ids claimed by the consumer.
*/
public xclaimJustId(
key: string,
group: string,
consumer: string,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
): T {
return this.addAndReturn(
createXClaim(key, group, consumer, minIdleTime, ids, options, true),
);
}

/**
* Creates a new consumer group uniquely identified by `groupname` for the stream
* stored at `key`.
Expand Down
Loading
Loading