diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fbc104b35..fde5a19522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122)) * Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105)) * Java: Added PUBSUB SHARDCHANNELS command ([#2265](https://github.com/valkey-io/valkey-glide/pull/2265)) +* Java: Added PUBSUB SHARDNUMSUB command ([#2279](https://github.com/valkey-io/valkey-glide/pull/2279)) * Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109)) * Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132)) * Node: Added HKEYS command ([#2136](https://github.com/valkey-io/valkey-glide/pull/2136)) diff --git a/java/client/src/main/java/glide/api/GlideClusterClient.java b/java/client/src/main/java/glide/api/GlideClusterClient.java index b86bab6670..ac12215148 100644 --- a/java/client/src/main/java/glide/api/GlideClusterClient.java +++ b/java/client/src/main/java/glide/api/GlideClusterClient.java @@ -27,6 +27,7 @@ import static command_request.CommandRequestOuterClass.RequestType.Lolwut; import static command_request.CommandRequestOuterClass.RequestType.Ping; import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub; import static command_request.CommandRequestOuterClass.RequestType.RandomKey; import static command_request.CommandRequestOuterClass.RequestType.SPublish; import static command_request.CommandRequestOuterClass.RequestType.Sort; @@ -1050,6 +1051,18 @@ public CompletableFuture pubsubShardChannels(@NonNull GlideString response -> castArray(handleArrayResponseBinary(response), GlideString.class)); } + @Override + public CompletableFuture> pubsubShardNumSub(@NonNull String[] channels) { + return commandManager.submitNewCommand(PubSubSNumSub, channels, this::handleMapResponse); + } + + @Override + public CompletableFuture> pubsubShardNumSub( + @NonNull GlideString[] channels) { + return commandManager.submitNewCommand( + PubSubSNumSub, channels, this::handleBinaryStringMapResponse); + } + @Override public CompletableFuture unwatch(@NonNull Route route) { return commandManager.submitNewCommand( diff --git a/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java b/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java index bdee3da4b4..c989c9d7ee 100644 --- a/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java @@ -2,6 +2,7 @@ package glide.api.commands; import glide.api.models.GlideString; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -100,4 +101,38 @@ public interface PubSubClusterCommands { * } */ CompletableFuture pubsubShardChannels(GlideString pattern); + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified shard channels. Note that it is valid to call this command without channels. In this + * case, it will just return an empty map. + * + * @see valkey.io for details. + * @param channels The list of shard channels to query for the number of subscribers. + * @return An Map where keys are the shard channel names and values are the number of + * subscribers. + * @example + *
{@code
+     * Map result = client.pubsubShardNumSub(new String[] {"channel1", "channel2"}).get();
+     * assert result.equals(Map.of("channel1", 3L, "channel2", 5L));
+     * }
+ */ + CompletableFuture> pubsubShardNumSub(String[] channels); + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified shard channels. Note that it is valid to call this command without channels. In this + * case, it will just return an empty map. + * + * @see valkey.io for details. + * @param channels The list of shard channels to query for the number of subscribers. + * @return An Map where keys are the shard channel names and values are the number of + * subscribers. + * @example + *
{@code
+     * Map result = client.pubsubShardNumSub(new GlideString[] {gs("channel1"), gs("channel2")}).get();
+     * assert result.equals(Map.of(gs("channel1"), 3L, gs("channel2"), 5L));
+     * }
+ */ + CompletableFuture> pubsubShardNumSub(GlideString[] channels); } diff --git a/java/client/src/main/java/glide/api/models/ClusterTransaction.java b/java/client/src/main/java/glide/api/models/ClusterTransaction.java index fdfa27b5c1..9a1bb2ef1c 100644 --- a/java/client/src/main/java/glide/api/models/ClusterTransaction.java +++ b/java/client/src/main/java/glide/api/models/ClusterTransaction.java @@ -2,6 +2,7 @@ package glide.api.models; import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub; import static command_request.CommandRequestOuterClass.RequestType.SPublish; import static command_request.CommandRequestOuterClass.RequestType.Sort; import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly; @@ -60,6 +61,24 @@ public ClusterTransaction publish( return getThis(); } + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified shard channels. Note that it is valid to call this command without channels. In this + * case, it will just return an empty map. + * + * @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type + * will throw {@link IllegalArgumentException}. + * @see valkey.io for details. + * @param channels The list of shard channels to query for the number of subscribers. + * @return Command response - An Map where keys are the shard channel names and + * values are the number of subscribers. + */ + public ClusterTransaction pubsubShardNumSub(@NonNull ArgType[] channels) { + checkTypeOrThrow(channels); + protobufTransaction.addCommands(buildCommand(PubSubSNumSub, newArgsBuilder().add(channels))); + return getThis(); + } + /** * Sorts the elements in the list, set, or sorted set at key and returns the result. *
diff --git a/java/client/src/test/java/glide/api/GlideClusterClientTest.java b/java/client/src/test/java/glide/api/GlideClusterClientTest.java index 73c1413624..f049cdbe3c 100644 --- a/java/client/src/test/java/glide/api/GlideClusterClientTest.java +++ b/java/client/src/test/java/glide/api/GlideClusterClientTest.java @@ -26,6 +26,7 @@ import static command_request.CommandRequestOuterClass.RequestType.Lolwut; import static command_request.CommandRequestOuterClass.RequestType.Ping; import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub; import static command_request.CommandRequestOuterClass.RequestType.RandomKey; import static command_request.CommandRequestOuterClass.RequestType.SPublish; import static command_request.CommandRequestOuterClass.RequestType.Sort; @@ -2812,6 +2813,54 @@ public void pubsubShardChannelsBinary_with_pattern_returns_success() { assertEquals(value, payload); } + @SneakyThrows + @Test + public void pubsubShardNumSub_returns_success() { + // setup + String[] arguments = new String[] {"ch1", "ch2"}; + Map value = Map.of(); + + CompletableFuture> testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.>submitNewCommand( + eq(PubSubSNumSub), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture> response = service.pubsubShardNumSub(arguments); + Map payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubShardNumSub_binary_returns_success() { + // setup + GlideString[] arguments = new GlideString[] {gs("ch1"), gs("ch2")}; + Map value = Map.of(); + + CompletableFuture> testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.>submitNewCommand( + eq(PubSubSNumSub), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture> response = service.pubsubShardNumSub(arguments); + Map payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + @SneakyThrows @Test public void sort_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java b/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java index 1f9a6c43af..95f9d3ca52 100644 --- a/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java +++ b/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java @@ -2,6 +2,7 @@ package glide.api.models; import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub; import static command_request.CommandRequestOuterClass.RequestType.SPublish; import static command_request.CommandRequestOuterClass.RequestType.Sort; import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly; @@ -38,6 +39,9 @@ public void cluster_transaction_builds_protobuf_request() { transaction.pubsubShardChannels("test*"); results.add(Pair.of(PubSubSChannels, buildArgs("test*"))); + transaction.pubsubShardNumSub(new String[] {"ch1", "ch2"}); + results.add(Pair.of(PubSubSNumSub, buildArgs("ch1", "ch2"))); + transaction.sortReadOnly( "key1", SortClusterOptions.builder() diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index ffbd29bc17..ff2220c0dd 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -1523,4 +1523,57 @@ public void pubsub_shard_channels() { assertEquals(0, listener.pubsubShardChannels("non_matching_*").get().length); assertEquals(0, listener.pubsubShardChannels(gs("non_matching_*")).get().length); } + + @SneakyThrows + @Test + public void pubsub_shardnumsub() { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + GlideClusterClient client = (GlideClusterClient) createClient(false); + var channels = new String[] {"channel1", "channel2", "channel3"}; + assertEquals( + Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), + client.pubsubNumSub(channels).get()); + + Map> subscriptions1 = + Map.of( + PubSubClusterChannelMode.SHARDED, + Set.of(gs("channel1"), gs("channel2"), gs("channel3"))); + GlideClusterClient listener1 = + (GlideClusterClient) createClientWithSubscriptions(false, subscriptions1); + + Map> subscriptions2 = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(gs("channel2"), gs("channel3"))); + GlideClusterClient listener2 = + (GlideClusterClient) createClientWithSubscriptions(false, subscriptions2); + + Map> subscriptions3 = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(gs("channel3"))); + GlideClusterClient listener3 = + (GlideClusterClient) createClientWithSubscriptions(false, subscriptions3); + + clients.addAll(List.of(client, listener1, listener2, listener3)); + + var expected = Map.of("channel1", 1L, "channel2", 2L, "channel3", 3L, "channel4", 0L); + assertEquals( + expected, client.pubsubShardNumSub(ArrayUtils.addFirst(channels, "channel4")).get()); + assertEquals( + expected, listener1.pubsubShardNumSub(ArrayUtils.addFirst(channels, "channel4")).get()); + + var expectedGs = + Map.of(gs("channel1"), 1L, gs("channel2"), 2L, gs("channel3"), 3L, gs("channel4"), 0L); + assertEquals( + expectedGs, + client + .pubsubShardNumSub( + new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")}) + .get()); + assertEquals( + expectedGs, + listener2 + .pubsubShardNumSub( + new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")}) + .get()); + } }