Skip to content

Commit

Permalink
JSON.MGET.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand committed Oct 24, 2024
1 parent b44368e commit 7653331
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Java: Added `FT.SEARCH` ([#2439](https://github.com/valkey-io/valkey-glide/pull/2439))
* Java: Added `FT.AGGREGATE` ([#2466](https://github.com/valkey-io/valkey-glide/pull/2466))
* Java: Added `JSON.SET` and `JSON.GET` ([#2462](https://github.com/valkey-io/valkey-glide/pull/2462))
* Java: Added `JSON.MGET` ([#2514](https://github.com/valkey-io/valkey-glide/pull/2514))
* Node: Added `FT.CREATE` ([#2501](https://github.com/valkey-io/valkey-glide/pull/2501))
* Java: Added `JSON.ARRINSERT` and `JSON.ARRLEN` ([#2476](https://github.com/valkey-io/valkey-glide/pull/2476))
* Java: Added `JSON.OBJLEN` and `JSON.OBJKEYS` ([#2492](https://github.com/valkey-io/valkey-glide/pull/2492))
Expand Down
64 changes: 43 additions & 21 deletions glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub(crate) fn combine_and_sort_array_results<'a>(
for (key_indices, value) in sorting_order.into_iter().zip(values) {
match value {
Value::Array(values) => {
assert_eq!(values.len(), key_indices.len());
for (index, value) in key_indices.iter().zip(values) {
results[*index] = value;
}
Expand Down Expand Up @@ -301,26 +300,44 @@ fn multi_shard<R>(
routable: &R,
cmd: &[u8],
first_key_index: usize,
has_values: bool,
values_position: MultiShardValues,
) -> Option<RoutingInfo>
where
R: Routable + ?Sized,
{
let is_readonly = is_readonly_cmd(cmd);
let mut routes = HashMap::new();
let mut key_index = 0;
while let Some(key) = routable.arg_idx(first_key_index + key_index) {
let mut arg_index = 0;

// determine arg count, because it is needed for MultiShardValues::ValueAfterAllKeys
let mut arg_count = first_key_index;
while routable.arg_idx(arg_count).is_some() {
arg_count += 1;
}
let last_arg_idx = arg_count - 1;

while let Some(key) = routable.arg_idx(first_key_index + arg_index) {
let route = get_route(is_readonly, key);
let entry = routes.entry(route);
let keys = entry.or_insert(Vec::new());
keys.push(key_index);

if has_values {
key_index += 1;
routable.arg_idx(first_key_index + key_index)?; // check that there's a value for the key
keys.push(key_index);
let args = entry.or_insert(Vec::new());
args.push(arg_index);

match values_position {
MultiShardValues::ValueAfterEachKey => {
arg_index += 1;
routable.arg_idx(first_key_index + arg_index)?; // check that there's a value for the key
args.push(arg_index);
}
MultiShardValues::ValueAfterAllKeys => {
args.push(last_arg_idx - first_key_index);
// break if we're on the last key (next arg is the value)
if arg_index + first_key_index == last_arg_idx - 1 {
break;
}
}
MultiShardValues::NoValues => (), // no-op
}
key_index += 1;
arg_index += 1;
}

let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
Expand Down Expand Up @@ -358,6 +375,7 @@ impl ResponsePolicy {
b"KEYS"
| b"FT._ALIASLIST"
| b"FT._LIST"
| b"JSON.MGET"
| b"MGET"
| b"SLOWLOG GET"
| b"PUBSUB CHANNELS"
Expand Down Expand Up @@ -390,8 +408,7 @@ enum RouteBy {
AllNodes,
AllPrimaries,
FirstKey,
MultiShardNoValues,
MultiShardWithValues,
MultiShard(MultiShardValues),
Random,
SecondArg,
SecondArgAfterKeyCount,
Expand All @@ -401,6 +418,13 @@ enum RouteBy {
Undefined,
}

/// Defines values' positions in a command line for commands which could be forwarded to multiple shards
enum MultiShardValues {
NoValues, // for example, `MGET key1 key2`
ValueAfterEachKey, // for example, `MSET key1 value1 key2 value2`
ValueAfterAllKeys, // for example, `JSON.MGET key1 key2 key3 value`
}

fn base_routing(cmd: &[u8]) -> RouteBy {
match cmd {
b"ACL SETUSER"
Expand Down Expand Up @@ -454,9 +478,10 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"WAITAOF" => RouteBy::AllPrimaries,

b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" | b"WATCH" => {
RouteBy::MultiShardNoValues
RouteBy::MultiShard(MultiShardValues::NoValues)
}
b"MSET" => RouteBy::MultiShardWithValues,
b"MSET" => RouteBy::MultiShard(MultiShardValues::ValueAfterEachKey),
b"JSON.MGET" => RouteBy::MultiShard(MultiShardValues::ValueAfterAllKeys),

// TODO - special handling - b"SCAN"
b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" => RouteBy::Undefined,
Expand Down Expand Up @@ -572,8 +597,7 @@ impl RoutingInfo {
| RouteBy::ThirdArgAfterKeyCount
| RouteBy::SecondArgSlot
| RouteBy::StreamsIndex
| RouteBy::MultiShardNoValues
| RouteBy::MultiShardWithValues => {
| RouteBy::MultiShard(_) => {
if matches!(cmd, b"SPUBLISH") {
// SPUBLISH does not return MOVED errors within the slot's shard. This means that even if READONLY wasn't sent to a replica,
// executing SPUBLISH FOO BAR on that replica will succeed. This behavior differs from true key-based commands,
Expand Down Expand Up @@ -608,9 +632,7 @@ impl RoutingInfo {
ResponsePolicy::for_command(cmd),
))),

RouteBy::MultiShardWithValues => multi_shard(r, cmd, 1, true),

RouteBy::MultiShardNoValues => multi_shard(r, cmd, 1, false),
RouteBy::MultiShard(values_position) => multi_shard(r, cmd, 1, values_position),

RouteBy::Random => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.commands.servermodules;

import static glide.api.models.GlideString.gs;
import static glide.utils.ArrayTransformUtils.castArray;
import static glide.utils.ArrayTransformUtils.concatenateArrays;

import glide.api.BaseClient;
Expand All @@ -22,6 +23,7 @@ public class Json {
private static final String JSON_PREFIX = "JSON.";
private static final String JSON_SET = JSON_PREFIX + "SET";
private static final String JSON_GET = JSON_PREFIX + "GET";
private static final String JSON_MGET = JSON_PREFIX + "MGET";
private static final String JSON_ARRAPPEND = JSON_PREFIX + "ARRAPPEND";
private static final String JSON_ARRINSERT = JSON_PREFIX + "ARRINSERT";
private static final String JSON_ARRLEN = JSON_PREFIX + "ARRLEN";
Expand Down Expand Up @@ -189,11 +191,12 @@ public static CompletableFuture<GlideString> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -226,11 +229,12 @@ public static CompletableFuture<String> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -317,11 +321,12 @@ public static CompletableFuture<GlideString> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -363,11 +368,12 @@ public static CompletableFuture<String> get(
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array,
* if path doesn't exist. If <code>key</code> doesn't exist, returns None.
* if path doesn't exist. If <code>key</code> doesn't exist, returns <code>null
* </code>.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>paths</code>. If <code>paths</code>
* doesn't exist, an error is raised. If <code>key</code> doesn't exist, returns
* None.
* <code>null</code>.
* </ul>
* <li>If multiple paths are given: Returns a stringified JSON, in which each path is a key,
* and it's corresponding value, is the value as if the path was executed in the command
Expand Down Expand Up @@ -396,6 +402,77 @@ public static CompletableFuture<GlideString> get(
new ArgsBuilder().add(gs(JSON_GET)).add(key).add(options.toArgs()).add(paths).toArray());
}

/**
* Retrieves the JSON values at the specified <code>path</code> stored at multiple <code>keys
* </code>.
*
* @apiNote When in cluster mode, the command may route to multiple nodes when <code>keys</code>
* map to different hash slots.
* @param client The client to execute the command.
* @param keys The keys of the JSON documents.
* @param path The path within the JSON documents.
* @return An array with requested values for each key.
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array, if
* path doesn't exist.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>path</code>. If <code>path</code> doesn't exist,
* the corresponding array element will be <code>null</code>.
* </ul>
* If a <code>key</code> doesn't exist, the corresponding array element will be <code>null
* </code>.
* @example
* <pre>{@code
* Json.set(client, "doc1", "$", "{\"a\": 1, \"b\": [\"one\", \"two\"]}").get();
* Json.set(client, "doc2", "$", "{\"a\": 1, \"c\": false}").get();
* var res = Json.mget(client, new String[] { "doc1", "doc2", "doc3" }, "$.c").get();
* assert Arrays.equals(res, new String[] { "[]", "[false]", null });
* }</pre>
*/
public static CompletableFuture<String[]> mget(
@NonNull BaseClient client, @NonNull String[] keys, @NonNull String path) {
return Json.<Object[]>executeCommand(
client, concatenateArrays(new String[] {JSON_MGET}, keys, new String[] {path}))
.thenApply(res -> castArray(res, String.class));
}

/**
* Retrieves the JSON values at the specified <code>path</code> stored at multiple <code>keys
* </code>.
*
* @apiNote When in cluster mode, the command may route to multiple nodes when <code>keys</code>
* map to different hash slots.
* @param client The client to execute the command.
* @param keys The keys of the JSON documents.
* @param path The path within the JSON documents.
* @return An array with requested values for each key.
* <ul>
* <li>For JSONPath (path starts with <code>$</code>): Returns a stringified JSON list
* replies for every possible path, or a string representation of an empty array, if
* path doesn't exist.
* <li>For legacy path (path doesn't start with <code>$</code>): Returns a string
* representation of the value in <code>path</code>. If <code>path</code> doesn't exist,
* the corresponding array element will be <code>null</code>.
* </ul>
* If a <code>key</code> doesn't exist, the corresponding array element will be <code>null
* </code>.
* @example
* <pre>{@code
* Json.set(client, "doc1", "$", "{\"a\": 1, \"b\": [\"one\", \"two\"]}").get();
* Json.set(client, "doc2", "$", "{\"a\": 1, \"c\": false}").get();
* var res = Json.mget(client, new GlideString[] { "doc1", "doc2", "doc3" }, gs("$.c")).get();
* assert Arrays.equals(res, new GlideString[] { gs("[]"), gs("[false]"), null });
* }</pre>
*/
public static CompletableFuture<GlideString[]> mget(
@NonNull BaseClient client, @NonNull GlideString[] keys, @NonNull GlideString path) {
return Json.<Object[]>executeCommand(
client,
concatenateArrays(new GlideString[] {gs(JSON_MGET)}, keys, new GlideString[] {path}))
.thenApply(res -> castArray(res, GlideString.class));
}

/**
* Appends one or more <code>values</code> to the JSON array at the specified <code>path</code>
* within the JSON document stored at <code>key</code>.
Expand Down
23 changes: 23 additions & 0 deletions java/integTest/src/test/java/glide/modules/JsonTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import glide.api.models.commands.FlushMode;
import glide.api.models.commands.InfoOptions.Section;
import glide.api.models.commands.json.JsonGetOptions;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -383,6 +384,28 @@ public void objkeys() {
assertArrayEquals(new Object[] {gs("a"), gs("b")}, res);
}

@Test
@SneakyThrows
public void mget() {
String key1 = UUID.randomUUID().toString();
String key2 = UUID.randomUUID().toString();
var data =
Map.of(
key1, "{\"a\": 1, \"b\": [\"one\", \"two\"]}",
key2, "{\"a\": 1, \"c\": false}");

for (var entry : data.entrySet()) {
assertEquals("OK", Json.set(client, entry.getKey(), "$", entry.getValue()).get());
}

var res1 =
Json.mget(client, new String[] {key1, key2, UUID.randomUUID().toString()}, "$.c").get();
assertArrayEquals(new String[] {"[]", "[false]", null}, res1);

var res2 = Json.mget(client, new GlideString[] {gs(key1), gs(key2)}, gs(".b[*]")).get();
assertArrayEquals(new GlideString[] {gs("\"one\""), null}, res2);
}

@Test
@SneakyThrows
public void json_forget() {
Expand Down

0 comments on commit 7653331

Please sign in to comment.