Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add Binary support for stream commands, part 1 #2200

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))
* Node: Added XGROUP SETID command ([#2135]((https://github.com/valkey-io/valkey-glide/pull/2135))
* Node: Added binary variant to string commands ([#2183](https://github.com/valkey-io/valkey-glide/pull/2183))
* Node: Added binary variant to stream commands ([#2200](https://github.com/valkey-io/valkey-glide/pull/2200))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
79 changes: 42 additions & 37 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4731,14 +4731,18 @@ export class BaseClient {
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @param options - options detailing how to add to the stream.
* @param options - (Optional) See {@link StreamAddOptions} and {@link DecoderOption}.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*/
public async xadd(
key: string,
values: [string, string][],
options?: StreamAddOptions,
): Promise<string | null> {
return this.createWritePromise(createXAdd(key, values, options));
key: GlideString,
values: [GlideString, GlideString][],
options?: StreamAddOptions & DecoderOption,
): Promise<GlideString | null> {
return this.createWritePromise(
createXAdd(key, values, options),
options,
);
}

/**
Expand All @@ -4757,7 +4761,7 @@ export class BaseClient {
* // Output is 2 since the stream marked 2 entries as deleted.
* ```
*/
public async xdel(key: string, ids: string[]): Promise<number> {
public async xdel(key: GlideString, ids: GlideString[]): Promise<number> {
return this.createWritePromise(createXDel(key, ids));
}

Expand Down Expand Up @@ -5024,7 +5028,7 @@ export class BaseClient {
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @param options - (Optional) See {@link StreamClaimOptions} and {@link DecoderOption}.
* @returns A `Record` of message entries that are claimed by the consumer.
*
* @example
Expand All @@ -5038,13 +5042,14 @@ export class BaseClient {
* ```
*/
public async xclaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
ids: GlideString[],
options?: StreamClaimOptions & DecoderOption,
): Promise<Record<string, [string, string][]>> {
// TODO: convert Record return type to Object array
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
Expand Down Expand Up @@ -5093,13 +5098,14 @@ export class BaseClient {
* ```
*/
public async xautoclaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
start: string,
start: GlideString,
count?: number,
): Promise<[string, Record<string, [string, string][]>, string[]?]> {
// TODO: convert Record return type to Object array
return this.createWritePromise(
createXAutoClaim(key, group, consumer, minIdleTime, start, count),
);
Expand Down Expand Up @@ -5218,13 +5224,14 @@ export class BaseClient {
* ```
*/
public async xgroupCreate(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
options?: StreamGroupOptions,
): Promise<string> {
): Promise<"OK"> {
return this.createWritePromise(
createXGroupCreate(key, groupName, id, options),
{ decoder: Decoder.String },
);
}

Expand All @@ -5244,8 +5251,8 @@ export class BaseClient {
* ```
*/
public async xgroupDestroy(
key: string,
groupName: string,
key: GlideString,
groupName: GlideString,
): Promise<boolean> {
return this.createWritePromise(createXGroupDestroy(key, groupName));
}
Expand Down Expand Up @@ -5340,9 +5347,9 @@ export class BaseClient {
* ```
*/
public async xgroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): Promise<boolean> {
return this.createWritePromise(
createXGroupCreateConsumer(key, groupName, consumerName),
Expand All @@ -5366,9 +5373,9 @@ export class BaseClient {
* ```
*/
public async xgroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): Promise<number> {
return this.createWritePromise(
createXGroupDelConsumer(key, groupName, consumerName),
Expand Down Expand Up @@ -5406,9 +5413,9 @@ export class BaseClient {
* ```
*/
public async xack(
key: string,
group: string,
ids: string[],
key: GlideString,
group: GlideString,
ids: GlideString[],
): Promise<number> {
return this.createWritePromise(createXAck(key, group, ids));
}
Expand All @@ -5424,7 +5431,6 @@ export class BaseClient {
* group.
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used.
* @returns `"OK"`.
*
* * @example
Expand All @@ -5433,16 +5439,15 @@ export class BaseClient {
* ```
*/
public async xgroupSetId(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
entriesRead?: number,
decoder?: Decoder,
): Promise<"OK"> {
return this.createWritePromise(
createXGroupSetid(key, groupName, id, entriesRead),
{
decoder: decoder,
decoder: Decoder.String,
},
);
}
Expand Down
62 changes: 31 additions & 31 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2052,7 +2052,7 @@ export type StreamAddOptions = {
trim?: StreamTrimOptions;
};

function addTrimOptions(options: StreamTrimOptions, args: string[]) {
function addTrimOptions(options: StreamTrimOptions, args: GlideString[]) {
if (options.method === "maxlen") {
args.push("MAXLEN");
} else if (options.method === "minid") {
Expand Down Expand Up @@ -2081,8 +2081,8 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) {
* @internal
*/
export function createXAdd(
key: string,
values: [string, string][],
key: GlideString,
values: [GlideString, GlideString][],
options?: StreamAddOptions,
): command_request.Command {
const args = [key];
Expand Down Expand Up @@ -2113,8 +2113,8 @@ export function createXAdd(
* @internal
*/
export function createXDel(
key: string,
ids: string[],
key: GlideString,
ids: GlideString[],
): command_request.Command {
return createCommand(RequestType.XDel, [key, ...ids]);
}
Expand Down Expand Up @@ -2173,9 +2173,9 @@ export function createXRevRange(
* @internal
*/
export function createXGroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupCreateConsumer, [
key,
Expand All @@ -2188,9 +2188,9 @@ export function createXGroupCreateConsumer(
* @internal
*/
export function createXGroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupDelConsumer, [
key,
Expand Down Expand Up @@ -2714,11 +2714,11 @@ export type StreamClaimOptions = {

/** @internal */
export function createXClaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
ids: string[],
ids: GlideString[],
options?: StreamClaimOptions,
justId?: boolean,
): command_request.Command {
Expand All @@ -2740,11 +2740,11 @@ export function createXClaim(

/** @internal */
export function createXAutoClaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
start: string,
start: GlideString,
count?: number,
justId?: boolean,
): command_request.Command {
Expand Down Expand Up @@ -2784,12 +2784,12 @@ export type StreamGroupOptions = {
* @internal
*/
export function createXGroupCreate(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
options?: StreamGroupOptions,
): command_request.Command {
const args: string[] = [key, groupName, id];
const args: GlideString[] = [key, groupName, id];

if (options) {
if (options.mkStream) {
Expand All @@ -2809,8 +2809,8 @@ export function createXGroupCreate(
* @internal
*/
export function createXGroupDestroy(
key: string,
groupName: string,
key: GlideString,
groupName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupDestroy, [key, groupName]);
}
Expand Down Expand Up @@ -3975,9 +3975,9 @@ export function createGetEx(
* @internal
*/
export function createXAck(
key: string,
group: string,
ids: string[],
key: GlideString,
group: GlideString,
ids: GlideString[],
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}
Expand All @@ -3986,9 +3986,9 @@ export function createXAck(
* @internal
*/
export function createXGroupSetid(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
entriesRead?: number,
): command_request.Command {
const args = [key, groupName, id];
Expand Down
Loading
Loading