Skip to content

Commit

Permalink
Node: Add Binary support for stream commands, part 1
Browse files Browse the repository at this point in the history
Signed-off-by: TJ Zhang <tj.zhang@improving.com>
  • Loading branch information
TJ Zhang committed Aug 27, 2024
1 parent ed9a7af commit 852a965
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 113 deletions.
89 changes: 50 additions & 39 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4610,14 +4610,20 @@ export class BaseClient {
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @param options - options detailing how to add to the stream.
* @param options - (Optional) Additional Parameters:
* - (Optional) options detailing how to add to the stream.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*/
public async xadd(
key: string,
values: [string, string][],
options?: StreamAddOptions,
): Promise<string | null> {
return this.createWritePromise(createXAdd(key, values, options));
key: GlideString,
values: [GlideString, GlideString][],
options?: StreamAddOptions & DecoderOption,
): Promise<GlideString | null> {
return this.createWritePromise(
createXAdd(key, values, options),
options,
);
}

/**
Expand All @@ -4636,7 +4642,7 @@ export class BaseClient {
* // Output is 2 since the stream marked 2 entries as deleted.
* ```
*/
public async xdel(key: string, ids: string[]): Promise<number> {
public async xdel(key: GlideString, ids: GlideString[]): Promise<number> {
return this.createWritePromise(createXDel(key, ids));
}

Expand Down Expand Up @@ -4903,7 +4909,9 @@ export class BaseClient {
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @param options - Additional Parameters:
* - (Optional) Stream claim options {@link StreamClaimOptions}.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns A `Record` of message entries that are claimed by the consumer.
*
* @example
Expand All @@ -4917,13 +4925,14 @@ export class BaseClient {
* ```
*/
public async xclaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
ids: GlideString[],
options?: StreamClaimOptions & DecoderOption,
): Promise<Record<string, [string, string][]>> {
// TODO: convert Record return type to Object array
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
Expand Down Expand Up @@ -4972,13 +4981,14 @@ export class BaseClient {
* ```
*/
public async xautoclaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
start: string,
start: GlideString,
count?: number,
): Promise<[string, Record<string, [string, string][]>, string[]?]> {
// TODO: convert Record return type to Object array
return this.createWritePromise(
createXAutoClaim(key, group, consumer, minIdleTime, start, count),
);
Expand Down Expand Up @@ -5086,8 +5096,10 @@ export class BaseClient {
*
* @param key - The key of the stream.
* @param groupName - The newly created consumer group name.
* @param id - Stream entry ID that specifies the last delivered entry in the stream from the new
* @param id - Additional Parameters:
* - (Optional) Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID `"$"` can be used to specify the last entry in the stream.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns `"OK"`.
*
* @example
Expand All @@ -5097,13 +5109,14 @@ export class BaseClient {
* ```
*/
public async xgroupCreate(
key: string,
groupName: string,
id: string,
options?: StreamGroupOptions,
): Promise<string> {
key: GlideString,
groupName: GlideString,
id: GlideString,
options?: StreamGroupOptions & DecoderOption,
): Promise<GlideString> {
return this.createWritePromise(
createXGroupCreate(key, groupName, id, options),
options,
);
}

Expand All @@ -5123,8 +5136,8 @@ export class BaseClient {
* ```
*/
public async xgroupDestroy(
key: string,
groupName: string,
key: GlideString,
groupName: GlideString,
): Promise<boolean> {
return this.createWritePromise(createXGroupDestroy(key, groupName));
}
Expand Down Expand Up @@ -5219,9 +5232,9 @@ export class BaseClient {
* ```
*/
public async xgroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): Promise<boolean> {
return this.createWritePromise(
createXGroupCreateConsumer(key, groupName, consumerName),
Expand All @@ -5245,9 +5258,9 @@ export class BaseClient {
* ```
*/
public async xgroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): Promise<number> {
return this.createWritePromise(
createXGroupDelConsumer(key, groupName, consumerName),
Expand Down Expand Up @@ -5285,9 +5298,9 @@ export class BaseClient {
* ```
*/
public async xack(
key: string,
group: string,
ids: string[],
key: GlideString,
group: GlideString,
ids: GlideString[],
): Promise<number> {
return this.createWritePromise(createXAck(key, group, ids));
}
Expand All @@ -5303,7 +5316,6 @@ export class BaseClient {
* group.
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used.
* @returns `"OK"`.
*
* * @example
Expand All @@ -5312,16 +5324,15 @@ export class BaseClient {
* ```
*/
public async xgroupSetId(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
entriesRead?: number,
decoder?: Decoder,
): Promise<"OK"> {
return this.createWritePromise(
createXGroupSetid(key, groupName, id, entriesRead),
{
decoder: decoder,
decoder: Decoder.String,
},
);
}
Expand Down
62 changes: 31 additions & 31 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@ export type StreamAddOptions = {
trim?: StreamTrimOptions;
};

function addTrimOptions(options: StreamTrimOptions, args: string[]) {
function addTrimOptions(options: StreamTrimOptions, args: GlideString[]) {
if (options.method === "maxlen") {
args.push("MAXLEN");
} else if (options.method === "minid") {
Expand Down Expand Up @@ -2076,8 +2076,8 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) {
* @internal
*/
export function createXAdd(
key: string,
values: [string, string][],
key: GlideString,
values: [GlideString, GlideString][],
options?: StreamAddOptions,
): command_request.Command {
const args = [key];
Expand Down Expand Up @@ -2108,8 +2108,8 @@ export function createXAdd(
* @internal
*/
export function createXDel(
key: string,
ids: string[],
key: GlideString,
ids: GlideString[],
): command_request.Command {
return createCommand(RequestType.XDel, [key, ...ids]);
}
Expand Down Expand Up @@ -2168,9 +2168,9 @@ export function createXRevRange(
* @internal
*/
export function createXGroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupCreateConsumer, [
key,
Expand All @@ -2183,9 +2183,9 @@ export function createXGroupCreateConsumer(
* @internal
*/
export function createXGroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupDelConsumer, [
key,
Expand Down Expand Up @@ -2709,11 +2709,11 @@ export type StreamClaimOptions = {

/** @internal */
export function createXClaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
ids: string[],
ids: GlideString[],
options?: StreamClaimOptions,
justId?: boolean,
): command_request.Command {
Expand All @@ -2735,11 +2735,11 @@ export function createXClaim(

/** @internal */
export function createXAutoClaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
start: string,
start: GlideString,
count?: number,
justId?: boolean,
): command_request.Command {
Expand Down Expand Up @@ -2779,12 +2779,12 @@ export type StreamGroupOptions = {
* @internal
*/
export function createXGroupCreate(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
options?: StreamGroupOptions,
): command_request.Command {
const args: string[] = [key, groupName, id];
const args: GlideString[] = [key, groupName, id];

if (options) {
if (options.mkStream) {
Expand All @@ -2804,8 +2804,8 @@ export function createXGroupCreate(
* @internal
*/
export function createXGroupDestroy(
key: string,
groupName: string,
key: GlideString,
groupName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupDestroy, [key, groupName]);
}
Expand Down Expand Up @@ -3970,9 +3970,9 @@ export function createGetEx(
* @internal
*/
export function createXAck(
key: string,
group: string,
ids: string[],
key: GlideString,
group: GlideString,
ids: GlideString[],
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}
Expand All @@ -3981,9 +3981,9 @@ export function createXAck(
* @internal
*/
export function createXGroupSetid(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
entriesRead?: number,
): command_request.Command {
const args = [key, groupName, id];
Expand Down
Loading

0 comments on commit 852a965

Please sign in to comment.