Skip to content

Commit

Permalink
Java: add PUBSUB SHARDNUMSUB (valkey-io#2279)
Browse files Browse the repository at this point in the history
* add PUBSUB SHARDNUMSUB in Java

---------

Signed-off-by: James Xin <james.xin@improving.com>
  • Loading branch information
jamesx-improving authored Sep 12, 2024
1 parent e997eea commit 8bf8282
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122))
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105))
* Java: Added PUBSUB SHARDCHANNELS command ([#2265](https://github.com/valkey-io/valkey-glide/pull/2265))
* Java: Added PUBSUB SHARDNUMSUB command ([#2279](https://github.com/valkey-io/valkey-glide/pull/2279))
* Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109))
* Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132))
* Node: Added HKEYS command ([#2136](https://github.com/valkey-io/valkey-glide/pull/2136))
Expand Down
13 changes: 13 additions & 0 deletions java/client/src/main/java/glide/api/GlideClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static command_request.CommandRequestOuterClass.RequestType.Lolwut;
import static command_request.CommandRequestOuterClass.RequestType.Ping;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub;
import static command_request.CommandRequestOuterClass.RequestType.RandomKey;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
Expand Down Expand Up @@ -1050,6 +1051,18 @@ public CompletableFuture<GlideString[]> pubsubShardChannels(@NonNull GlideString
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<Map<String, Long>> pubsubShardNumSub(@NonNull String[] channels) {
return commandManager.submitNewCommand(PubSubSNumSub, channels, this::handleMapResponse);
}

@Override
public CompletableFuture<Map<GlideString, Long>> pubsubShardNumSub(
@NonNull GlideString[] channels) {
return commandManager.submitNewCommand(
PubSubSNumSub, channels, this::handleBinaryStringMapResponse);
}

@Override
public CompletableFuture<String> unwatch(@NonNull Route route) {
return commandManager.submitNewCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -100,4 +101,38 @@ public interface PubSubClusterCommands {
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubShardChannels(GlideString pattern);

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified shard channels. Note that it is valid to call this command without channels. In this
* case, it will just return an empty map.
*
* @see <a href="https://valkey.io/commands/pubsub-shardnumsub/">valkey.io</a> for details.
* @param channels The list of shard channels to query for the number of subscribers.
* @return An <code>Map</code> where keys are the shard channel names and values are the number of
* subscribers.
* @example
* <pre>{@code
* Map<String, Long> result = client.pubsubShardNumSub(new String[] {"channel1", "channel2"}).get();
* assert result.equals(Map.of("channel1", 3L, "channel2", 5L));
* }</pre>
*/
CompletableFuture<Map<String, Long>> pubsubShardNumSub(String[] channels);

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified shard channels. Note that it is valid to call this command without channels. In this
* case, it will just return an empty map.
*
* @see <a href="https://valkey.io/commands/pubsub-shardnumsub/">valkey.io</a> for details.
* @param channels The list of shard channels to query for the number of subscribers.
* @return An <code>Map</code> where keys are the shard channel names and values are the number of
* subscribers.
* @example
* <pre>{@code
* Map<GlideString, Long> result = client.pubsubShardNumSub(new GlideString[] {gs("channel1"), gs("channel2")}).get();
* assert result.equals(Map.of(gs("channel1"), 3L, gs("channel2"), 5L));
* }</pre>
*/
CompletableFuture<Map<GlideString, Long>> pubsubShardNumSub(GlideString[] channels);
}
19 changes: 19 additions & 0 deletions java/client/src/main/java/glide/api/models/ClusterTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.models;

import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly;
Expand Down Expand Up @@ -60,6 +61,24 @@ public <ArgType> ClusterTransaction publish(
return getThis();
}

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified shard channels. Note that it is valid to call this command without channels. In this
* case, it will just return an empty map.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @see <a href="https://valkey.io/commands/pubsub-shardnumsub/">valkey.io</a> for details.
* @param channels The list of shard channels to query for the number of subscribers.
* @return Command response - An <code>Map</code> where keys are the shard channel names and
* values are the number of subscribers.
*/
public <ArgType> ClusterTransaction pubsubShardNumSub(@NonNull ArgType[] channels) {
checkTypeOrThrow(channels);
protobufTransaction.addCommands(buildCommand(PubSubSNumSub, newArgsBuilder().add(channels)));
return getThis();
}

/**
* Sorts the elements in the list, set, or sorted set at <code>key</code> and returns the result.
* <br>
Expand Down
49 changes: 49 additions & 0 deletions java/client/src/test/java/glide/api/GlideClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static command_request.CommandRequestOuterClass.RequestType.Lolwut;
import static command_request.CommandRequestOuterClass.RequestType.Ping;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub;
import static command_request.CommandRequestOuterClass.RequestType.RandomKey;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
Expand Down Expand Up @@ -2812,6 +2813,54 @@ public void pubsubShardChannelsBinary_with_pattern_returns_success() {
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void pubsubShardNumSub_returns_success() {
// setup
String[] arguments = new String[] {"ch1", "ch2"};
Map<String, Long> value = Map.of();

CompletableFuture<Map<String, Long>> testResponse = new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<Map<String, Long>>submitNewCommand(
eq(PubSubSNumSub), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<String, Long>> response = service.pubsubShardNumSub(arguments);
Map<String, Long> payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void pubsubShardNumSub_binary_returns_success() {
// setup
GlideString[] arguments = new GlideString[] {gs("ch1"), gs("ch2")};
Map<GlideString, Long> value = Map.of();

CompletableFuture<Map<GlideString, Long>> testResponse = new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<Map<GlideString, Long>>submitNewCommand(
eq(PubSubSNumSub), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<GlideString, Long>> response = service.pubsubShardNumSub(arguments);
Map<GlideString, Long> payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void sort_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.models;

import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSNumSub;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly;
Expand Down Expand Up @@ -38,6 +39,9 @@ public void cluster_transaction_builds_protobuf_request() {
transaction.pubsubShardChannels("test*");
results.add(Pair.of(PubSubSChannels, buildArgs("test*")));

transaction.pubsubShardNumSub(new String[] {"ch1", "ch2"});
results.add(Pair.of(PubSubSNumSub, buildArgs("ch1", "ch2")));

transaction.sortReadOnly(
"key1",
SortClusterOptions.builder()
Expand Down
53 changes: 53 additions & 0 deletions java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1523,4 +1523,57 @@ public void pubsub_shard_channels() {
assertEquals(0, listener.pubsubShardChannels("non_matching_*").get().length);
assertEquals(0, listener.pubsubShardChannels(gs("non_matching_*")).get().length);
}

@SneakyThrows
@Test
public void pubsub_shardnumsub() {
assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7");

// no channels exists yet
GlideClusterClient client = (GlideClusterClient) createClient(false);
var channels = new String[] {"channel1", "channel2", "channel3"};
assertEquals(
Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)),
client.pubsubNumSub(channels).get());

Map<? extends ChannelMode, Set<GlideString>> subscriptions1 =
Map.of(
PubSubClusterChannelMode.SHARDED,
Set.of(gs("channel1"), gs("channel2"), gs("channel3")));
GlideClusterClient listener1 =
(GlideClusterClient) createClientWithSubscriptions(false, subscriptions1);

Map<? extends ChannelMode, Set<GlideString>> subscriptions2 =
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(gs("channel2"), gs("channel3")));
GlideClusterClient listener2 =
(GlideClusterClient) createClientWithSubscriptions(false, subscriptions2);

Map<? extends ChannelMode, Set<GlideString>> subscriptions3 =
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(gs("channel3")));
GlideClusterClient listener3 =
(GlideClusterClient) createClientWithSubscriptions(false, subscriptions3);

clients.addAll(List.of(client, listener1, listener2, listener3));

var expected = Map.of("channel1", 1L, "channel2", 2L, "channel3", 3L, "channel4", 0L);
assertEquals(
expected, client.pubsubShardNumSub(ArrayUtils.addFirst(channels, "channel4")).get());
assertEquals(
expected, listener1.pubsubShardNumSub(ArrayUtils.addFirst(channels, "channel4")).get());

var expectedGs =
Map.of(gs("channel1"), 1L, gs("channel2"), 2L, gs("channel3"), 3L, gs("channel4"), 0L);
assertEquals(
expectedGs,
client
.pubsubShardNumSub(
new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")})
.get());
assertEquals(
expectedGs,
listener2
.pubsubShardNumSub(
new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")})
.get());
}
}

0 comments on commit 8bf8282

Please sign in to comment.