From cf02d122b53e36e83a91a0e1d50fdb7ff4818f4f Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Tue, 13 Aug 2024 17:53:53 -0700 Subject: [PATCH] Node: added WAIT command (#2113) * Node: added WAIT command Signed-off-by: Yi-Pin Chen Signed-off-by: lior sventitzky --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 23 +++++++++++++++++++++++ node/src/Commands.ts | 11 +++++++++++ node/src/Transaction.ts | 18 ++++++++++++++++++ node/tests/SharedTests.ts | 32 ++++++++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 2 ++ 6 files changed, 87 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e33086d73..7be482a0cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ * Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018)) * Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053)) * Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076)) +* Node: Added WAIT command ([#2113](https://github.com/valkey-io/valkey-glide/pull/2113)) * Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022)) * Node: Added ZREMRANGEBYLEX command ([#2025](https://github.com/valkey-io/valkey-glide/pull/2025)) * Node: Added ZRANGESTORE command ([#2068](https://github.com/valkey-io/valkey-glide/pull/2068)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index bd0632f83b..e30c4199d6 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -161,6 +161,7 @@ import { createTouch, createType, createUnlink, + createWait, createWatch, createXAdd, createXAutoClaim, @@ -5551,6 +5552,28 @@ export class BaseClient { return this.createWritePromise(createWatch(keys)); } + /** + * Blocks the current client until all the previous write commands are successfully transferred and + * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns + * the number of replicas that were not yet reached. + * + * See https://valkey.io/commands/wait/ for more details. + * + * @param numreplicas - The number of replicas to reach. + * @param timeout - The timeout value specified in milliseconds. A value of 0 will block indefinitely. + * @returns The number of replicas reached by all the writes performed in the context of the current connection. + * + * @example + * ```typescript + * await client.set(key, value); + * let response = await client.wait(1, 1000); + * console.log(response); // Output: return 1 when a replica is reached or 0 if 1000ms is reached. + * ``` + */ + public async wait(numreplicas: number, timeout: number): Promise { + return this.createWritePromise(createWait(numreplicas, timeout)); + } + /** * Overwrites part of the string stored at `key`, starting at the specified `offset`, * for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`, diff --git a/node/src/Commands.ts b/node/src/Commands.ts index e831492622..5ed5f5a421 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3452,6 +3452,17 @@ export function createUnWatch(): command_request.Command { return createCommand(RequestType.UnWatch, []); } +/** @internal */ +export function createWait( + numreplicas: number, + timeout: number, +): command_request.Command { + return createCommand(RequestType.Wait, [ + numreplicas.toString(), + timeout.toString(), + ]); +} + /** * This base class represents the common set of optional arguments for the SCAN family of commands. * Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN, diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 2933e5bac7..b1441ea566 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -198,6 +198,7 @@ import { createTouch, createType, createUnlink, + createWait, createXAdd, createXAutoClaim, createXClaim, @@ -2774,6 +2775,23 @@ export class BaseTransaction> { return this.addAndReturn(createLolwut(options)); } + /** + * Blocks the current client until all the previous write commands are successfully transferred and + * acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns + * the number of replicas that were not yet reached. + * + * See https://valkey.io/commands/wait/ for more details. + * + * @param numreplicas - The number of replicas to reach. + * @param timeout - The timeout value specified in milliseconds. A value of 0 will block indefinitely. + * + * Command Response - The number of replicas reached by all the writes performed in the context of the + * current connection. + */ + public wait(numreplicas: number, timeout: number): T { + return this.addAndReturn(createWait(numreplicas, timeout)); + } + /** * Invokes a previously loaded function. * diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 0cb8300023..eb92b36c83 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -5638,6 +5638,38 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "wait test_%p", + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const value1 = uuidv4(); + const value2 = uuidv4(); + + // assert that wait returns 0 under standalone and 1 under cluster mode. + expect(await client.set(key, value1)).toEqual("OK"); + + if (client instanceof GlideClusterClient) { + expect(await client.wait(1, 1000)).toBeGreaterThanOrEqual( + 1, + ); + } else { + expect(await client.wait(1, 1000)).toBeGreaterThanOrEqual( + 0, + ); + } + + // command should fail on a negative timeout value + await expect(client.wait(1, -1)).rejects.toThrow(RequestError); + + // ensure that command doesn't time out even if timeout > request timeout (250ms by default) + expect(await client.set(key, value2)).toEqual("OK"); + expect(await client.wait(100, 500)).toBeGreaterThanOrEqual(0); + }, protocol); + }, + config.timeout, + ); + // Set command tests async function setWithExpiryOptions(client: BaseClient) { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index f1a8ade345..25f9c77d24 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -1518,5 +1518,7 @@ export async function transactionTest( responseData.push(["sortReadOnly(key21)", ["1", "2", "3"]]); } + baseTransaction.wait(1, 200); + responseData.push(["wait(1, 200)", 1]); return responseData; }