Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: added ZMPOP command #1994

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Node: Added FUNCTION LOAD command ([#1969](https://github.com/valkey-io/valkey-glide/pull/1969))
* Node: Added FUNCTION DELETE command ([#1990](https://github.com/valkey-io/valkey-glide/pull/1990))
* Node: Added FUNCTION FLUSH command ([#1984](https://github.com/valkey-io/valkey-glide/pull/1984))
* Node: Added ZMPOP command ([#1994](https://github.com/valkey-io/valkey-glide/pull/1994))

## 1.0.0 (2024-07-09)

Expand Down
2 changes: 2 additions & 0 deletions node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ function initialize() {
ClusterTransaction,
Transaction,
PubSubMsg,
ScoreFilter,
createLeakedArray,
createLeakedAttribute,
createLeakedBigint,
Expand Down Expand Up @@ -168,6 +169,7 @@ function initialize() {
ClusterTransaction,
Transaction,
PubSubMsg,
ScoreFilter,
createLeakedArray,
createLeakedAttribute,
createLeakedBigint,
Expand Down
35 changes: 35 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
RangeByLex,
RangeByScore,
ScoreBoundary,
ScoreFilter,
SetOptions,
StreamAddOptions,
StreamReadOptions,
Expand Down Expand Up @@ -112,6 +113,7 @@ import {
createZDiffWithScores,
createZInterCard,
createZInterstore,
createZMPop,
createZMScore,
createZPopMax,
createZPopMin,
Expand Down Expand Up @@ -3416,6 +3418,39 @@ export class BaseClient {
);
}

/**
* Pops a member-score pair from the first non-empty sorted set, with the given `keys`
* being checked in the order they are provided.
*
* See https://valkey.io/commands/zmpop/ for more details.
*
* @remarks When in cluster mode, all `keys` must map to the same hash slot.
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param count - The number of elements to pop.
* @returns A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
*
* since Valkey version 7.0.0.
*
* @example
* ```typescript
* await client.zadd("zSet1", { one: 1.0, two: 2.0, three: 3.0 });
* await client.zadd("zSet2", { four: 4.0 });
* console.log(await client.zmpop(["zSet1", "zSet2"], ScoreFilter.MAX, 2));
* // Output: [ "zSet1", { three: 3, two: 2 } ] - "three" with score 3 and "two" with score 2 were popped from "zSet1".
* ```
*/
public zmpop(
key: string[],
modifier: ScoreFilter,
count?: number,
): Promise<[string, [Record<string, number>]] | null> {
return this.createWritePromise(createZMPop(key, modifier, count));
}

/**
* @internal
*/
Expand Down
30 changes: 30 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1869,3 +1869,33 @@ export function createZRevRankWithScore(
): command_request.Command {
return createCommand(RequestType.ZRevRank, [key, member, "WITHSCORE"]);
}

/**
* Mandatory option for zmpop.
* Defines which elements to pop from the sorted set.
*/
export enum ScoreFilter {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put it into src/commands dir?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the two necessary occurrences for ScoreFilter in npm/glide/index.ts?

Copy link
Collaborator Author

@GumpacG GumpacG Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put it into src/commands dir?

This has yet to be discussed if we separate enums and option types in to separate files.

/** Pop elements with the highest scores. */
MAX = "MAX",
/** Pop elements with the lowest scores. */
MIN = "MIN",
}

/**
* @internal
*/
export function createZMPop(
keys: string[],
modifier: ScoreFilter,
count?: number,
): command_request.Command {
const args: string[] = [keys.length.toString()].concat(keys);
args.push(modifier);

if (count !== undefined) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
args.push("COUNT");
args.push(count.toString());
}

return createCommand(RequestType.ZMPop, args);
}
25 changes: 24 additions & 1 deletion node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
RangeByLex,
RangeByScore,
ScoreBoundary,
ScoreFilter,
SetOptions,
StreamAddOptions,
StreamReadOptions,
Expand Down Expand Up @@ -136,6 +137,7 @@ import {
createZRemRangeByScore,
createZRevRank,
createZRevRankWithScore,
createZMPop,
createZScore,
} from "./Commands";
import { command_request } from "./ProtobufMessage";
Expand Down Expand Up @@ -1940,7 +1942,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* @param element - The value to search for within the list.
* @param options - The LPOS options.
*
* Command Response - The index of `element`, or `null` if `element` is not in the list. If the `count`
* Command Response - The index of `element`, or `null` if `element` is not in the list. If the `count`
* option is specified, then the function returns an `array` of indices of matching elements within the list.
*
* since - Valkey version 6.0.6.
Expand Down Expand Up @@ -2001,6 +2003,27 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
createGeoAdd(key, membersToGeospatialData, options),
);
}

/**
* Pops a member-score pair from the first non-empty sorted set, with the given `keys`
* being checked in the order they are provided.
*
* See https://valkey.io/commands/zmpop/ for more details.
*
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param count - The number of elements to pop.
*
* Command Response - A two-element `array` containing the key name of the set from which the
* element was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
*
* since Valkey version 7.0.0.
*/
public zmpop(keys: string[], modifier: ScoreFilter, count?: number): T {
return this.addAndReturn(createZMPop(keys, modifier, count));
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions node/tests/RedisClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
InfoOptions,
ProtocolVersion,
Routes,
ScoreFilter,
} from "..";
import { RedisCluster } from "../../utils/TestUtils.js";
import { FlushMode } from "../build-ts/src/commands/FlushMode";
Expand Down Expand Up @@ -323,6 +324,9 @@ describe("GlideClusterClient", () => {

if (!versionLessThan7) {
promises.push(client.zintercard(["abc", "zxy", "lkn"]));
promises.push(
client.zmpop(["abc", "zxy", "lkn"], ScoreFilter.MAX),
);
}

for (const promise of promises) {
Expand Down
66 changes: 66 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
InsertPosition,
ProtocolVersion,
RequestError,
ScoreFilter,
Script,
parseInfoResponse,
} from "../";
Expand Down Expand Up @@ -4545,6 +4546,71 @@ export function runBaseTests<Context>(config: {
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`zmpop test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
if (await checkIfServerVersionLessThan("7.0.0")) return;
const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const nonExistingKey = "{key}-0" + uuidv4();
const stringKey = "{key}-string" + uuidv4();

expect(await client.zadd(key1, { a1: 1, b1: 2 })).toEqual(2);
expect(await client.zadd(key2, { a2: 0.1, b2: 0.2 })).toEqual(
2,
);

checkSimple(
await client.zmpop([key1, key2], ScoreFilter.MAX),
).toEqual([key1, { b1: 2 }]);
checkSimple(
await client.zmpop([key2, key1], ScoreFilter.MAX, 10),
).toEqual([key2, { a2: 0.1, b2: 0.2 }]);
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved

expect(await client.zmpop([nonExistingKey], ScoreFilter.MIN))
.toBeNull;
expect(await client.zmpop([nonExistingKey], ScoreFilter.MIN, 1))
.toBeNull;

// key exists, but it is not a sorted set
expect(await client.set(stringKey, "value")).toEqual("OK");
await expect(
client.zmpop([stringKey], ScoreFilter.MAX),
).rejects.toThrow(RequestError);
await expect(
client.zmpop([stringKey], ScoreFilter.MAX, 1),
).rejects.toThrow(RequestError);

// incorrect argument: key list should not be empty
await expect(
client.zmpop([], ScoreFilter.MAX, 1),
).rejects.toThrow(RequestError);

// incorrect argument: count should be greater than 0
await expect(
client.zmpop([key1], ScoreFilter.MAX, 0),
).rejects.toThrow(RequestError);

// check that order of entries in the response is preserved
const entries: Record<string, number> = {};

for (let i = 0; i < 10; i++) {
// a0 => 0, a1 => 1 etc
entries["a" + i] = i;
}

expect(await client.zadd(key2, entries)).toEqual(10);
const result = await client.zmpop([key2], ScoreFilter.MIN, 10);

if (result) {
expect(result[1]).toEqual(entries);
}
}, protocol);
},
config.timeout,
);
}

export function runCommonTests<Context>(config: {
Expand Down
5 changes: 5 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
Logger,
ProtocolVersion,
ReturnType,
ScoreFilter,
Transaction,
} from "..";
import {
Expand Down Expand Up @@ -593,6 +594,10 @@ export async function transactionTest(
args.push(0);
baseTransaction.zintercard([key8, key14], 1);
args.push(0);
baseTransaction.zmpop([key14], ScoreFilter.MAX);
args.push([key14, { two: 2.0 }]);
baseTransaction.zmpop([key14], ScoreFilter.MAX, 1);
args.push([key14, { one: 1.0 }]);
}

baseTransaction.xadd(key9, [["field", "value1"]], { id: "0-1" });
Expand Down
Loading