Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jduo committed Jul 3, 2024
1 parent 839b49a commit f3a0600
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static glide.api.BaseClient.OK;
import static glide.api.commands.ServerManagementCommands.VERSION_REDIS_API;
import static glide.api.models.GlideString.gs;
import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*
* @see <a href="https://valkey.io/commands/publish/">Pub/Sub Commands</a>
*/
public interface PubSubClusterCommands extends PubSubBaseCommands {
public interface PubSubClusterCommands {

/**
* Publishes message on pubsub channel.
Expand All @@ -17,7 +17,8 @@ public interface PubSubClusterCommands extends PubSubBaseCommands {
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return <code>OK</code>.
* @example
* <pre>{@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ protected ClusterTransaction getThis() {
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return Command response - The number of clients that received the message.
*/
public <ArgType> ClusterTransaction spublish(@NonNull ArgType message, @NonNull ArgType channel, sharded) {
public <ArgType> ClusterTransaction publish(
@NonNull ArgType message, @NonNull ArgType channel, boolean sharded) {
if (!sharded) {
return super.publish(message, channel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration.StandaloneSubscriptionConfigurationBuilder;
import glide.api.models.exceptions.ConfigurationError;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;

import glide.api.models.exceptions.ConfigurationError;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -90,7 +89,8 @@ protected <M extends ChannelMode> void addSubscription(
*/
public B callback(MessageCallback callback, Object context) {
if (context != null && callback == null) {
throw new ConfigurationError("PubSub subscriptions with a context require a callback function to be configured.");
throw new ConfigurationError(
"PubSub subscriptions with a context require a callback function to be configured.");
}
this.callback = Optional.ofNullable(callback);
this.context = Optional.ofNullable(context);
Expand All @@ -105,7 +105,8 @@ public B callback(MessageCallback callback, Object context) {
*/
public B callback(MessageCallback callback) {
if (callback == null && this.context.isPresent()) {
throw new ConfigurationError("PubSub subscriptions with a context require a callback function to be configured.");
throw new ConfigurationError(
"PubSub subscriptions with a context require a callback function to be configured.");
}
this.callback = Optional.ofNullable(callback);
return self();
Expand Down
87 changes: 46 additions & 41 deletions java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -699,25 +699,30 @@ public void three_publishing_clients_same_name_with_sharded(boolean useCallback)
var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel);
var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), channel);
Map<PubSubClusterChannelMode, Set<String>> subscriptionsExact =
Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel));
Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel));
Map<PubSubClusterChannelMode, Set<String>> subscriptionsPattern =
Map.of(PubSubClusterChannelMode.PATTERN, Set.of(channel));
Map.of(PubSubClusterChannelMode.PATTERN, Set.of(channel));
Map<PubSubClusterChannelMode, Set<String>> subscriptionsSharded =
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel));
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel));

var listenerExact = !useCallback ? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsExact) :
(RedisClusterClient) createListener(
false, true, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact);
var listenerExact =
!useCallback
? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsExact)
: (RedisClusterClient)
createListener(
false, true, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact);

var listenerPattern = !useCallback ?
(RedisClusterClient) createClientWithSubscriptions(false, subscriptionsPattern) :
createListener(
false, true, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern);
var listenerPattern =
!useCallback
? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsPattern)
: createListener(
false, true, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern);

var listenerSharded = !useCallback ?
(RedisClusterClient) createClientWithSubscriptions(false, subscriptionsSharded) :
createListener(
false, true, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded);
var listenerSharded =
!useCallback
? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsSharded)
: createListener(
false, true, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded);

clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded));

Expand All @@ -729,37 +734,37 @@ public void three_publishing_clients_same_name_with_sharded(boolean useCallback)

if (!useCallback) {
verifyReceivedPubsubMessages(
Set.of(
Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage),
Pair.of(
PubSubClusterChannelMode.EXACT.ordinal(),
new PubSubMessage(patternMessage.getMessage(), channel))),
listenerExact,
false);
Set.of(
Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage),
Pair.of(
PubSubClusterChannelMode.EXACT.ordinal(),
new PubSubMessage(patternMessage.getMessage(), channel))),
listenerExact,
false);
verifyReceivedPubsubMessages(
Set.of(
Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage),
Pair.of(
PubSubClusterChannelMode.PATTERN.ordinal(),
new PubSubMessage(exactMessage.getMessage(), channel, channel))),
listenerPattern,
false);
Set.of(
Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage),
Pair.of(
PubSubClusterChannelMode.PATTERN.ordinal(),
new PubSubMessage(exactMessage.getMessage(), channel, channel))),
listenerPattern,
false);
verifyReceivedPubsubMessages(
Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)),
listenerSharded,
false);
Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)),
listenerSharded,
false);
} else {
var expected =
Set.of(
Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage),
Pair.of(
PubSubClusterChannelMode.EXACT.ordinal(),
new PubSubMessage(patternMessage.getMessage(), channel)),
Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage),
Pair.of(
PubSubClusterChannelMode.PATTERN.ordinal(),
new PubSubMessage(exactMessage.getMessage(), channel, channel)),
Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage));
Set.of(
Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage),
Pair.of(
PubSubClusterChannelMode.EXACT.ordinal(),
new PubSubMessage(patternMessage.getMessage(), channel)),
Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage),
Pair.of(
PubSubClusterChannelMode.PATTERN.ordinal(),
new PubSubMessage(exactMessage.getMessage(), channel, channel)),
Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage));

verifyReceivedPubsubMessages(expected, listenerExact, true);
}
Expand Down

0 comments on commit f3a0600

Please sign in to comment.