diff --git a/CHANGELOG.md b/CHANGELOG.md index f9a1decdad..58ba036b18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ * Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112)) * Node: Added XGROUP SETID command ([#2135]((https://github.com/valkey-io/valkey-glide/pull/2135)) * Node: Added binary variant to string commands ([#2183](https://github.com/valkey-io/valkey-glide/pull/2183)) +* Node: Added binary variant to stream commands ([#2200](https://github.com/valkey-io/valkey-glide/pull/2200)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index ce8afe3112..4805e4f02d 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -4731,14 +4731,18 @@ export class BaseClient { * @param key - The key of the stream. * @param values - field-value pairs to be added to the entry. * @param options - options detailing how to add to the stream. + * @param options - (Optional) See {@link StreamAddOptions} and {@link DecoderOption}. * @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. */ public async xadd( - key: string, - values: [string, string][], - options?: StreamAddOptions, - ): Promise { - return this.createWritePromise(createXAdd(key, values, options)); + key: GlideString, + values: [GlideString, GlideString][], + options?: StreamAddOptions & DecoderOption, + ): Promise { + return this.createWritePromise( + createXAdd(key, values, options), + options, + ); } /** @@ -4757,7 +4761,7 @@ export class BaseClient { * // Output is 2 since the stream marked 2 entries as deleted. * ``` */ - public async xdel(key: string, ids: string[]): Promise { + public async xdel(key: GlideString, ids: GlideString[]): Promise { return this.createWritePromise(createXDel(key, ids)); } @@ -5024,7 +5028,7 @@ export class BaseClient { * @param consumer - The group consumer. * @param minIdleTime - The minimum idle time for the message to be claimed. * @param ids - An array of entry ids. - * @param options - (Optional) Stream claim options {@link StreamClaimOptions}. + * @param options - (Optional) See {@link StreamClaimOptions} and {@link DecoderOption}. * @returns A `Record` of message entries that are claimed by the consumer. * * @example @@ -5038,13 +5042,14 @@ export class BaseClient { * ``` */ public async xclaim( - key: string, - group: string, - consumer: string, + key: GlideString, + group: GlideString, + consumer: GlideString, minIdleTime: number, - ids: string[], - options?: StreamClaimOptions, + ids: GlideString[], + options?: StreamClaimOptions & DecoderOption, ): Promise> { + // TODO: convert Record return type to Object array return this.createWritePromise( createXClaim(key, group, consumer, minIdleTime, ids, options), ); @@ -5093,13 +5098,14 @@ export class BaseClient { * ``` */ public async xautoclaim( - key: string, - group: string, - consumer: string, + key: GlideString, + group: GlideString, + consumer: GlideString, minIdleTime: number, - start: string, + start: GlideString, count?: number, ): Promise<[string, Record, string[]?]> { + // TODO: convert Record return type to Object array return this.createWritePromise( createXAutoClaim(key, group, consumer, minIdleTime, start, count), ); @@ -5218,13 +5224,14 @@ export class BaseClient { * ``` */ public async xgroupCreate( - key: string, - groupName: string, - id: string, + key: GlideString, + groupName: GlideString, + id: GlideString, options?: StreamGroupOptions, - ): Promise { + ): Promise<"OK"> { return this.createWritePromise( createXGroupCreate(key, groupName, id, options), + { decoder: Decoder.String }, ); } @@ -5244,8 +5251,8 @@ export class BaseClient { * ``` */ public async xgroupDestroy( - key: string, - groupName: string, + key: GlideString, + groupName: GlideString, ): Promise { return this.createWritePromise(createXGroupDestroy(key, groupName)); } @@ -5340,9 +5347,9 @@ export class BaseClient { * ``` */ public async xgroupCreateConsumer( - key: string, - groupName: string, - consumerName: string, + key: GlideString, + groupName: GlideString, + consumerName: GlideString, ): Promise { return this.createWritePromise( createXGroupCreateConsumer(key, groupName, consumerName), @@ -5366,9 +5373,9 @@ export class BaseClient { * ``` */ public async xgroupDelConsumer( - key: string, - groupName: string, - consumerName: string, + key: GlideString, + groupName: GlideString, + consumerName: GlideString, ): Promise { return this.createWritePromise( createXGroupDelConsumer(key, groupName, consumerName), @@ -5406,9 +5413,9 @@ export class BaseClient { * ``` */ public async xack( - key: string, - group: string, - ids: string[], + key: GlideString, + group: GlideString, + ids: GlideString[], ): Promise { return this.createWritePromise(createXAck(key, group, ids)); } @@ -5424,7 +5431,6 @@ export class BaseClient { * group. * @param entriesRead - (Optional) A value representing the number of stream entries already read by the group. * This option can only be specified if you are using Valkey version 7.0.0 or above. - * @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used. * @returns `"OK"`. * * * @example @@ -5433,16 +5439,15 @@ export class BaseClient { * ``` */ public async xgroupSetId( - key: string, - groupName: string, - id: string, + key: GlideString, + groupName: GlideString, + id: GlideString, entriesRead?: number, - decoder?: Decoder, ): Promise<"OK"> { return this.createWritePromise( createXGroupSetid(key, groupName, id, entriesRead), { - decoder: decoder, + decoder: Decoder.String, }, ); } diff --git a/node/src/Commands.ts b/node/src/Commands.ts index f3d6d934d1..8a15dd93f0 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2052,7 +2052,7 @@ export type StreamAddOptions = { trim?: StreamTrimOptions; }; -function addTrimOptions(options: StreamTrimOptions, args: string[]) { +function addTrimOptions(options: StreamTrimOptions, args: GlideString[]) { if (options.method === "maxlen") { args.push("MAXLEN"); } else if (options.method === "minid") { @@ -2081,8 +2081,8 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) { * @internal */ export function createXAdd( - key: string, - values: [string, string][], + key: GlideString, + values: [GlideString, GlideString][], options?: StreamAddOptions, ): command_request.Command { const args = [key]; @@ -2113,8 +2113,8 @@ export function createXAdd( * @internal */ export function createXDel( - key: string, - ids: string[], + key: GlideString, + ids: GlideString[], ): command_request.Command { return createCommand(RequestType.XDel, [key, ...ids]); } @@ -2173,9 +2173,9 @@ export function createXRevRange( * @internal */ export function createXGroupCreateConsumer( - key: string, - groupName: string, - consumerName: string, + key: GlideString, + groupName: GlideString, + consumerName: GlideString, ): command_request.Command { return createCommand(RequestType.XGroupCreateConsumer, [ key, @@ -2188,9 +2188,9 @@ export function createXGroupCreateConsumer( * @internal */ export function createXGroupDelConsumer( - key: string, - groupName: string, - consumerName: string, + key: GlideString, + groupName: GlideString, + consumerName: GlideString, ): command_request.Command { return createCommand(RequestType.XGroupDelConsumer, [ key, @@ -2714,11 +2714,11 @@ export type StreamClaimOptions = { /** @internal */ export function createXClaim( - key: string, - group: string, - consumer: string, + key: GlideString, + group: GlideString, + consumer: GlideString, minIdleTime: number, - ids: string[], + ids: GlideString[], options?: StreamClaimOptions, justId?: boolean, ): command_request.Command { @@ -2740,11 +2740,11 @@ export function createXClaim( /** @internal */ export function createXAutoClaim( - key: string, - group: string, - consumer: string, + key: GlideString, + group: GlideString, + consumer: GlideString, minIdleTime: number, - start: string, + start: GlideString, count?: number, justId?: boolean, ): command_request.Command { @@ -2784,12 +2784,12 @@ export type StreamGroupOptions = { * @internal */ export function createXGroupCreate( - key: string, - groupName: string, - id: string, + key: GlideString, + groupName: GlideString, + id: GlideString, options?: StreamGroupOptions, ): command_request.Command { - const args: string[] = [key, groupName, id]; + const args: GlideString[] = [key, groupName, id]; if (options) { if (options.mkStream) { @@ -2809,8 +2809,8 @@ export function createXGroupCreate( * @internal */ export function createXGroupDestroy( - key: string, - groupName: string, + key: GlideString, + groupName: GlideString, ): command_request.Command { return createCommand(RequestType.XGroupDestroy, [key, groupName]); } @@ -3975,9 +3975,9 @@ export function createGetEx( * @internal */ export function createXAck( - key: string, - group: string, - ids: string[], + key: GlideString, + group: GlideString, + ids: GlideString[], ): command_request.Command { return createCommand(RequestType.XAck, [key, group, ...ids]); } @@ -3986,9 +3986,9 @@ export function createXAck( * @internal */ export function createXGroupSetid( - key: string, - groupName: string, - id: string, + key: GlideString, + groupName: GlideString, + id: GlideString, entriesRead?: number, ): command_request.Command { const args = [key, groupName, id]; diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index c338d01923..92276ed58c 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -2504,8 +2504,8 @@ export class BaseTransaction> { * Command Response - The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. */ public xadd( - key: string, - values: [string, string][], + key: GlideString, + values: [GlideString, GlideString][], options?: StreamAddOptions, ): T { return this.addAndReturn(createXAdd(key, values, options)); @@ -2522,7 +2522,7 @@ export class BaseTransaction> { * Command Response - The number of entries removed from the stream. This number may be less than the number of entries in * `ids`, if the specified `ids` don't exist in the stream. */ - public xdel(key: string, ids: string[]): T { + public xdel(key: GlideString, ids: GlideString[]): T { return this.addAndReturn(createXDel(key, ids)); } @@ -2760,11 +2760,11 @@ export class BaseTransaction> { * Command Response - A `Record` of message entries that are claimed by the consumer. */ public xclaim( - key: string, - group: string, - consumer: string, + key: GlideString, + group: GlideString, + consumer: GlideString, minIdleTime: number, - ids: string[], + ids: GlideString[], options?: StreamClaimOptions, ): T { return this.addAndReturn( @@ -2824,11 +2824,11 @@ export class BaseTransaction> { * These IDs are deleted from the Pending Entries List. */ public xautoclaim( - key: string, - group: string, - consumer: string, + key: GlideString, + group: GlideString, + consumer: GlideString, minIdleTime: number, - start: string, + start: GlideString, count?: number, ): T { return this.addAndReturn( @@ -2894,9 +2894,9 @@ export class BaseTransaction> { * Command Response - `"OK"`. */ public xgroupCreate( - key: string, - groupName: string, - id: string, + key: GlideString, + groupName: GlideString, + id: GlideString, options?: StreamGroupOptions, ): T { return this.addAndReturn( @@ -2914,7 +2914,7 @@ export class BaseTransaction> { * * Command Response - `true` if the consumer group is destroyed. Otherwise, `false`. */ - public xgroupDestroy(key: string, groupName: string): T { + public xgroupDestroy(key: GlideString, groupName: GlideString): T { return this.addAndReturn(createXGroupDestroy(key, groupName)); } @@ -2930,9 +2930,9 @@ export class BaseTransaction> { * Command Response - `true` if the consumer is created. Otherwise, returns `false`. */ public xgroupCreateConsumer( - key: string, - groupName: string, - consumerName: string, + key: GlideString, + groupName: GlideString, + consumerName: GlideString, ): T { return this.addAndReturn( createXGroupCreateConsumer(key, groupName, consumerName), @@ -2951,9 +2951,9 @@ export class BaseTransaction> { * Command Response - The number of pending messages the `consumer` had before it was deleted. */ public xgroupDelConsumer( - key: string, - groupName: string, - consumerName: string, + key: GlideString, + groupName: GlideString, + consumerName: GlideString, ): T { return this.addAndReturn( createXGroupDelConsumer(key, groupName, consumerName), @@ -2972,7 +2972,7 @@ export class BaseTransaction> { * * Command Response - The number of messages that were successfully acknowledged. */ - public xack(key: string, group: string, ids: string[]): T { + public xack(key: GlideString, group: GlideString, ids: GlideString[]): T { return this.addAndReturn(createXAck(key, group, ids)); } @@ -2990,9 +2990,9 @@ export class BaseTransaction> { * Command Response - `"OK"`. */ public xgroupSetId( - key: string, - groupName: string, - id: string, + key: GlideString, + groupName: GlideString, + id: GlideString, entriesRead?: number, ): T { return this.addAndReturn( diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index f93f18e9ce..94ec63220f 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -6912,15 +6912,24 @@ export function runBaseTests(config: { const group = uuidv4(); const consumer = uuidv4(); - // setup data + // setup data & test binary parameters in XGROUP CREATE commands expect( - await client.xgroupCreate(key1, group, "0", { - mkStream: true, - }), + await client.xgroupCreate( + Buffer.from(key1), + Buffer.from(group), + Buffer.from("0"), + { + mkStream: true, + }, + ), ).toEqual("OK"); expect( - await client.xgroupCreateConsumer(key1, group, consumer), + await client.xgroupCreateConsumer( + Buffer.from(key1), + Buffer.from(group), + Buffer.from(consumer), + ), ).toBeTruthy(); const entry1 = (await client.xadd(key1, [ @@ -9917,9 +9926,11 @@ export function runBaseTests(config: { expect(await client.xdel(key, [streamId1, streamId3])).toEqual( 1, ); - expect(await client.xdel(nonExistentKey, [streamId3])).toEqual( - 0, - ); + expect( + await client.xdel(Buffer.from(nonExistentKey), [ + Buffer.from(streamId3), + ]), + ).toEqual(0); // invalid argument - id list should not be empty await expect(client.xdel(key, [])).rejects.toThrow( @@ -10353,6 +10364,15 @@ export function runBaseTests(config: { "OK", ); + // Testing binary parameters with an non-existing ID + expect( + await client.xgroupSetId( + Buffer.from(key), + Buffer.from(groupName), + Buffer.from("99-99"), + ), + ).toBe("OK"); + // key exists, but is not a stream expect(await client.set(stringKey, "xgroup setid")).toBe("OK"); await expect( @@ -10547,7 +10567,13 @@ export function runBaseTests(config: { // incorrect IDs - response is empty expect( - await client.xclaim(key, group, "consumer", 0, ["000"]), + await client.xclaim( + Buffer.from(key), + Buffer.from(group), + Buffer.from("consumer"), + 0, + [Buffer.from("000")], + ), ).toEqual({}); expect( await client.xclaimJustId(key, group, "consumer", 0, [ @@ -10617,12 +10643,13 @@ export function runBaseTests(config: { }, }); + // testing binary parameters let result = await client.xautoclaim( - key, - group, - "consumer", + Buffer.from(key), + Buffer.from(group), + Buffer.from("consumer"), 0, - "0-0", + Buffer.from("0-0"), 1, ); let expected: typeof result = [ @@ -10757,6 +10784,15 @@ export function runBaseTests(config: { ]), ).toBe(0); + // testing binary parameters + expect( + await client.xack( + Buffer.from(key), + Buffer.from(groupName), + [Buffer.from(stream_id1_0), Buffer.from(stream_id1_1)], + ), + ).toBe(0); + // read the last unacknowledged entry expect( await client.xreadgroup(groupName, consumerName, { @@ -11042,12 +11078,14 @@ export function runBaseTests(config: { ).toEqual(0); // Add two stream entries - const streamid1: string | null = await client.xadd(key, [ + const streamid1: GlideString | null = await client.xadd(key, [ ["field1", "value1"], ]); expect(streamid1).not.toBeNull(); - const streamid2 = await client.xadd(key, [ - ["field2", "value2"], + + // testing binary parameters + const streamid2 = await client.xadd(Buffer.from(key), [ + [Buffer.from("field2"), Buffer.from("value2")], ]); expect(streamid2).not.toBeNull(); @@ -11063,9 +11101,13 @@ export function runBaseTests(config: { }, }); - // delete one of the streams + // delete one of the streams & testing binary parameters expect( - await client.xgroupDelConsumer(key, groupName, consumer), + await client.xgroupDelConsumer( + Buffer.from(key), + Buffer.from(groupName), + Buffer.from(consumer), + ), ).toEqual(2); // attempting to call XGROUP CREATECONSUMER or XGROUP DELCONSUMER with a non-existing key should raise an error @@ -11141,6 +11183,13 @@ export function runBaseTests(config: { expect(await client.xgroupDestroy(key, groupName1)).toEqual( false, ); + // calling again with binary parameters, expecting the same result + expect( + await client.xgroupDestroy( + Buffer.from(key), + Buffer.from(groupName1), + ), + ).toEqual(false); // attempting to destroy a group for a non-existing key should raise an error await expect(