Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: JSON.MGET. #2514

Open
wants to merge 1 commit into
base: release-1.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Java: Added `FT.SEARCH` ([#2439](https://github.com/valkey-io/valkey-glide/pull/2439))
* Java: Added `FT.AGGREGATE` ([#2466](https://github.com/valkey-io/valkey-glide/pull/2466))
* Java: Added `JSON.SET` and `JSON.GET` ([#2462](https://github.com/valkey-io/valkey-glide/pull/2462))
* Java: Added `JSON.MGET` ([#2514](https://github.com/valkey-io/valkey-glide/pull/2514))
* Node: Added `FT.CREATE` ([#2501](https://github.com/valkey-io/valkey-glide/pull/2501))
* Java: Added `JSON.ARRINSERT` and `JSON.ARRLEN` ([#2476](https://github.com/valkey-io/valkey-glide/pull/2476))
* Java: Added `JSON.OBJLEN` and `JSON.OBJKEYS` ([#2492](https://github.com/valkey-io/valkey-glide/pull/2492))
Expand Down
64 changes: 43 additions & 21 deletions glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub(crate) fn combine_and_sort_array_results<'a>(
for (key_indices, value) in sorting_order.into_iter().zip(values) {
match value {
Value::Array(values) => {
assert_eq!(values.len(), key_indices.len());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert isn't correct, because key_indices contains argument indices, where not all args are keys. It also produces a useless message if fails:

thread '<unnamed>' panicked at glide-core/redis-rs/redis/src/cluster_routing.rs:269:17:
assertion `left == right` failed
  left: 1
 right: 2
stack backtrace:
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
2024-10-24T19:14:37.701195Z  WARN logger_core: received error - Received connection error `redis_cluster: Unable to receive command`. Will attempt to reconnect

As an option we can restore the assert, but make it producing a meaningful error and check an extra condition shipped in ResponsePolicy::CombineArrays

for (index, value) in key_indices.iter().zip(values) {
results[*index] = value;
}
Expand Down Expand Up @@ -301,26 +300,44 @@ fn multi_shard<R>(
routable: &R,
cmd: &[u8],
first_key_index: usize,
has_values: bool,
values_position: MultiShardValues,
) -> Option<RoutingInfo>
where
R: Routable + ?Sized,
{
let is_readonly = is_readonly_cmd(cmd);
let mut routes = HashMap::new();
let mut key_index = 0;
while let Some(key) = routable.arg_idx(first_key_index + key_index) {
let mut arg_index = 0;

// determine arg count, because it is needed for MultiShardValues::ValueAfterAllKeys
let mut arg_count = first_key_index;
while routable.arg_idx(arg_count).is_some() {
arg_count += 1;
}
let last_arg_idx = arg_count - 1;

while let Some(key) = routable.arg_idx(first_key_index + arg_index) {
let route = get_route(is_readonly, key);
let entry = routes.entry(route);
let keys = entry.or_insert(Vec::new());
keys.push(key_index);

if has_values {
key_index += 1;
routable.arg_idx(first_key_index + key_index)?; // check that there's a value for the key
keys.push(key_index);
let args = entry.or_insert(Vec::new());
args.push(arg_index);

match values_position {
MultiShardValues::ValueAfterEachKey => {
arg_index += 1;
routable.arg_idx(first_key_index + arg_index)?; // check that there's a value for the key
args.push(arg_index);
}
MultiShardValues::ValueAfterAllKeys => {
args.push(last_arg_idx - first_key_index);
// break if we're on the last key (next arg is the value)
if arg_index + first_key_index == last_arg_idx - 1 {
break;
}
}
MultiShardValues::NoValues => (), // no-op
}
key_index += 1;
arg_index += 1;
}

let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
Expand Down Expand Up @@ -358,6 +375,7 @@ impl ResponsePolicy {
b"KEYS"
| b"FT._ALIASLIST"
| b"FT._LIST"
| b"JSON.MGET"
| b"MGET"
| b"SLOWLOG GET"
| b"PUBSUB CHANNELS"
Expand Down Expand Up @@ -390,8 +408,7 @@ enum RouteBy {
AllNodes,
AllPrimaries,
FirstKey,
MultiShardNoValues,
MultiShardWithValues,
MultiShard(MultiShardValues),
Random,
SecondArg,
SecondArgAfterKeyCount,
Expand All @@ -401,6 +418,13 @@ enum RouteBy {
Undefined,
}

/// Defines values' positions in a command line for commands which could be forwarded to multiple shards
enum MultiShardValues {
NoValues, // for example, `MGET key1 key2`
ValueAfterEachKey, // for example, `MSET key1 value1 key2 value2`
ValueAfterAllKeys, // for example, `JSON.MGET key1 key2 key3 value`
}

fn base_routing(cmd: &[u8]) -> RouteBy {
match cmd {
b"ACL SETUSER"
Expand Down Expand Up @@ -454,9 +478,10 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"WAITAOF" => RouteBy::AllPrimaries,

b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" | b"WATCH" => {
RouteBy::MultiShardNoValues
RouteBy::MultiShard(MultiShardValues::NoValues)
}
b"MSET" => RouteBy::MultiShardWithValues,
b"MSET" => RouteBy::MultiShard(MultiShardValues::ValueAfterEachKey),
b"JSON.MGET" => RouteBy::MultiShard(MultiShardValues::ValueAfterAllKeys),

// TODO - special handling - b"SCAN"
b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" => RouteBy::Undefined,
Expand Down Expand Up @@ -572,8 +597,7 @@ impl RoutingInfo {
| RouteBy::ThirdArgAfterKeyCount
| RouteBy::SecondArgSlot
| RouteBy::StreamsIndex
| RouteBy::MultiShardNoValues
| RouteBy::MultiShardWithValues => {
| RouteBy::MultiShard(_) => {
if matches!(cmd, b"SPUBLISH") {
// SPUBLISH does not return MOVED errors within the slot's shard. This means that even if READONLY wasn't sent to a replica,
// executing SPUBLISH FOO BAR on that replica will succeed. This behavior differs from true key-based commands,
Expand Down Expand Up @@ -608,9 +632,7 @@ impl RoutingInfo {
ResponsePolicy::for_command(cmd),
))),

RouteBy::MultiShardWithValues => multi_shard(r, cmd, 1, true),

RouteBy::MultiShardNoValues => multi_shard(r, cmd, 1, false),
RouteBy::MultiShard(values_position) => multi_shard(r, cmd, 1, values_position),

RouteBy::Random => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.commands.servermodules;

import static glide.api.models.GlideString.gs;
import static glide.utils.ArrayTransformUtils.castArray;
import static glide.utils.ArrayTransformUtils.concatenateArrays;

import glide.api.BaseClient;
Expand All @@ -22,6 +23,7 @@ public class Json {
private static final String JSON_PREFIX = "JSON.";
private static final String JSON_SET = JSON_PREFIX + "SET";
private static final String JSON_GET = JSON_PREFIX + "GET";
private static final String JSON_MGET = JSON_PREFIX + "MGET";
private static final String JSON_ARRAPPEND = JSON_PREFIX + "ARRAPPEND";
private static final String JSON_ARRINSERT = JSON_PREFIX + "ARRINSERT";
private static final String JSON_ARRLEN = JSON_PREFIX + "ARRLEN";
Expand Down Expand Up @@ -189,11 +191,12 @@ public static CompletableFuture<GlideString> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -226,11 +229,12 @@ public static CompletableFuture<String> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -317,11 +321,12 @@ public static CompletableFuture<GlideString> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -363,11 +368,12 @@ public static CompletableFuture<String> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -396,6 +402,77 @@ public static CompletableFuture<GlideString> get(
new ArgsBuilder().add(gs(JSON_GET)).add(key).add(options.toArgs()).add(paths).toArray());
}

/**
* Retrieves the JSON values at the specified <code>path</code> stored at multiple <code>keys
* </code>.
*
* @apiNote When in cluster mode, the command may route to multiple nodes when <code>keys</code>
* map to different hash slots.
* @param client The client to execute the command.
* @param keys The keys of the JSON documents.
* @param path The path within the JSON documents.
* @return An array with requested values for each key.
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array, if
* path doesn't exist.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>path</code>. If <code>path</code> doesn't exist,
* the corresponding array element will be <code>null</code>.
* </ul>
* If a <code>key</code> doesn't exist, the corresponding array element will be <code>null
* </code>.
* @example
* <pre>{@code
* Json.set(client, "doc1", "$", "{\"a\": 1, \"b\": [\"one\", \"two\"]}").get();
* Json.set(client, "doc2", "$", "{\"a\": 1, \"c\": false}").get();
* var res = Json.mget(client, new String[] { "doc1", "doc2", "doc3" }, "$.c").get();
* assert Arrays.equals(res, new String[] { "[]", "[false]", null });
* }</pre>
*/
public static CompletableFuture<String[]> mget(
@NonNull BaseClient client, @NonNull String[] keys, @NonNull String path) {
return Json.<Object[]>executeCommand(
client, concatenateArrays(new String[] {JSON_MGET}, keys, new String[] {path}))
.thenApply(res -> castArray(res, String.class));
}

/**
* Retrieves the JSON values at the specified <code>path</code> stored at multiple <code>keys
* </code>.
*
* @apiNote When in cluster mode, the command may route to multiple nodes when <code>keys</code>
* map to different hash slots.
* @param client The client to execute the command.
* @param keys The keys of the JSON documents.
* @param path The path within the JSON documents.
* @return An array with requested values for each key.
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array, if
* path doesn't exist.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>path</code>. If <code>path</code> doesn't exist,
* the corresponding array element will be <code>null</code>.
* </ul>
* If a <code>key</code> doesn't exist, the corresponding array element will be <code>null
* </code>.
* @example
* <pre>{@code
* Json.set(client, "doc1", "$", "{\"a\": 1, \"b\": [\"one\", \"two\"]}").get();
* Json.set(client, "doc2", "$", "{\"a\": 1, \"c\": false}").get();
* var res = Json.mget(client, new GlideString[] { "doc1", "doc2", "doc3" }, gs("$.c")).get();
* assert Arrays.equals(res, new GlideString[] { gs("[]"), gs("[false]"), null });
* }</pre>
*/
public static CompletableFuture<GlideString[]> mget(
@NonNull BaseClient client, @NonNull GlideString[] keys, @NonNull GlideString path) {
return Json.<Object[]>executeCommand(
client,
concatenateArrays(new GlideString[] {gs(JSON_MGET)}, keys, new GlideString[] {path}))
.thenApply(res -> castArray(res, GlideString.class));
}

/**
* Appends one or more <code>values</code> to the JSON array at the specified <code>path</code>
* within the JSON document stored at <code>key</code>.
Expand Down
23 changes: 23 additions & 0 deletions java/integTest/src/test/java/glide/modules/JsonTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import glide.api.models.commands.FlushMode;
import glide.api.models.commands.InfoOptions.Section;
import glide.api.models.commands.json.JsonGetOptions;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -383,6 +384,28 @@ public void objkeys() {
assertArrayEquals(new Object[] {gs("a"), gs("b")}, res);
}

@Test
@SneakyThrows
public void mget() {
String key1 = UUID.randomUUID().toString();
String key2 = UUID.randomUUID().toString();
var data =
Map.of(
key1, "{\"a\": 1, \"b\": [\"one\", \"two\"]}",
key2, "{\"a\": 1, \"c\": false}");

for (var entry : data.entrySet()) {
assertEquals("OK", Json.set(client, entry.getKey(), "$", entry.getValue()).get());
}

var res1 =
Json.mget(client, new String[] {key1, key2, UUID.randomUUID().toString()}, "$.c").get();
assertArrayEquals(new String[] {"[]", "[false]", null}, res1);

var res2 = Json.mget(client, new GlideString[] {gs(key1), gs(key2)}, gs(".b[*]")).get();
assertArrayEquals(new GlideString[] {gs("\"one\""), null}, res2);
}

@Test
@SneakyThrows
public void json_forget() {
Expand Down
Loading