From 2cb5c311e6449ef41d7d2f27b22af3fc5ffe0166 Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Fri, 9 Aug 2024 16:47:54 -0700 Subject: [PATCH 1/4] Node: added WAIT command Signed-off-by: Yi-Pin Chen --- node/src/BaseClient.ts | 27 +++++++++++++++++++++++++-- node/src/Commands.ts | 11 +++++++++++ node/src/Transaction.ts | 30 ++++++++++++++++++++++++------ node/tests/SharedTests.ts | 32 ++++++++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 2 ++ 5 files changed, 94 insertions(+), 8 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index e93bcd2c2d..4abdc19b00 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -46,6 +46,7 @@ import { SearchOrigin, SetOptions, StreamAddOptions, + StreamClaimOptions, StreamGroupOptions, StreamReadOptions, StreamTrimOptions, @@ -158,8 +159,10 @@ import { createTouch, createType, createUnlink, + createWait, createWatch, createXAdd, + createXClaim, createXDel, createXGroupCreate, createXGroupDestroy, @@ -196,8 +199,6 @@ import { createZRevRankWithScore, createZScan, createZScore, - StreamClaimOptions, - createXClaim, } from "./Commands"; import { ClosingError, @@ -5174,6 +5175,28 @@ export class BaseClient { return this.createWritePromise(createWatch(keys)); } + /** + * Blocks the current client until all the previous write commands are successfully transferred and + * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns + * even if the specified number of replicas were not yet reached. + * + * See https://valkey.io/commands/wait/ for more details. + * + * @param numreplicas - The number of replicas to reach. + * @param timeout - The timeout value specified in milliseconds. A value of 0 will block indefinitely. + * @returns The number of replicas reached by all the writes performed in the context of the current connection. + * + * @example + * ```typescript + * await client.set(key, value); + * let response = await client.wait(1, 1000); + * console.log(response); // Output: return 1 when a replica is reached or 0 if 1000ms is reached. + * ``` + */ + public async wait(numreplicas: number, timeout: number): Promise { + return this.createWritePromise(createWait(numreplicas, timeout)); + } + /** * Overwrites part of the string stored at `key`, starting at the specified `offset`, * for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`, diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 6f9715ddf4..b6f7cd51fa 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3323,6 +3323,17 @@ export function createUnWatch(): command_request.Command { return createCommand(RequestType.UnWatch, []); } +/** @internal */ +export function createWait( + numreplicas: number, + timeout: number, +): command_request.Command { + return createCommand(RequestType.Wait, [ + numreplicas.toString(), + timeout.toString(), + ]); +} + /** * This base class represents the common set of optional arguments for the SCAN family of commands. * Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN, diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 937909f15b..4618ebc772 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -31,6 +31,7 @@ import { GeoCircleShape, // eslint-disable-line @typescript-eslint/no-unused-vars GeoSearchResultOptions, GeoSearchShape, + GeoSearchStoreResultOptions, GeoUnit, GeospatialData, InfoOptions, @@ -99,6 +100,7 @@ import { createGeoHash, createGeoPos, createGeoSearch, + createGeoSearchStore, createGet, createGetBit, createGetDel, @@ -192,18 +194,19 @@ import { createTouch, createType, createUnlink, + createWait, createXAdd, createXClaim, createXDel, + createXGroupCreate, + createXGroupCreateConsumer, + createXGroupDelConsumer, + createXGroupDestroy, createXInfoConsumers, createXInfoStream, createXLen, createXRead, createXTrim, - createXGroupCreate, - createXGroupDestroy, - createXGroupCreateConsumer, - createXGroupDelConsumer, createZAdd, createZCard, createZCount, @@ -230,8 +233,6 @@ import { createZRevRankWithScore, createZScan, createZScore, - createGeoSearchStore, - GeoSearchStoreResultOptions, } from "./Commands"; import { command_request } from "./ProtobufMessage"; @@ -2600,6 +2601,23 @@ export class BaseTransaction> { return this.addAndReturn(createLolwut(options)); } + /** + * Blocks the current client until all the previous write commands are successfully transferred and + * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns + * even if the specified number of replicas were not yet reached. + * + * See https://valkey.io/commands/wait/ for more details. + * + * @param numreplicas - The number of replicas to reach. + * @param timeout - The timeout value specified in milliseconds. A value of 0 will block indefinitely. + * + * Command Response - The number of replicas reached by all the writes performed in the context of the + * current connection. + */ + public wait(numreplicas: number, timeout: number): T { + return this.addAndReturn(createWait(numreplicas, timeout)); + } + /** * Invokes a previously loaded function. * diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index db27f80007..e56cc5c323 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -5163,6 +5163,38 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "wait test_%p", + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const value1 = uuidv4(); + const value2 = uuidv4(); + + // assert that wait returns 0 under standalone and 1 under cluster mode. + expect(await client.set(key, value1)).toEqual("OK"); + + if (client instanceof GlideClusterClient) { + expect(await client.wait(1, 1000)).toBeGreaterThanOrEqual( + 1, + ); + } else { + expect(await client.wait(1, 1000)).toBeGreaterThanOrEqual( + 0, + ); + } + + // command should fail on a negative timeout value + await expect(client.wait(1, -1)).rejects.toThrow(RequestError); + + // ensure that command doesn't time out even if timeout > request timeout (250ms by default) + expect(await client.set(key, value2)).toEqual("OK"); + expect(await client.wait(100, 500)).toBeGreaterThanOrEqual(0); + }, protocol); + }, + config.timeout, + ); + // Set command tests async function setWithExpiryOptions(client: BaseClient) { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index ef044bd008..1e7d4a7a91 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -1360,5 +1360,7 @@ export async function transactionTest( responseData.push(["sortReadOnly(key21)", ["1", "2", "3"]]); } + baseTransaction.wait(1, 200); + responseData.push(["wait(1, 200)", 1]); return responseData; } From 4dd44d6ae2ace1e7dd073e22ca1e06fb55e8d2ae Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Fri, 9 Aug 2024 16:51:12 -0700 Subject: [PATCH 2/4] Update CHANGELOG Signed-off-by: Yi-Pin Chen --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa7f24b6d7..c9c2d8d64b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ * Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018)) * Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053)) * Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076)) +* Node: Added WAIT command ([#2113](https://github.com/valkey-io/valkey-glide/pull/2113)) * Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022)) * Node: Added ZREMRANGEBYLEX command ([#2025](https://github.com/valkey-io/valkey-glide/pull/2025)) * Node: Added SRANDMEMBER command ([#2067](https://github.com/valkey-io/valkey-glide/pull/2067)) From 22590c79bc94df12af69202c4f91216f481e2c6d Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Mon, 12 Aug 2024 11:40:11 -0700 Subject: [PATCH 3/4] Addressed review comments Signed-off-by: Yi-Pin Chen --- node/src/BaseClient.ts | 6 +++--- node/src/Transaction.ts | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 4abdc19b00..c4fd899a4d 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -165,11 +165,11 @@ import { createXClaim, createXDel, createXGroupCreate, + createXGroupCreateConsumer, + createXGroupDelConsumer, createXGroupDestroy, createXInfoConsumers, createXInfoStream, - createXGroupCreateConsumer, - createXGroupDelConsumer, createXLen, createXRead, createXTrim, @@ -5178,7 +5178,7 @@ export class BaseClient { /** * Blocks the current client until all the previous write commands are successfully transferred and * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns - * even if the specified number of replicas were not yet reached. + * the number of replicas were not yet reached. * * See https://valkey.io/commands/wait/ for more details. * diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 4618ebc772..c1e9ad24a4 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -2604,7 +2604,7 @@ export class BaseTransaction> { /** * Blocks the current client until all the previous write commands are successfully transferred and * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns - * even if the specified number of replicas were not yet reached. + * the number of replicas were not yet reached. * * See https://valkey.io/commands/wait/ for more details. * From 6f649066299a1a454a35ca573bcdbfd7c3ee2d6c Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Tue, 13 Aug 2024 17:35:20 -0700 Subject: [PATCH 4/4] Addressed review comments Signed-off-by: Yi-Pin Chen --- node/src/BaseClient.ts | 2 +- node/src/Transaction.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 521f44f080..38c3698f3e 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -5504,7 +5504,7 @@ export class BaseClient { /** * Blocks the current client until all the previous write commands are successfully transferred and * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns - * the number of replicas were not yet reached. + * the number of replicas that were not yet reached. * * See https://valkey.io/commands/wait/ for more details. * diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index b121ebeded..3f800425a7 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -2758,7 +2758,7 @@ export class BaseTransaction> { /** * Blocks the current client until all the previous write commands are successfully transferred and * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns - * the number of replicas were not yet reached. + * the number of replicas that were not yet reached. * * See https://valkey.io/commands/wait/ for more details. *