Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: added WATCH and UNWATCH commands #2076

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* Node: Added ZINCRBY command ([#2009](https://github.com/valkey-io/valkey-glide/pull/2009))
* Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018))
* Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053))
* Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076))
* Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022))
* Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025))
* Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061))
Expand Down
37 changes: 35 additions & 2 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ import {
createTouch,
createType,
createUnlink,
createWatch,
createXAdd,
createXDel,
createXLen,
Expand Down Expand Up @@ -170,8 +171,8 @@ import {
createZRemRangeByScore,
createZRevRank,
createZRevRankWithScore,
createZScore,
createZScan,
createZScore,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -4482,10 +4483,42 @@ export class BaseClient {
* console.log(result); // Output: 2 - The last access time of 2 keys has been updated.
* ```
*/
public touch(keys: string[]): Promise<number> {
public async touch(keys: string[]): Promise<number> {
return this.createWritePromise(createTouch(keys));
}

/**
* Marks the given keys to be watched for conditional execution of a transaction. Transactions
* will only execute commands if the watched keys are not modified before execution of the
* transaction.
*
* See https://valkey.io/commands/watch/ for more details.
*
* @remarks When in cluster mode, the command may route to multiple nodes when `keys` map to different hash slots.
* @param keys - The keys to watch.
* @returns A simple "OK" response.
*
* @example
* ```typescript
* const response = await client.watch("sampleKey");
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
* console.log(response); // Output: "OK"
* transaction.set("SampleKey", "foobar");
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
* const result = await client.exec(transaction);
* console.log(result); // Output: ["OK"] - Executes successfully and keys are unwatched.
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
* ```
* ```typescript
* const response = await client.watch("sampleKey");
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
* console.log(response); // Output: "OK"
* transaction.set("SampleKey", "foobar");
* await client.set("sampleKey", "hello world");
* const result = await client.exec(transaction);
* console.log(result); // Output: null - null is returned when the watched key is modified before transaction execution.
* ```
*/
public async watch(keys: string[]): Promise<"OK"> {
return this.createWritePromise(createWatch(keys));
}

/**
* Overwrites part of the string stored at `key`, starting at the specified `offset`,
* for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`,
Expand Down
10 changes: 10 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3013,6 +3013,16 @@ export function createRandomKey(): command_request.Command {
return createCommand(RequestType.RandomKey, []);
}

/** @internal */
export function createWatch(keys: string[]): command_request.Command {
return createCommand(RequestType.Watch, keys);
}

/** @internal */
export function createUnWatch(): command_request.Command {
return createCommand(RequestType.UnWatch, []);
}

/**
* This base class represents the common set of optional arguments for the SCAN family of commands.
* Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN,
Expand Down
23 changes: 22 additions & 1 deletion node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
createSort,
createSortReadOnly,
createTime,
createUnWatch,
} from "./Commands";
import { connection_request } from "./ProtobufMessage";
import { Transaction } from "./Transaction";
Expand Down Expand Up @@ -742,7 +743,27 @@ export class GlideClient extends BaseClient {
* console.log(result); // Output: "key12" - "key12" is a random existing key name from the currently selected database.
* ```
*/
public randomKey(): Promise<string | null> {
public async randomKey(): Promise<string | null> {
return this.createWritePromise(createRandomKey());
}

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* automatically flush all previously watched keys.
*
* See https://valkey.io/commands/unwatch/ for more details.
*
* @returns A simple "OK" response.
*
* @example
* ```typescript
* let response = await client.watch("sampleKey");
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
* console.log(response); // Output: "OK"
* response = await client.unwatch();
* console.log(response); // Output: "OK"
* ```
*/
public async unwatch(): Promise<"OK"> {
return this.createWritePromise(createUnWatch());
}
}
25 changes: 24 additions & 1 deletion node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
createSort,
createSortReadOnly,
createTime,
createUnWatch,
} from "./Commands";
import { RequestError } from "./Errors";
import { command_request, connection_request } from "./ProtobufMessage";
Expand Down Expand Up @@ -1117,10 +1118,32 @@ export class GlideClusterClient extends BaseClient {
* console.log(result); // Output: "key12" - "key12" is a random existing key name.
* ```
*/
public randomKey(route?: Routes): Promise<string | null> {
public async randomKey(route?: Routes): Promise<string | null> {
return this.createWritePromise(
createRandomKey(),
toProtobufRoute(route),
);
}

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
* See https://valkey.io/commands/unwatch/ for more details.
*
* @param route - (Optional) The command will be routed to all primary nodes, unless `route` is provided,
* in which case the client will route the command to the nodes defined by `route`.
* @returns A simple "OK" response.
*
* @example
* ```typescript
* let response = await client.watch("sampleKey");
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
* console.log(response); // Output: "OK"
* response = await client.unwatch();
* console.log(response); // Output: "OK"
* ```
*/
public async unwatch(route?: Routes): Promise<"OK"> {
return this.createWritePromise(createUnWatch(), toProtobufRoute(route));
}
}
112 changes: 110 additions & 2 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ import {
} from "@jest/globals";
import { BufferReader, BufferWriter } from "protobufjs";
import { v4 as uuidv4 } from "uuid";
import { GlideClient, ListDirection, ProtocolVersion, Transaction } from "..";
import {
GlideClient,
ListDirection,
ProtocolVersion,
RequestError,
Transaction,
} from "..";
import { RedisCluster } from "../../utils/TestUtils.js";
import { FlushMode, SortOrder } from "../build-ts/src/Commands";
import { command_request } from "../src/ProtobufMessage";
Expand Down Expand Up @@ -214,7 +220,7 @@ describe("GlideClient", () => {
);
const transaction = new Transaction();
transaction.get("key");
const result1 = await client1.customCommand(["WATCH", "key"]);
const result1 = await client1.watch(["key"]);
expect(result1).toEqual("OK");

const result2 = await client2.set("key", "foo");
Expand Down Expand Up @@ -926,6 +932,108 @@ describe("GlideClient", () => {
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"watch test_%p",
async (protocol) => {
const client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);

const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const key3 = "{key}-3" + uuidv4();
const key4 = "{key}-4" + uuidv4();
const setFoobarTransaction = new Transaction();
const setHelloTransaction = new Transaction();

// Returns null when a watched key is modified before it is executed in a transaction command.
// Transaction commands are not performed.
expect(await client.watch([key1, key2, key3])).toEqual("OK");
expect(await client.set(key2, "hello")).toEqual("OK");
setFoobarTransaction
.set(key1, "foobar")
.set(key2, "foobar")
.set(key3, "foobar");
let results = await client.exec(setFoobarTransaction);
expect(results).toEqual(null);
// sanity check
expect(await client.get(key1)).toEqual(null);
expect(await client.get(key2)).toEqual("hello");
expect(await client.get(key3)).toEqual(null);

// Transaction executes command successfully with a read command on the watch key before
// transaction is executed.
expect(await client.watch([key1, key2, key3])).toEqual("OK");
expect(await client.get(key2)).toEqual("hello");
results = await client.exec(setFoobarTransaction);
expect(results).toEqual(["OK", "OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("foobar");
expect(await client.get(key2)).toEqual("foobar");
expect(await client.get(key3)).toEqual("foobar");

// Transaction executes command successfully with unmodified watched keys
expect(await client.watch([key1, key2, key3])).toEqual("OK");
results = await client.exec(setFoobarTransaction);
expect(results).toEqual(["OK", "OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("foobar");
expect(await client.get(key2)).toEqual("foobar");
expect(await client.get(key3)).toEqual("foobar");

// Transaction executes command successfully with a modified watched key but is not in the
// transaction.
expect(await client.watch([key4])).toEqual("OK");
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
setHelloTransaction
.set(key1, "hello")
.set(key2, "hello")
.set(key3, "hello");
results = await client.exec(setHelloTransaction);
expect(results).toEqual(["OK", "OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("hello");
expect(await client.get(key2)).toEqual("hello");
expect(await client.get(key3)).toEqual("hello");

// WATCH can not have an empty String array parameter
await expect(client.watch([])).rejects.toThrow(RequestError);

client.close();
},
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"unwatch test_%p",
async (protocol) => {
const client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);

const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();

const setFoobarTransaction = new Transaction();

// UNWATCH returns OK when there no watched keys
expect(await client.unwatch()).toEqual("OK");

// Transaction executes successfully after modifying a watched key then calling UNWATCH
expect(await client.watch([key1, key2])).toEqual("OK");
expect(await client.set(key2, "hello")).toEqual("OK");
expect(await client.unwatch()).toEqual("OK");
setFoobarTransaction.set(key1, "foobar").set(key2, "foobar");
const results = await client.exec(setFoobarTransaction);
expect(results).toEqual(["OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("foobar");
expect(await client.get(key2)).toEqual("foobar");

client.close();
},
TIMEOUT,
);

runBaseTests<Context>({
init: async (protocol, clientName?) => {
const options = getClientConfigurationOption(
Expand Down
Loading
Loading