Skip to content

Commit

Permalink
PubSubMessages should use GlideStrings instead of String
Browse files Browse the repository at this point in the history
  • Loading branch information
jduo committed Jul 4, 2024
1 parent 4854543 commit 9311b9a
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 96 deletions.
10 changes: 5 additions & 5 deletions java/client/src/main/java/glide/api/models/PubSubMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
@EqualsAndHashCode
public class PubSubMessage {
/** An incoming message received. */
private final String message;
private final GlideString message;

/** A name of the originating channel. */
private final String channel;
private final GlideString channel;

/** A pattern matched to the channel name. */
private final Optional<String> pattern;
private final Optional<GlideString> pattern;

public PubSubMessage(String message, String channel, String pattern) {
public PubSubMessage(GlideString message, GlideString channel, GlideString pattern) {
this.message = message;
this.channel = channel;
this.pattern = Optional.ofNullable(pattern);
}

public PubSubMessage(String message, String channel) {
public PubSubMessage(GlideString message, GlideString channel) {
this.message = message;
this.channel = channel;
this.pattern = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
package glide.api.models.configuration;

import glide.api.BaseClient;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
import glide.api.models.configuration.ClusterSubscriptionConfiguration.ClusterSubscriptionConfigurationBuilder;
import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration.StandaloneSubscriptionConfigurationBuilder;
import glide.api.models.exceptions.ConfigurationError;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -70,9 +72,10 @@ public abstract static class BaseSubscriptionConfigurationBuilder<
protected Optional<Object> context = Optional.empty();

protected <M extends ChannelMode> void addSubscription(
Map<M, Set<String>> subscriptions, M mode, String channelOrPattern) {
Map<M, Set<GlideString>> subscriptions, M mode, GlideString channelOrPattern) {
if (!subscriptions.containsKey(mode)) {
subscriptions.put(mode, new HashSet<>());
// Note: Use a LinkedHashSet to preserve order for ease of debugging and unit testing.
subscriptions.put(mode, new LinkedHashSet<>());
}
subscriptions.get(mode).add(channelOrPattern);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import glide.api.models.GlideString;
import lombok.Getter;

/**
Expand Down Expand Up @@ -55,13 +57,13 @@ public enum PubSubClusterChannelMode implements ChannelMode {
* Will be applied via <code>SUBSCRIBE</code>/<code>PSUBSCRIBE</code>/<code>SSUBSCRIBE</code>
* commands during connection establishment.
*/
private final Map<PubSubClusterChannelMode, Set<String>> subscriptions;
private final Map<PubSubClusterChannelMode, Set<GlideString>> subscriptions;

// All code below is a custom implementation of `SuperBuilder`
private ClusterSubscriptionConfiguration(
Optional<MessageCallback> callback,
Optional<Object> context,
Map<PubSubClusterChannelMode, Set<String>> subscriptions) {
Map<PubSubClusterChannelMode, Set<GlideString>> subscriptions) {
super(callback, context);
this.subscriptions = subscriptions;
}
Expand All @@ -77,15 +79,15 @@ public static final class ClusterSubscriptionConfigurationBuilder

private ClusterSubscriptionConfigurationBuilder() {}

private Map<PubSubClusterChannelMode, Set<String>> subscriptions = new HashMap<>(3);
private Map<PubSubClusterChannelMode, Set<GlideString>> subscriptions = new HashMap<>(3);

/**
* Add a subscription to a channel or to multiple channels if {@link
* PubSubClusterChannelMode#PATTERN} is used.<br>
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
*/
public ClusterSubscriptionConfigurationBuilder subscription(
PubSubClusterChannelMode mode, String channelOrPattern) {
PubSubClusterChannelMode mode, GlideString channelOrPattern) {
addSubscription(subscriptions, mode, channelOrPattern);
return this;
}
Expand All @@ -95,7 +97,7 @@ public ClusterSubscriptionConfigurationBuilder subscription(
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
*/
public ClusterSubscriptionConfigurationBuilder subscriptions(
Map<PubSubClusterChannelMode, Set<String>> subscriptions) {
Map<PubSubClusterChannelMode, Set<GlideString>> subscriptions) {
this.subscriptions = subscriptions;
return this;
}
Expand All @@ -106,7 +108,7 @@ public ClusterSubscriptionConfigurationBuilder subscriptions(
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
*/
public ClusterSubscriptionConfigurationBuilder subscriptions(
PubSubClusterChannelMode mode, Set<String> subscriptions) {
PubSubClusterChannelMode mode, Set<GlideString> subscriptions) {
this.subscriptions.put(mode, subscriptions);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
package glide.api.models.configuration;

import glide.api.RedisClient;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import glide.api.models.GlideString;
import lombok.Getter;

/**
Expand Down Expand Up @@ -48,13 +50,13 @@ public enum PubSubChannelMode implements ChannelMode {
* Will be applied via <code>SUBSCRIBE</code>/<code>PSUBSCRIBE</code> commands during connection
* establishment.
*/
private final Map<PubSubChannelMode, Set<String>> subscriptions;
private final Map<PubSubChannelMode, Set<GlideString>> subscriptions;

// All code below is a custom implementation of `SuperBuilder`
public StandaloneSubscriptionConfiguration(
Optional<MessageCallback> callback,
Optional<Object> context,
Map<PubSubChannelMode, Set<String>> subscriptions) {
Map<PubSubChannelMode, Set<GlideString>> subscriptions) {
super(callback, context);
this.subscriptions = subscriptions;
}
Expand All @@ -70,15 +72,16 @@ public static final class StandaloneSubscriptionConfigurationBuilder

private StandaloneSubscriptionConfigurationBuilder() {}

private Map<PubSubChannelMode, Set<String>> subscriptions = new HashMap<>(2);
// Note: Use a LinkedHashMap to preserve order for ease of debugging and unit testing.
private Map<PubSubChannelMode, Set<GlideString>> subscriptions = new LinkedHashMap<>(2);

/**
* Add a subscription to a channel or to multiple channels if {@link PubSubChannelMode#PATTERN}
* is used.<br>
* See {@link StandaloneSubscriptionConfiguration#subscriptions}.
*/
public StandaloneSubscriptionConfigurationBuilder subscription(
PubSubChannelMode mode, String channelOrPattern) {
PubSubChannelMode mode, GlideString channelOrPattern) {
addSubscription(subscriptions, mode, channelOrPattern);
return self();
}
Expand All @@ -88,7 +91,7 @@ public StandaloneSubscriptionConfigurationBuilder subscription(
* See {@link StandaloneSubscriptionConfiguration#subscriptions}.
*/
public StandaloneSubscriptionConfigurationBuilder subscriptions(
Map<PubSubChannelMode, Set<String>> subscriptions) {
Map<PubSubChannelMode, Set<GlideString>> subscriptions) {
this.subscriptions = subscriptions;
return this;
}
Expand All @@ -99,7 +102,7 @@ public StandaloneSubscriptionConfigurationBuilder subscriptions(
* See {@link StandaloneSubscriptionConfiguration#subscriptions}.
*/
public StandaloneSubscriptionConfigurationBuilder subscriptions(
PubSubChannelMode mode, Set<String> subscriptions) {
PubSubChannelMode mode, Set<GlideString> subscriptions) {
this.subscriptions.put(mode, subscriptions);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;

import static glide.api.models.GlideString.gs;

import glide.api.logging.Logger;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -59,11 +62,11 @@ public void handle(Response response) {
"Transport disconnected, messages might be lost");
break;
case PMessage:
handle(new PubSubMessage((String) values[2], (String) values[1], (String) values[0]));
handle(new PubSubMessage(gs((String) values[2]), gs((String) values[1]), gs((String) values[0])));
return;
case Message:
case SMessage:
handle(new PubSubMessage((String) values[1], (String) values[0]));
handle(new PubSubMessage(gs((String) values[1]), gs((String) values[0])));
return;
case Subscribe:
case PSubscribe:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClient(
var subscriptionsBuilder = PubSubSubscriptions.newBuilder();
for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) {
var channelsBuilder = PubSubChannelsOrPatterns.newBuilder();
System.out.println(entry.getValue());
for (var channel : entry.getValue()) {
channelsBuilder.addChannelsOrPatterns(ByteString.copyFromUtf8(channel));
channelsBuilder.addChannelsOrPatterns(ByteString.copyFrom(channel.getBytes()));
}
subscriptionsBuilder.putChannelsOrPatternsByType(
entry.getKey().ordinal(), channelsBuilder.build());
Expand Down Expand Up @@ -178,7 +179,7 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClusterClien
for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) {
var channelsBuilder = PubSubChannelsOrPatterns.newBuilder();
for (var channel : entry.getValue()) {
channelsBuilder.addChannelsOrPatterns(ByteString.copyFromUtf8(channel));
channelsBuilder.addChannelsOrPatterns(ByteString.copyFrom(channel.getBytes()));
}
subscriptionsBuilder.putChannelsOrPatternsByType(
entry.getKey().ordinal(), channelsBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.managers;

import static glide.api.models.GlideString.gs;
import static glide.api.models.configuration.NodeAddress.DEFAULT_HOST;
import static glide.api.models.configuration.NodeAddress.DEFAULT_PORT;
import static glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode.EXACT;
Expand Down Expand Up @@ -144,9 +145,9 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
.clientName(CLIENT_NAME)
.subscriptionConfiguration(
StandaloneSubscriptionConfiguration.builder()
.subscription(EXACT, "channel_1")
.subscription(EXACT, "channel_2")
.subscription(PATTERN, "*chatRoom*")
.subscription(EXACT, gs("channel_1"))
.subscription(EXACT, gs("channel_2"))
.subscription(PATTERN, gs("*chatRoom*"))
.build())
.build();
ConnectionRequest expectedProtobufConnectionRequest =
Expand Down Expand Up @@ -181,12 +182,12 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
Map.of(
EXACT.ordinal(),
PubSubChannelsOrPatterns.newBuilder()
.addChannelsOrPatterns(ByteString.copyFromUtf8("channel_1"))
.addChannelsOrPatterns(ByteString.copyFromUtf8("channel_2"))
.addChannelsOrPatterns(ByteString.copyFrom(gs("channel_1").getBytes()))
.addChannelsOrPatterns(ByteString.copyFrom(gs("channel_2").getBytes()))
.build(),
PATTERN.ordinal(),
PubSubChannelsOrPatterns.newBuilder()
.addChannelsOrPatterns(ByteString.copyFromUtf8("*chatRoom*"))
.addChannelsOrPatterns(ByteString.copyFrom(gs("*chatRoom*").getBytes()))
.build()))
.build())
.build();
Expand Down
Loading

0 comments on commit 9311b9a

Please sign in to comment.