From af7ae4ece7ac6b0272c64c98e75d8bd74f7cb2b6 Mon Sep 17 00:00:00 2001 From: James Duong Date: Thu, 4 Jul 2024 22:51:27 -0700 Subject: [PATCH] Java: Address pubsub PR comments (#1773) * Remove spublish and provide two overloads for publish * Change publish argument order to match Python * Explicitly prevent the context being non-null and the callback being null * Parameterize three_publishing_clients_same_name_with_sharded * Address PR feedback * Add GlideString versions of pubsub commands Also add GlideString version of OK constant (named TOK from Python) * PubSubMessages should use GlideStrings instead of String * Spotless * Address PR comments * Use GlideString for MessageHandler responses * Have GlideString overloads for PubSub comamnds return String instead of GlideString OKs. * Have GlideString log byte array hash * Add pubsub MaxSize integration tests (disabled) Add the following tests from Python: * pubsub_exact_max_size_message * pubsub_sharded_max_size_message * pubsub_exact_max_size_message_callback * pubsub_sharded_max_size_message_callback * Improve exception logging. Supply a message with the exception being logged and fix the output when printing an exception * Add exception-from-callback test * Make the callback exception error handling test pass * PubSub Test fixes * Senders do not need subscriptions/callbacks. * Correct validating the pattern by checking that the optional is empty instead of null. * Use 512MB messages for max-size tests * Make client cleanup match the other tests * Add pub-sub testing with non-UTF-8 data * Simplify the non-String PubSub test * Remove stale TODOs * Change pubsub callback exception handling to pass through Netty Exceptions from the callback now escape the netty handler. This will get Netty to log it. We also explicitly log the the Glide log and System.err --- .../src/main/java/glide/api/BaseClient.java | 19 +- .../java/glide/api/RedisClusterClient.java | 50 +- .../api/commands/PubSubBaseCommands.java | 20 +- .../api/commands/PubSubClusterCommands.java | 29 +- .../main/java/glide/api/logging/Logger.java | 52 +- .../glide/api/models/BaseTransaction.java | 4 +- .../glide/api/models/ClusterTransaction.java | 10 +- .../java/glide/api/models/GlideString.java | 6 +- .../java/glide/api/models/PubSubMessage.java | 10 +- .../BaseSubscriptionConfiguration.java | 21 +- .../ClusterSubscriptionConfiguration.java | 13 +- .../StandaloneSubscriptionConfiguration.java | 16 +- .../handlers/CallbackDispatcher.java | 2 +- .../connectors/handlers/MessageHandler.java | 35 +- .../connectors/handlers/ReadHandler.java | 16 +- .../ffi/resolvers/RedisValueResolver.java | 6 +- .../glide/managers/ConnectionManager.java | 4 +- .../test/java/glide/api/RedisClientTest.java | 2 +- .../glide/api/RedisClusterClientTest.java | 2 +- .../api/models/ClusterTransactionTests.java | 2 +- .../glide/api/models/TransactionTests.java | 2 +- .../glide/managers/ConnectionManagerTest.java | 16 +- java/integTest/build.gradle | 6 +- .../src/test/java/glide/PubSubTests.java | 601 ++++++++++++------ .../java/glide/TransactionTestUtilities.java | 4 +- .../cluster/ClusterTransactionTests.java | 2 +- 26 files changed, 674 insertions(+), 276 deletions(-) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index ef757e119b..831980e29c 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -432,12 +432,12 @@ public void close() throws ExecutionException { protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) { if (config.getSubscriptionConfiguration() == null) { - return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver); + return new MessageHandler(Optional.empty(), Optional.empty(), binaryResponseResolver); } return new MessageHandler( config.getSubscriptionConfiguration().getCallback(), config.getSubscriptionConfiguration().getContext(), - responseResolver); + binaryResponseResolver); } protected static ChannelHandler buildChannelHandler( @@ -3846,7 +3846,7 @@ public CompletableFuture> lcsIdxWithMatchLen( } @Override - public CompletableFuture publish(@NonNull String channel, @NonNull String message) { + public CompletableFuture publish(@NonNull String message, @NonNull String channel) { return commandManager.submitNewCommand( Publish, new String[] {channel, message}, @@ -3857,6 +3857,19 @@ public CompletableFuture publish(@NonNull String channel, @NonNull Strin }); } + @Override + public CompletableFuture publish( + @NonNull GlideString message, @NonNull GlideString channel) { + return commandManager.submitNewCommand( + Publish, + new GlideString[] {channel, message}, + response -> { + // Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO + handleLongResponse(response); + return OK; + }); + } + @Override public CompletableFuture watch(@NonNull String[] keys) { return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse); diff --git a/java/client/src/main/java/glide/api/RedisClusterClient.java b/java/client/src/main/java/glide/api/RedisClusterClient.java index 194fb15bf0..b848b3e5b2 100644 --- a/java/client/src/main/java/glide/api/RedisClusterClient.java +++ b/java/client/src/main/java/glide/api/RedisClusterClient.java @@ -1,7 +1,6 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api; -import static glide.api.BaseClient.OK; import static glide.api.commands.ServerManagementCommands.VERSION_REDIS_API; import static glide.api.models.GlideString.gs; import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING; @@ -989,6 +988,39 @@ public CompletableFuture>>> functio response -> handleFunctionStatsBinaryResponse(response, route instanceof SingleNodeRoute)); } + public CompletableFuture publish( + @NonNull String message, @NonNull String channel, boolean sharded) { + if (!sharded) { + return publish(message, channel); + } + + return commandManager.submitNewCommand( + SPublish, + new String[] {channel, message}, + response -> { + // Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO + handleLongResponse(response); + return OK; + }); + } + + @Override + public CompletableFuture publish( + @NonNull GlideString message, @NonNull GlideString channel, boolean sharded) { + if (!sharded) { + return publish(message, channel); + } + + return commandManager.submitNewCommand( + SPublish, + new GlideString[] {channel, message}, + response -> { + // Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO + handleLongResponse(response); + return OK; + }); + } + @Override public CompletableFuture unwatch(@NonNull Route route) { return commandManager.submitNewCommand( @@ -1040,18 +1072,6 @@ public CompletableFuture scan(ClusterScanCursor cursor, ScanOptions op result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); } - @Override - public CompletableFuture spublish(@NonNull String channel, @NonNull String message) { - return commandManager.submitNewCommand( - SPublish, - new String[] {channel, message}, - response -> { - // Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO - handleLongResponse(response); - return OK; - }); - } - @Override public CompletableFuture sort( @NonNull String key, @NonNull SortClusterOptions sortClusterOptions) { @@ -1165,8 +1185,8 @@ private void internalClose() { Logger.log( Logger.Level.ERROR, "ClusterScanCursor", - () -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage()); - Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex); + () -> "Error releasing cursor " + cursorHandle, + ex); } finally { // Mark the cursor as closed to avoid double-free (if close() gets called more than once). isClosed = true; diff --git a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java index 564f80a144..26f856418d 100644 --- a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java @@ -1,6 +1,7 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.commands; +import glide.api.models.GlideString; import java.util.concurrent.CompletableFuture; /** @@ -14,14 +15,29 @@ public interface PubSubBaseCommands { * Publishes message on pubsub channel. * * @see valkey.io for details. + * @param message The message to publish. * @param channel The channel to publish the message on. + * @return OK. + * @example + *
{@code
+     * String response = client.publish("The cat said 'meow'!", "announcements").get();
+     * assert response.equals("OK");
+     * }
+ */ + CompletableFuture publish(String message, String channel); + + /** + * Publishes message on pubsub channel. + * + * @see valkey.io for details. * @param message The message to publish. + * @param channel The channel to publish the message on. * @return OK. * @example *
{@code
-     * String response = client.publish("announcements", "The cat said 'meow'!").get();
+     * String response = client.publish(gs("The cat said 'meow'!"), gs("announcements")).get();
      * assert response.equals("OK");
      * }
*/ - CompletableFuture publish(String channel, String message); + CompletableFuture publish(GlideString message, GlideString channel); } diff --git a/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java b/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java index 4039c8ddae..564b97dffc 100644 --- a/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java @@ -1,6 +1,7 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.commands; +import glide.api.models.GlideString; import java.util.concurrent.CompletableFuture; /** @@ -11,18 +12,38 @@ public interface PubSubClusterCommands { /** - * Publishes message on pubsub channel in sharded mode. + * Publishes message on pubsub channel. * * @since Redis 7.0 and above. - * @see valkey.io for details. + * @see valkey.io for details. + * @param message The message to publish. * @param channel The channel to publish the message on. + * @param sharded Indicates that this should be run in sharded mode. Setting sharded + * to true is only applicable with Redis 7.0+. + * @return OK. + * @example + *
{@code
+     * String response = client.publish("The cat said 'meow'!", "announcements", true).get();
+     * assert response.equals("OK");
+     * }
+ */ + CompletableFuture publish(String message, String channel, boolean sharded); + + /** + * Publishes message on pubsub channel. + * + * @since Redis 7.0 and above. + * @see valkey.io for details. * @param message The message to publish. + * @param channel The channel to publish the message on. + * @param sharded Indicates that this should be run in sharded mode. Setting sharded + * to true is only applicable with Redis 7.0+. * @return OK. * @example *
{@code
-     * String response = client.spublish("announcements", "The cat said 'meow'!").get();
+     * String response = client.publish(gs("The cat said 'meow'!"), gs("announcements"), true).get();
      * assert response.equals("OK");
      * }
*/ - CompletableFuture spublish(String channel, String message); + CompletableFuture publish(GlideString message, GlideString channel, boolean sharded); } diff --git a/java/client/src/main/java/glide/api/logging/Logger.java b/java/client/src/main/java/glide/api/logging/Logger.java index 2b711b8d68..97f5503487 100644 --- a/java/client/src/main/java/glide/api/logging/Logger.java +++ b/java/client/src/main/java/glide/api/logging/Logger.java @@ -4,9 +4,9 @@ import static glide.ffi.resolvers.LoggerResolver.initInternal; import static glide.ffi.resolvers.LoggerResolver.logInternal; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.function.Supplier; import lombok.Getter; import lombok.NonNull; @@ -178,24 +178,44 @@ public static void log( * * @param level The log level of the provided message. * @param logIdentifier The log identifier should give the log a context. + * @param message The message to log with the exception. * @param throwable The exception or error to log. */ public static void log( - @NonNull Level level, @NonNull String logIdentifier, @NonNull Throwable throwable) { + @NonNull Level level, + @NonNull String logIdentifier, + @NonNull String message, + @NonNull Throwable throwable) { + // TODO: Add the corresponding API to Python and Node.js. + log(level, logIdentifier, () -> message + ": " + prettyPrintException(throwable)); + } + + /** + * Logs the provided exception or error if the provided log level is lower than the logger level. + * + * @param level The log level of the provided message. + * @param logIdentifier The log identifier should give the log a context. + * @param message The message to log with the exception. + * @param throwable The exception or error to log. + */ + public static void log( + @NonNull Level level, + @NonNull String logIdentifier, + @NonNull Supplier message, + @NonNull Throwable throwable) { // TODO: Add the corresponding API to Python and Node.js. - log( - level, - logIdentifier, - () -> { - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - PrintStream printStream = new PrintStream(outputStream)) { - throwable.printStackTrace(printStream); - return printStream.toString(); - } catch (IOException e) { - // This can't happen with a ByteArrayOutputStream. - return null; - } - }); + log(level, logIdentifier, () -> message.get() + ": " + prettyPrintException(throwable)); + } + + private static String prettyPrintException(@NonNull Throwable throwable) { + try (StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter)) { + throwable.printStackTrace(printWriter); + return stringWriter.toString(); + } catch (IOException e) { + // This can't happen with a ByteArrayOutputStream. + return null; + } } /** diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 3b7eccacb8..2941520556 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -5937,11 +5937,11 @@ public T lcsLen(@NonNull ArgType key1, @NonNull ArgType key2) { * @implNote ArgType is limited to String or GlideString, any other type will throw * IllegalArgumentException * @see valkey.io for details. - * @param channel The channel to publish the message on. * @param message The message to publish. + * @param channel The channel to publish the message on. * @return Command response - The number of clients that received the message. */ - public T publish(@NonNull ArgType channel, @NonNull ArgType message) { + public T publish(@NonNull ArgType message, @NonNull ArgType channel) { checkTypeOrThrow(channel); protobufTransaction.addCommands( buildCommand(Publish, newArgsBuilder().add(channel).add(message))); diff --git a/java/client/src/main/java/glide/api/models/ClusterTransaction.java b/java/client/src/main/java/glide/api/models/ClusterTransaction.java index 1c47ee58ce..a13bacd476 100644 --- a/java/client/src/main/java/glide/api/models/ClusterTransaction.java +++ b/java/client/src/main/java/glide/api/models/ClusterTransaction.java @@ -40,11 +40,17 @@ protected ClusterTransaction getThis() { * @implNote ArgType is limited to String or GlideString, any other type will throw * IllegalArgumentException * @see valkey.io for details. - * @param channel The channel to publish the message on. * @param message The message to publish. + * @param channel The channel to publish the message on. + * @param sharded Indicates that this should be run in sharded mode. Setting sharded + * to true is only applicable with Redis 7.0+. * @return Command response - The number of clients that received the message. */ - public ClusterTransaction spublish(@NonNull ArgType channel, @NonNull ArgType message) { + public ClusterTransaction publish( + @NonNull ArgType message, @NonNull ArgType channel, boolean sharded) { + if (!sharded) { + return super.publish(message, channel); + } checkTypeOrThrow(channel); protobufTransaction.addCommands( buildCommand(SPublish, newArgsBuilder().add(channel).add(message))); diff --git a/java/client/src/main/java/glide/api/models/GlideString.java b/java/client/src/main/java/glide/api/models/GlideString.java index b2fb6c78bc..d975485f60 100644 --- a/java/client/src/main/java/glide/api/models/GlideString.java +++ b/java/client/src/main/java/glide/api/models/GlideString.java @@ -64,8 +64,10 @@ public String getString() { return string; } - assert canConvertToString() : "Value cannot be represented as a string"; - return string; + if (canConvertToString()) { + return string; + } + return String.format("Value not convertible to string: byte[] %d", Arrays.hashCode(bytes)); } public int compareTo(GlideString o) { diff --git a/java/client/src/main/java/glide/api/models/PubSubMessage.java b/java/client/src/main/java/glide/api/models/PubSubMessage.java index c3109956f1..f45add3f63 100644 --- a/java/client/src/main/java/glide/api/models/PubSubMessage.java +++ b/java/client/src/main/java/glide/api/models/PubSubMessage.java @@ -10,21 +10,21 @@ @EqualsAndHashCode public class PubSubMessage { /** An incoming message received. */ - private final String message; + private final GlideString message; /** A name of the originating channel. */ - private final String channel; + private final GlideString channel; /** A pattern matched to the channel name. */ - private final Optional pattern; + private final Optional pattern; - public PubSubMessage(String message, String channel, String pattern) { + public PubSubMessage(GlideString message, GlideString channel, GlideString pattern) { this.message = message; this.channel = channel; this.pattern = Optional.ofNullable(pattern); } - public PubSubMessage(String message, String channel) { + public PubSubMessage(GlideString message, GlideString channel) { this.message = message; this.channel = channel; this.pattern = Optional.empty(); diff --git a/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java index 12548cb9f0..d97f8f64a8 100644 --- a/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java @@ -2,12 +2,14 @@ package glide.api.models.configuration; import glide.api.BaseClient; +import glide.api.models.GlideString; import glide.api.models.PubSubMessage; import glide.api.models.configuration.ClusterSubscriptionConfiguration.ClusterSubscriptionConfigurationBuilder; import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode; import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode; import glide.api.models.configuration.StandaloneSubscriptionConfiguration.StandaloneSubscriptionConfigurationBuilder; -import java.util.HashSet; +import glide.api.models.exceptions.ConfigurationError; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -69,9 +71,10 @@ public abstract static class BaseSubscriptionConfigurationBuilder< protected Optional context = Optional.empty(); protected void addSubscription( - Map> subscriptions, M mode, String channelOrPattern) { + Map> subscriptions, M mode, GlideString channelOrPattern) { if (!subscriptions.containsKey(mode)) { - subscriptions.put(mode, new HashSet<>()); + // Note: Use a LinkedHashSet to preserve order for ease of debugging and unit testing. + subscriptions.put(mode, new LinkedHashSet<>()); } subscriptions.get(mode).add(channelOrPattern); } @@ -83,10 +86,14 @@ protected void addSubscription( /** * Set a callback and a context. * - * @param callback The {@link #callback}. + * @param callback The {@link #callback}. This can be null to unset the callback. * @param context The {@link #context}. */ public B callback(MessageCallback callback, Object context) { + if (context != null && callback == null) { + throw new ConfigurationError( + "PubSub subscriptions with a context require a callback function to be configured."); + } this.callback = Optional.ofNullable(callback); this.context = Optional.ofNullable(context); return self(); @@ -96,9 +103,13 @@ public B callback(MessageCallback callback, Object context) { * Set a callback without context. null will be supplied to all callback calls as a * context. * - * @param callback The {@link #callback}. + * @param callback The {@link #callback}. This can be null to unset the callback. */ public B callback(MessageCallback callback) { + if (callback == null && this.context.isPresent()) { + throw new ConfigurationError( + "PubSub subscriptions with a context require a callback function to be configured."); + } this.callback = Optional.ofNullable(callback); return self(); } diff --git a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java index 72293b9956..31a169b8c6 100644 --- a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java @@ -2,6 +2,7 @@ package glide.api.models.configuration; import glide.api.RedisClusterClient; +import glide.api.models.GlideString; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -55,13 +56,13 @@ public enum PubSubClusterChannelMode implements ChannelMode { * Will be applied via SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE * commands during connection establishment. */ - private final Map> subscriptions; + private final Map> subscriptions; // All code below is a custom implementation of `SuperBuilder` private ClusterSubscriptionConfiguration( Optional callback, Optional context, - Map> subscriptions) { + Map> subscriptions) { super(callback, context); this.subscriptions = subscriptions; } @@ -77,7 +78,7 @@ public static final class ClusterSubscriptionConfigurationBuilder private ClusterSubscriptionConfigurationBuilder() {} - private Map> subscriptions = new HashMap<>(3); + private Map> subscriptions = new HashMap<>(3); /** * Add a subscription to a channel or to multiple channels if {@link @@ -85,7 +86,7 @@ private ClusterSubscriptionConfigurationBuilder() {} * See {@link ClusterSubscriptionConfiguration#subscriptions}. */ public ClusterSubscriptionConfigurationBuilder subscription( - PubSubClusterChannelMode mode, String channelOrPattern) { + PubSubClusterChannelMode mode, GlideString channelOrPattern) { addSubscription(subscriptions, mode, channelOrPattern); return this; } @@ -95,7 +96,7 @@ public ClusterSubscriptionConfigurationBuilder subscription( * See {@link ClusterSubscriptionConfiguration#subscriptions}. */ public ClusterSubscriptionConfigurationBuilder subscriptions( - Map> subscriptions) { + Map> subscriptions) { this.subscriptions = subscriptions; return this; } @@ -106,7 +107,7 @@ public ClusterSubscriptionConfigurationBuilder subscriptions( * See {@link ClusterSubscriptionConfiguration#subscriptions}. */ public ClusterSubscriptionConfigurationBuilder subscriptions( - PubSubClusterChannelMode mode, Set subscriptions) { + PubSubClusterChannelMode mode, Set subscriptions) { this.subscriptions.put(mode, subscriptions); return this; } diff --git a/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java index c3bd24c4ce..0f9bdd88ff 100644 --- a/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java @@ -2,7 +2,8 @@ package glide.api.models.configuration; import glide.api.RedisClient; -import java.util.HashMap; +import glide.api.models.GlideString; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -48,13 +49,13 @@ public enum PubSubChannelMode implements ChannelMode { * Will be applied via SUBSCRIBE/PSUBSCRIBE commands during connection * establishment. */ - private final Map> subscriptions; + private final Map> subscriptions; // All code below is a custom implementation of `SuperBuilder` public StandaloneSubscriptionConfiguration( Optional callback, Optional context, - Map> subscriptions) { + Map> subscriptions) { super(callback, context); this.subscriptions = subscriptions; } @@ -70,7 +71,8 @@ public static final class StandaloneSubscriptionConfigurationBuilder private StandaloneSubscriptionConfigurationBuilder() {} - private Map> subscriptions = new HashMap<>(2); + // Note: Use a LinkedHashMap to preserve order for ease of debugging and unit testing. + private Map> subscriptions = new LinkedHashMap<>(2); /** * Add a subscription to a channel or to multiple channels if {@link PubSubChannelMode#PATTERN} @@ -78,7 +80,7 @@ private StandaloneSubscriptionConfigurationBuilder() {} * See {@link StandaloneSubscriptionConfiguration#subscriptions}. */ public StandaloneSubscriptionConfigurationBuilder subscription( - PubSubChannelMode mode, String channelOrPattern) { + PubSubChannelMode mode, GlideString channelOrPattern) { addSubscription(subscriptions, mode, channelOrPattern); return self(); } @@ -88,7 +90,7 @@ public StandaloneSubscriptionConfigurationBuilder subscription( * See {@link StandaloneSubscriptionConfiguration#subscriptions}. */ public StandaloneSubscriptionConfigurationBuilder subscriptions( - Map> subscriptions) { + Map> subscriptions) { this.subscriptions = subscriptions; return this; } @@ -99,7 +101,7 @@ public StandaloneSubscriptionConfigurationBuilder subscriptions( * See {@link StandaloneSubscriptionConfiguration#subscriptions}. */ public StandaloneSubscriptionConfigurationBuilder subscriptions( - PubSubChannelMode mode, Set subscriptions) { + PubSubChannelMode mode, Set subscriptions) { this.subscriptions.put(mode, subscriptions); return this; } diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index cfd349afdf..90d7e0c73f 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -76,7 +76,7 @@ public CompletableFuture registerConnection() { * * @param response A response received */ - public void completeRequest(Response response) { + public void completeRequest(Response response) throws MessageHandler.MessageCallbackException { if (response.hasClosingError()) { // According to https://github.com/aws/glide-for-redis/issues/851 // a response with a closing error may arrive with any/random callback ID (usually -1) diff --git a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java index da2d16502a..6006e1f12d 100644 --- a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java @@ -1,6 +1,8 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.connectors.handlers; +import static glide.api.models.GlideString.gs; + import glide.api.logging.Logger; import glide.api.models.PubSubMessage; import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback; @@ -20,6 +22,19 @@ @RequiredArgsConstructor public class MessageHandler { + /** A wrapper for exceptions thrown from {@link MessageCallback} implementations. */ + static class MessageCallbackException extends Exception { + private MessageCallbackException(Exception cause) { + super(cause); + } + + @Override + public synchronized Exception getCause() { + // Overridden to restrict the return type to Exception rather than Throwable. + return (Exception) super.getCause(); + } + } + // TODO maybe store `BaseSubscriptionConfiguration` as is? /** * A user callback to call for every incoming message, if given. If missing, messages are pushed @@ -37,7 +52,7 @@ public class MessageHandler { private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); /** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */ - public void handle(Response response) { + void handle(Response response) throws MessageCallbackException { Object data = responseResolver.apply(response); if (!(data instanceof Map)) { Logger.log( @@ -48,7 +63,8 @@ public void handle(Response response) { } @SuppressWarnings("unchecked") Map push = (Map) data; - PushKind pushType = Enum.valueOf(PushKind.class, (String) push.get("kind")); + PushKind pushType = Enum.valueOf(PushKind.class, push.get("kind").toString()); + // The objects in values will actually be byte[]. Object[] values = (Object[]) push.get("values"); switch (pushType) { @@ -59,11 +75,13 @@ public void handle(Response response) { "Transport disconnected, messages might be lost"); break; case PMessage: - handle(new PubSubMessage((String) values[2], (String) values[1], (String) values[0])); + handle( + new PubSubMessage( + gs((byte[]) values[2]), gs((byte[]) values[1]), gs((byte[]) values[0]))); return; case Message: case SMessage: - handle(new PubSubMessage((String) values[1], (String) values[0])); + handle(new PubSubMessage(gs((byte[]) values[1]), gs((byte[]) values[0]))); return; case Subscribe: case PSubscribe: @@ -92,9 +110,14 @@ public void handle(Response response) { } /** Process a {@link PubSubMessage} received. */ - private void handle(PubSubMessage message) { + private void handle(PubSubMessage message) throws MessageCallbackException { if (callback.isPresent()) { - callback.get().accept(message, context.orElse(null)); + try { + callback.get().accept(message, context.orElse(null)); + } catch (Exception callbackException) { + throw new MessageCallbackException(callbackException); + } + // Note: Error subclasses are uncaught and will just propagate. } else { queue.push(message); } diff --git a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java index d78d144cea..6a9adbf93d 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java @@ -19,7 +19,7 @@ public class ReadHandler extends ChannelInboundHandlerAdapter { /** Submit responses from glide to an instance {@link CallbackDispatcher} to handle them. */ @Override public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) - throws RuntimeException { + throws MessageHandler.MessageCallbackException { if (msg instanceof Response) { Response response = (Response) msg; callbackDispatcher.completeRequest(response); @@ -32,8 +32,20 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) /** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Logger.log(ERROR, "read handler", () -> "=== exceptionCaught " + ctx + " " + cause); + if (cause instanceof MessageHandler.MessageCallbackException) { + Logger.log( + ERROR, + "read handler", + () -> "=== Exception thrown from pubsub callback " + ctx, + cause.getCause()); + // Mimic the behavior of if this got thrown by a user thread. Print to stderr, + cause.printStackTrace(); + + // Unwrap. Only works for Exceptions and not Errors. + throw ((MessageHandler.MessageCallbackException) cause).getCause(); + } + Logger.log(ERROR, "read handler", () -> "=== exceptionCaught " + ctx, cause); callbackDispatcher.distributeClosingException( "An unhandled error while reading from UDS channel: " + cause); } diff --git a/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java b/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java index 57b86bbf1f..b38b1a910c 100644 --- a/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java +++ b/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java @@ -16,7 +16,8 @@ public class RedisValueResolver { } /** - * Resolve a value received from Redis using given C-style pointer. + * Resolve a value received from Redis using given C-style pointer. String data is assumed to be + * UTF-8 and exposed as String objects. * * @param pointer A memory pointer from {@link Response} * @return A RESP3 value @@ -25,7 +26,8 @@ public class RedisValueResolver { /** * Resolve a value received from Redis using given C-style pointer. This method does not assume - * that strings are valid UTF-8 encoded strings + * that strings are valid UTF-8 encoded strings and will expose this data as a byte[] + * . * * @param pointer A memory pointer from {@link Response} * @return A RESP3 value diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 68648577b1..c8f866fe78 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -150,7 +150,7 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClient( for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) { var channelsBuilder = PubSubChannelsOrPatterns.newBuilder(); for (var channel : entry.getValue()) { - channelsBuilder.addChannelsOrPatterns(ByteString.copyFromUtf8(channel)); + channelsBuilder.addChannelsOrPatterns(ByteString.copyFrom(channel.getBytes())); } subscriptionsBuilder.putChannelsOrPatternsByType( entry.getKey().ordinal(), channelsBuilder.build()); @@ -178,7 +178,7 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClusterClien for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) { var channelsBuilder = PubSubChannelsOrPatterns.newBuilder(); for (var channel : entry.getValue()) { - channelsBuilder.addChannelsOrPatterns(ByteString.copyFromUtf8(channel)); + channelsBuilder.addChannelsOrPatterns(ByteString.copyFrom(channel.getBytes())); } subscriptionsBuilder.putChannelsOrPatternsByType( entry.getKey().ordinal(), channelsBuilder.build()); diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 04728a7dd3..eab5ef4cb4 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -11846,7 +11846,7 @@ public void publish_returns_success() { .thenReturn(testResponse); // exercise - CompletableFuture response = service.publish(channel, message); + CompletableFuture response = service.publish(message, channel); String payload = response.get(); // verify diff --git a/java/client/src/test/java/glide/api/RedisClusterClientTest.java b/java/client/src/test/java/glide/api/RedisClusterClientTest.java index 6f60a0f675..ff89d91b33 100644 --- a/java/client/src/test/java/glide/api/RedisClusterClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClusterClientTest.java @@ -2637,7 +2637,7 @@ public void spublish_returns_success() { .thenReturn(testResponse); // exercise - CompletableFuture response = service.spublish(channel, message); + CompletableFuture response = service.publish(message, channel, true); String payload = response.get(); // verify diff --git a/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java b/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java index 0f109f9108..7881c0ff0f 100644 --- a/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java +++ b/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java @@ -28,7 +28,7 @@ public void cluster_transaction_builds_protobuf_request() { ClusterTransaction transaction = new ClusterTransaction(); List> results = new LinkedList<>(); - transaction.spublish("ch1", "msg"); + transaction.publish("msg", "ch1", true); results.add(Pair.of(SPublish, buildArgs("ch1", "msg"))); transaction.sortReadOnly( diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 2941b260fb..a5ffffd0b8 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -1240,7 +1240,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.lcsLen("key1", "key2"); results.add(Pair.of(LCS, buildArgs("key1", "key2", "LEN"))); - transaction.publish("ch1", "msg"); + transaction.publish("msg", "ch1"); results.add(Pair.of(Publish, buildArgs("ch1", "msg"))); transaction.lcsIdx("key1", "key2"); diff --git a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java index fb5dc86014..9b89fce2c9 100644 --- a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java +++ b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java @@ -1,6 +1,7 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.managers; +import static glide.api.models.GlideString.gs; import static glide.api.models.configuration.NodeAddress.DEFAULT_HOST; import static glide.api.models.configuration.NodeAddress.DEFAULT_PORT; import static glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode.EXACT; @@ -144,9 +145,9 @@ public void connection_request_protobuf_generation_with_all_fields_set() { .clientName(CLIENT_NAME) .subscriptionConfiguration( StandaloneSubscriptionConfiguration.builder() - .subscription(EXACT, "channel_1") - .subscription(EXACT, "channel_2") - .subscription(PATTERN, "*chatRoom*") + .subscription(EXACT, gs("channel_1")) + .subscription(EXACT, gs("channel_2")) + .subscription(PATTERN, gs("*chatRoom*")) .build()) .build(); ConnectionRequest expectedProtobufConnectionRequest = @@ -181,12 +182,15 @@ public void connection_request_protobuf_generation_with_all_fields_set() { Map.of( EXACT.ordinal(), PubSubChannelsOrPatterns.newBuilder() - .addChannelsOrPatterns(ByteString.copyFromUtf8("channel_1")) - .addChannelsOrPatterns(ByteString.copyFromUtf8("channel_2")) + .addChannelsOrPatterns( + ByteString.copyFrom(gs("channel_1").getBytes())) + .addChannelsOrPatterns( + ByteString.copyFrom(gs("channel_2").getBytes())) .build(), PATTERN.ordinal(), PubSubChannelsOrPatterns.newBuilder() - .addChannelsOrPatterns(ByteString.copyFromUtf8("*chatRoom*")) + .addChannelsOrPatterns( + ByteString.copyFrom(gs("*chatRoom*").getBytes())) .build())) .build()) .build(); diff --git a/java/integTest/build.gradle b/java/integTest/build.gradle index 32e9258ce1..ad0192cffb 100644 --- a/java/integTest/build.gradle +++ b/java/integTest/build.gradle @@ -50,7 +50,7 @@ ext { extractRedisVersion = { String output -> // Redis response: // Redis server v=7.2.3 sha=00000000:0 malloc=jemalloc-5.3.0 bits=64 build=7504b1fedf883f2 - // Valkey response: + // Valkey response: // Server v=7.2.5 sha=26388270:0 malloc=jemalloc-5.3.0 bits=64 build=ea40bb1576e402d6 return output.split("v=")[1].split(" ")[0] } @@ -136,6 +136,10 @@ tasks.withType(Test) { events "started", "skipped", "passed", "failed" showStandardStreams true } + + minHeapSize = "2048m" // Initial heap size. Needed for max size tests. + maxHeapSize = "2048m" // Maximum heap size. Needed for max size tests. + afterTest { desc, result -> logger.quiet "${desc.className}.${desc.name}: ${result.resultType} ${(result.getEndTime() - result.getStartTime())/1000}s" } diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 7bdb8870c2..2dedec33f9 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -4,9 +4,12 @@ import static glide.TestConfiguration.REDIS_VERSION; import static glide.TestUtilities.commonClientConfig; import static glide.TestUtilities.commonClusterClientConfig; +import static glide.api.BaseClient.OK; +import static glide.api.models.GlideString.gs; import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -17,6 +20,7 @@ import glide.api.RedisClient; import glide.api.RedisClusterClient; import glide.api.models.ClusterTransaction; +import glide.api.models.GlideString; import glide.api.models.PubSubMessage; import glide.api.models.Transaction; import glide.api.models.configuration.BaseSubscriptionConfiguration.ChannelMode; @@ -28,6 +32,7 @@ import glide.api.models.exceptions.ConfigurationError; import glide.api.models.exceptions.RequestException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -36,11 +41,14 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -56,24 +64,27 @@ public class PubSubTests { @SuppressWarnings("unchecked") private BaseClient createClientWithSubscriptions( boolean standalone, - Map> subscriptions, + Map> subscriptions, Optional callback, Optional context) { if (standalone) { var subConfigBuilder = StandaloneSubscriptionConfiguration.builder() - .subscriptions((Map>) subscriptions); + .subscriptions((Map>) subscriptions); if (callback.isPresent()) { subConfigBuilder.callback(callback.get(), context.get()); } return RedisClient.CreateClient( - commonClientConfig().subscriptionConfiguration(subConfigBuilder.build()).build()) + commonClientConfig() + .requestTimeout(5000) + .subscriptionConfiguration(subConfigBuilder.build()) + .build()) .get(); } else { var subConfigBuilder = ClusterSubscriptionConfiguration.builder() - .subscriptions((Map>) subscriptions); + .subscriptions((Map>) subscriptions); if (callback.isPresent()) { subConfigBuilder.callback(callback.get(), context.get()); @@ -81,6 +92,7 @@ private BaseClient createClientWithSubscriptions( return RedisClusterClient.CreateClient( commonClusterClientConfig() + .requestTimeout(5000) .subscriptionConfiguration(subConfigBuilder.build()) .build()) .get(); @@ -88,7 +100,7 @@ private BaseClient createClientWithSubscriptions( } private BaseClient createClientWithSubscriptions( - boolean standalone, Map> subscriptions) { + boolean standalone, Map> subscriptions) { return createClientWithSubscriptions( standalone, subscriptions, Optional.empty(), Optional.empty()); } @@ -166,7 +178,7 @@ private BaseClient createListener( boolean standalone, boolean useCallback, int clientId, - Map> subscriptions) { + Map> subscriptions) { MessageCallback callback = (msg, ctx) -> ((ConcurrentLinkedDeque>) ctx) @@ -182,11 +194,6 @@ private BaseClient createListener( // test_pubsub_exact_happy_path_many_channels_co_existence // test_sharded_pubsub_co_existence // test_pubsub_pattern_co_existence - // TODO tests below blocked by https://github.com/aws/glide-for-redis/issues/1649 - // test_pubsub_exact_max_size_PubsubMessage - // test_pubsub_sharded_max_size_PubsubMessage - // test_pubsub_exact_max_size_PubsubMessage_callback - // test_pubsub_sharded_max_size_PubsubMessage_callback // TODO why `publish` returns 0 on cluster or > 1 on standalone when there is only 1 receiver??? // meanwhile, all pubsubMessages are delivered. @@ -205,15 +212,15 @@ private void skipTestsOnMac() { @MethodSource("getTwoBoolPermutations") public void exact_happy_path(boolean standalone, boolean useCallback) { skipTestsOnMac(); - String channel = UUID.randomUUID().toString(); - String message = UUID.randomUUID().toString(); + GlideString channel = gs(UUID.randomUUID().toString()); + GlideString message = gs(UUID.randomUUID().toString()); var subscriptions = Map.of(exact(standalone), Set.of(channel)); var listener = createListener(standalone, useCallback, 1, subscriptions); var sender = createClient(standalone); clients.addAll(List.of(listener, sender)); - sender.publish(channel, message).get(); + sender.publish(message, channel).get(); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message verifyReceivedPubsubMessages( @@ -230,13 +237,13 @@ public void exact_happy_path_many_channels(boolean standalone, boolean useCallba int messagesPerChannel = 256; var messages = new ArrayList(numChannels * messagesPerChannel); ChannelMode mode = exact(standalone); - Map> subscriptions = Map.of(mode, new HashSet<>()); + Map> subscriptions = Map.of(mode, new HashSet<>()); for (var i = 0; i < numChannels; i++) { - var channel = i + "-" + UUID.randomUUID(); + GlideString channel = gs(i + "-" + UUID.randomUUID()); subscriptions.get(mode).add(channel); for (var j = 0; j < messagesPerChannel; j++) { - var message = i + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + j + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel)); } } @@ -246,7 +253,7 @@ public void exact_happy_path_many_channels(boolean standalone, boolean useCallba clients.addAll(List.of(listener, sender)); for (var pubsubMessage : messages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -265,15 +272,15 @@ public void sharded_pubsub(boolean useCallback) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); - String channel = UUID.randomUUID().toString(); - String pubsubMessage = UUID.randomUUID().toString(); + GlideString channel = gs(UUID.randomUUID().toString()); + GlideString pubsubMessage = gs(UUID.randomUUID().toString()); var subscriptions = Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); var listener = createListener(false, useCallback, 1, subscriptions); var sender = (RedisClusterClient) createClient(false); clients.addAll(List.of(listener, sender)); - sender.spublish(channel, pubsubMessage).get(); + sender.publish(pubsubMessage, channel, true).get(); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message verifyReceivedPubsubMessages( @@ -292,13 +299,13 @@ public void sharded_pubsub_many_channels(boolean useCallback) { int pubsubMessagesPerChannel = 256; var pubsubMessages = new ArrayList(numChannels * pubsubMessagesPerChannel); PubSubClusterChannelMode mode = PubSubClusterChannelMode.SHARDED; - Map> subscriptions = Map.of(mode, new HashSet<>()); + Map> subscriptions = Map.of(mode, new HashSet<>()); for (var i = 0; i < numChannels; i++) { - var channel = i + "-" + UUID.randomUUID(); + GlideString channel = gs(i + "-" + UUID.randomUUID()); subscriptions.get(mode).add(channel); for (var j = 0; j < pubsubMessagesPerChannel; j++) { - var message = i + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + j + "-" + UUID.randomUUID()); pubsubMessages.add(new PubSubMessage(message, channel)); } } @@ -308,9 +315,9 @@ public void sharded_pubsub_many_channels(boolean useCallback) { clients.addAll(List.of(listener, sender)); for (var pubsubMessage : pubsubMessages) { - sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel(), true).get(); } - sender.spublish(UUID.randomUUID().toString(), UUID.randomUUID().toString()).get(); + sender.publish(UUID.randomUUID().toString(), UUID.randomUUID().toString(), true).get(); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -327,10 +334,13 @@ public void sharded_pubsub_many_channels(boolean useCallback) { public void pattern(boolean standalone, boolean useCallback) { skipTestsOnMac(); String prefix = "channel."; - String pattern = prefix + "*"; - Map message2channels = + GlideString pattern = gs(prefix + "*"); + Map message2channels = Map.of( - prefix + "1", UUID.randomUUID().toString(), prefix + "2", UUID.randomUUID().toString()); + gs(prefix + "1"), + gs(UUID.randomUUID().toString()), + gs(prefix + "2"), + gs(UUID.randomUUID().toString())); var subscriptions = Map.of( standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN, @@ -343,9 +353,9 @@ public void pattern(boolean standalone, boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // need some time to propagate subscriptions - why? for (var entry : message2channels.entrySet()) { - sender.publish(entry.getKey(), entry.getValue()).get(); + sender.publish(entry.getValue(), entry.getKey()).get(); } - sender.publish("channel", UUID.randomUUID().toString()).get(); + sender.publish(UUID.randomUUID().toString(), "channel").get(); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages var expected = @@ -363,7 +373,7 @@ public void pattern(boolean standalone, boolean useCallback) { public void pattern_many_channels(boolean standalone, boolean useCallback) { skipTestsOnMac(); String prefix = "channel."; - String pattern = prefix + "*"; + GlideString pattern = gs(prefix + "*"); int numChannels = 256; int messagesPerChannel = 256; ChannelMode mode = standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN; @@ -371,9 +381,9 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { var subscriptions = Map.of(mode, Set.of(pattern)); for (var i = 0; i < numChannels; i++) { - var channel = prefix + "-" + i + "-" + UUID.randomUUID(); + GlideString channel = gs(prefix + "-" + i + "-" + UUID.randomUUID()); for (var j = 0; j < messagesPerChannel; j++) { - var message = i + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + j + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel, pattern)); } } @@ -385,9 +395,9 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { Thread.sleep(MESSAGE_DELIVERY_DELAY); // need some time to propagate subscriptions - why? for (var pubsubMessage : messages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } - sender.publish("channel", UUID.randomUUID().toString()).get(); + sender.publish(UUID.randomUUID().toString(), "channel").get(); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages verifyReceivedPubsubMessages( @@ -403,12 +413,12 @@ public void pattern_many_channels(boolean standalone, boolean useCallback) { public void combined_exact_and_pattern_one_client(boolean standalone, boolean useCallback) { skipTestsOnMac(); String prefix = "channel."; - String pattern = prefix + "*"; + GlideString pattern = gs(prefix + "*"); int numChannels = 256; int messagesPerChannel = 256; var messages = new ArrayList(numChannels * messagesPerChannel); ChannelMode mode = standalone ? PubSubChannelMode.EXACT : PubSubClusterChannelMode.EXACT; - Map> subscriptions = + Map> subscriptions = Map.of( mode, new HashSet<>(), @@ -416,17 +426,17 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us Set.of(pattern)); for (var i = 0; i < numChannels; i++) { - var channel = i + "-" + UUID.randomUUID(); + GlideString channel = gs(i + "-" + UUID.randomUUID()); subscriptions.get(mode).add(channel); for (var j = 0; j < messagesPerChannel; j++) { - var message = i + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + j + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel)); } } for (var j = 0; j < messagesPerChannel; j++) { - var pubsubMessage = j + "-" + UUID.randomUUID(); - var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + GlideString pubsubMessage = gs(j + "-" + UUID.randomUUID()); + GlideString channel = gs(prefix + "-" + j + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(pubsubMessage, channel, pattern)); } @@ -435,7 +445,7 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us clients.addAll(List.of(listener, sender)); for (var pubsubMessage : messages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -455,22 +465,22 @@ public void combined_exact_and_pattern_one_client(boolean standalone, boolean us public void combined_exact_and_pattern_multiple_clients(boolean standalone, boolean useCallback) { skipTestsOnMac(); String prefix = "channel."; - String pattern = prefix + "*"; + GlideString pattern = gs(prefix + "*"); int numChannels = 256; var messages = new ArrayList(numChannels * 2); ChannelMode mode = exact(standalone); - Map> subscriptions = Map.of(mode, new HashSet<>()); + Map> subscriptions = Map.of(mode, new HashSet<>()); for (var i = 0; i < numChannels; i++) { - var channel = i + "-" + UUID.randomUUID(); + GlideString channel = gs(i + "-" + UUID.randomUUID()); subscriptions.get(mode).add(channel); - var message = i + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel)); } for (var j = 0; j < numChannels; j++) { - var message = j + "-" + UUID.randomUUID(); - var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(j + "-" + UUID.randomUUID()); + GlideString channel = gs(prefix + "-" + j + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel, pattern)); } @@ -483,7 +493,7 @@ public void combined_exact_and_pattern_multiple_clients(boolean standalone, bool clients.addAll(List.of(listenerExactSub, listenerPatternSub, sender)); for (var pubsubMessage : messages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -524,34 +534,34 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { skipTestsOnMac(); String prefix = "channel."; - String pattern = prefix + "*"; + GlideString pattern = gs(prefix + "*"); String shardPrefix = "{shard}"; int numChannels = 256; var messages = new ArrayList(numChannels * 2); var shardedMessages = new ArrayList(numChannels); - Map> subscriptions = + Map> subscriptions = Map.of( PubSubClusterChannelMode.EXACT, new HashSet<>(), PubSubClusterChannelMode.PATTERN, Set.of(pattern), PubSubClusterChannelMode.SHARDED, new HashSet<>()); for (var i = 0; i < numChannels; i++) { - var channel = i + "-" + UUID.randomUUID(); + GlideString channel = gs(i + "-" + UUID.randomUUID()); subscriptions.get(PubSubClusterChannelMode.EXACT).add(channel); - var message = i + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel)); } for (var i = 0; i < numChannels; i++) { - var channel = shardPrefix + "-" + i + "-" + UUID.randomUUID(); + GlideString channel = gs(shardPrefix + "-" + i + "-" + UUID.randomUUID()); subscriptions.get(PubSubClusterChannelMode.SHARDED).add(channel); - var message = i + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + UUID.randomUUID()); shardedMessages.add(new PubSubMessage(message, channel)); } for (var j = 0; j < numChannels; j++) { - var message = j + "-" + UUID.randomUUID(); - var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(j + "-" + UUID.randomUUID()); + GlideString channel = gs(prefix + "-" + j + "-" + UUID.randomUUID()); messages.add(new PubSubMessage(message, channel, pattern)); } @@ -560,10 +570,10 @@ public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { clients.addAll(List.of(listener, sender)); for (var pubsubMessage : messages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } for (var pubsubMessage : shardedMessages) { - sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel(), true).get(); } Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -587,36 +597,36 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) skipTestsOnMac(); String prefix = "channel."; - String pattern = prefix + "*"; + GlideString pattern = gs(prefix + "*"); String shardPrefix = "{shard}"; int numChannels = 256; var exactMessages = new ArrayList(numChannels); var patternMessages = new ArrayList(numChannels); var shardedMessages = new ArrayList(numChannels); - Map> subscriptionsExact = + Map> subscriptionsExact = Map.of(PubSubClusterChannelMode.EXACT, new HashSet<>()); - Map> subscriptionsPattern = + Map> subscriptionsPattern = Map.of(PubSubClusterChannelMode.PATTERN, Set.of(pattern)); - Map> subscriptionsSharded = + Map> subscriptionsSharded = Map.of(PubSubClusterChannelMode.SHARDED, new HashSet<>()); for (var i = 0; i < numChannels; i++) { - var channel = i + "-" + UUID.randomUUID(); + GlideString channel = gs(i + "-" + UUID.randomUUID()); subscriptionsExact.get(PubSubClusterChannelMode.EXACT).add(channel); - var pubsubMessage = i + "-" + UUID.randomUUID(); + GlideString pubsubMessage = gs(i + "-" + UUID.randomUUID()); exactMessages.add(new PubSubMessage(pubsubMessage, channel)); } for (var i = 0; i < numChannels; i++) { - var channel = shardPrefix + "-" + i + "-" + UUID.randomUUID(); + GlideString channel = gs(shardPrefix + "-" + i + "-" + UUID.randomUUID()); subscriptionsSharded.get(PubSubClusterChannelMode.SHARDED).add(channel); - var message = i + "-" + UUID.randomUUID(); + GlideString message = gs(i + "-" + UUID.randomUUID()); shardedMessages.add(new PubSubMessage(message, channel)); } for (var j = 0; j < numChannels; j++) { - var message = j + "-" + UUID.randomUUID(); - var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + GlideString message = gs(j + "-" + UUID.randomUUID()); + GlideString channel = gs(prefix + "-" + j + "-" + UUID.randomUUID()); patternMessages.add(new PubSubMessage(message, channel, pattern)); } @@ -634,13 +644,13 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded, sender)); for (var pubsubMessage : exactMessages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } for (var pubsubMessage : patternMessages) { - sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get(); } for (var pubsubMessage : shardedMessages) { - sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel(), true).get(); } Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -688,111 +698,86 @@ public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) * tests. */ @SneakyThrows - @Test - public void three_publishing_clients_same_name_with_sharded_no_callback() { + @ParameterizedTest(name = "use callback = {0}") + @ValueSource(booleans = {true, false}) + public void three_publishing_clients_same_name_with_sharded(boolean useCallback) { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); skipTestsOnMac(); - String channel = UUID.randomUUID().toString(); - var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); - var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel); - var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); - Map> subscriptionsExact = + GlideString channel = gs(UUID.randomUUID().toString()); + var exactMessage = new PubSubMessage(gs(UUID.randomUUID().toString()), channel); + var patternMessage = new PubSubMessage(gs(UUID.randomUUID().toString()), channel, channel); + var shardedMessage = new PubSubMessage(gs(UUID.randomUUID().toString()), channel); + Map> subscriptionsExact = Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); - Map> subscriptionsPattern = + Map> subscriptionsPattern = Map.of(PubSubClusterChannelMode.PATTERN, Set.of(channel)); - Map> subscriptionsSharded = + Map> subscriptionsSharded = Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); var listenerExact = - (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsExact); - var listenerPattern = - (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsPattern); - var listenerSharded = - (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsSharded); - clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded)); - - listenerPattern.publish(channel, exactMessage.getMessage()).get(); - listenerSharded.publish(channel, patternMessage.getMessage()).get(); - listenerExact.spublish(channel, shardedMessage.getMessage()).get(); - - Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages - - verifyReceivedPubsubMessages( - Set.of( - Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage), - Pair.of( - PubSubClusterChannelMode.EXACT.ordinal(), - new PubSubMessage(patternMessage.getMessage(), channel))), - listenerExact, - false); - verifyReceivedPubsubMessages( - Set.of( - Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), - Pair.of( - PubSubClusterChannelMode.PATTERN.ordinal(), - new PubSubMessage(exactMessage.getMessage(), channel, channel))), - listenerPattern, - false); - verifyReceivedPubsubMessages( - Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)), - listenerSharded, - false); - } + !useCallback + ? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsExact) + : (RedisClusterClient) + createListener( + false, true, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact); - /** - * Similar to `test_pubsub_three_publishing_clients_same_name_with_sharded` in python client - * tests. - */ - @SneakyThrows - @Test - public void three_publishing_clients_same_name_with_sharded_with_callback() { - assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - skipTestsOnMac(); - - String channel = UUID.randomUUID().toString(); - var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); - var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel); - var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); - Map> subscriptionsExact = - Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); - Map> subscriptionsPattern = - Map.of(PubSubClusterChannelMode.PATTERN, Set.of(channel)); - Map> subscriptionsSharded = - Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); - - var listenerExact = - (RedisClusterClient) - createListener( - false, true, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact); var listenerPattern = - createListener( - false, true, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern); + !useCallback + ? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsPattern) + : createListener( + false, true, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern); + var listenerSharded = - createListener( - false, true, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded); + !useCallback + ? (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsSharded) + : createListener( + false, true, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded); clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded)); - listenerPattern.publish(channel, exactMessage.getMessage()).get(); - listenerSharded.publish(channel, patternMessage.getMessage()).get(); - listenerExact.spublish(channel, shardedMessage.getMessage()).get(); + listenerPattern.publish(exactMessage.getMessage(), channel).get(); + listenerSharded.publish(patternMessage.getMessage(), channel).get(); + listenerExact.publish(shardedMessage.getMessage(), channel, true).get(); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages - var expected = - Set.of( - Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage), - Pair.of( - PubSubClusterChannelMode.EXACT.ordinal(), - new PubSubMessage(patternMessage.getMessage(), channel)), - Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), - Pair.of( - PubSubClusterChannelMode.PATTERN.ordinal(), - new PubSubMessage(exactMessage.getMessage(), channel, channel)), - Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)); - - verifyReceivedPubsubMessages(expected, listenerExact, true); + if (!useCallback) { + verifyReceivedPubsubMessages( + Set.of( + Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage), + Pair.of( + PubSubClusterChannelMode.EXACT.ordinal(), + new PubSubMessage(patternMessage.getMessage(), channel))), + listenerExact, + false); + verifyReceivedPubsubMessages( + Set.of( + Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), + Pair.of( + PubSubClusterChannelMode.PATTERN.ordinal(), + new PubSubMessage(exactMessage.getMessage(), channel, channel))), + listenerPattern, + false); + verifyReceivedPubsubMessages( + Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)), + listenerSharded, + false); + } else { + var expected = + Set.of( + Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage), + Pair.of( + PubSubClusterChannelMode.EXACT.ordinal(), + new PubSubMessage(patternMessage.getMessage(), channel)), + Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), + Pair.of( + PubSubClusterChannelMode.PATTERN.ordinal(), + new PubSubMessage(exactMessage.getMessage(), channel, channel)), + Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)); + + verifyReceivedPubsubMessages(expected, listenerExact, true); + } } @SneakyThrows @@ -816,16 +801,13 @@ public void error_cases() { var clusterClient = (RedisClusterClient) createClient(false); var transaction = new ClusterTransaction() - .spublish("abc", "one") - .spublish("mnk", "two") - .spublish("xyz", "three"); + .publish("one", "abc", true) + .publish("two", "mnk", true) + .publish("three", "xyz", true); var exception = assertThrows(ExecutionException.class, () -> clusterClient.exec(transaction).get()); assertInstanceOf(RequestException.class, exception.getCause()); assertTrue(exception.getMessage().toLowerCase().contains("crossslot")); - - // TODO test when callback throws an exception - currently nothing happens now - // it should terminate the client } @SneakyThrows @@ -840,14 +822,14 @@ public void transaction_with_all_types_of_PubsubMessages( "Test doesn't work on cluster due to Cross Slot error, probably a bug in `redis-rs`"); String prefix = "channel"; - String pattern = prefix + "*"; - String shardPrefix = "{shard}"; - String channel = UUID.randomUUID().toString(); - var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); - var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), prefix, pattern); - var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), shardPrefix); - - Map> subscriptions = + GlideString pattern = gs(prefix + "*"); + GlideString shardPrefix = gs("{shard}"); + GlideString channel = gs(UUID.randomUUID().toString()); + var exactMessage = new PubSubMessage(gs(UUID.randomUUID().toString()), channel); + var patternMessage = new PubSubMessage(gs(UUID.randomUUID().toString()), gs(prefix), pattern); + var shardedMessage = new PubSubMessage(gs(UUID.randomUUID().toString()), shardPrefix); + + Map> subscriptions = standalone ? Map.of( PubSubChannelMode.EXACT, @@ -869,15 +851,15 @@ public void transaction_with_all_types_of_PubsubMessages( if (standalone) { var transaction = new Transaction() - .publish(exactMessage.getChannel(), exactMessage.getMessage()) - .publish(patternMessage.getChannel(), patternMessage.getMessage()); + .publish(exactMessage.getMessage(), exactMessage.getChannel()) + .publish(patternMessage.getMessage(), patternMessage.getChannel()); ((RedisClient) sender).exec(transaction).get(); } else { var transaction = new ClusterTransaction() - .spublish(shardedMessage.getChannel(), shardedMessage.getMessage()) - .publish(exactMessage.getChannel(), exactMessage.getMessage()) - .publish(patternMessage.getChannel(), patternMessage.getMessage()); + .publish(shardedMessage.getMessage(), shardedMessage.getChannel(), true) + .publish(exactMessage.getMessage(), exactMessage.getChannel()) + .publish(patternMessage.getMessage(), patternMessage.getChannel()); ((RedisClusterClient) sender).exec(transaction).get(); } @@ -890,4 +872,263 @@ public void transaction_with_all_types_of_PubsubMessages( Pair.of(1, exactMessage), Pair.of(1, patternMessage), Pair.of(1, shardedMessage)); verifyReceivedPubsubMessages(expected, listener, useCallback); } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_exact_max_size_message(boolean standalone) { + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(512 * 1024 * 1024)); // 512MB + final GlideString message2 = gs("2".repeat(1 << 25)); // 3MB + + Map> subscriptions = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(channel)) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + var listener = createClientWithSubscriptions(standalone, subscriptions); + var sender = createClient(standalone); + clients.addAll(Arrays.asList(listener, sender)); + + assertEquals(OK, sender.publish(message, channel).get()); + assertEquals(OK, sender.publish(message2, channel).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + PubSubMessage asyncMessage = listener.getPubSubMessage().get(); + assertEquals(message, asyncMessage.getMessage()); + assertEquals(channel, asyncMessage.getChannel()); + assertTrue(asyncMessage.getPattern().isEmpty()); + + PubSubMessage syncMessage = listener.tryGetPubSubMessage(); + assertEquals(message2, syncMessage.getMessage()); + assertEquals(channel, syncMessage.getChannel()); + assertTrue(syncMessage.getPattern().isEmpty()); + + // Assert there are no more messages to read. + assertThrows( + TimeoutException.class, () -> listener.getPubSubMessage().get(3, TimeUnit.SECONDS)); + assertNull(listener.tryGetPubSubMessage()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_sharded_max_size_message(boolean standalone) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(512 * 1024 * 1024)); // 512MB + final GlideString message2 = gs("2".repeat(1 << 25)); // 3MB + + Map> subscriptions = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + var listener = createClientWithSubscriptions(standalone, subscriptions); + var sender = createClient(standalone); + clients.addAll(Arrays.asList(listener, sender)); + + assertEquals(OK, sender.publish(message, channel).get()); + assertEquals(OK, ((RedisClusterClient) sender).publish(message2, channel, true).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + PubSubMessage asyncMessage = listener.getPubSubMessage().get(); + assertEquals(message, asyncMessage.getMessage()); + assertEquals(channel, asyncMessage.getChannel()); + assertTrue(asyncMessage.getPattern().isEmpty()); + + PubSubMessage syncMessage = listener.tryGetPubSubMessage(); + assertEquals(message2, syncMessage.getMessage()); + assertEquals(channel, syncMessage.getChannel()); + assertTrue(syncMessage.getPattern().isEmpty()); + + // Assert there are no more messages to read. + assertThrows( + TimeoutException.class, + () -> { + listener.getPubSubMessage().get(3, TimeUnit.SECONDS); + }); + + assertNull(listener.tryGetPubSubMessage()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_exact_max_size_message_callback(boolean standalone) { + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(512 * 1024 * 1024)); // 512MB + + ArrayList callbackMessages = new ArrayList<>(); + final MessageCallback callback = + (pubSubMessage, context) -> { + ArrayList receivedMessages = (ArrayList) context; + receivedMessages.add(pubSubMessage); + }; + + Map> subscriptions = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(channel)) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + + var listener = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + var sender = createClient(standalone); + clients.addAll(Arrays.asList(listener, sender)); + + assertEquals(OK, sender.publish(message, channel).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + assertEquals(1, callbackMessages.size()); + assertEquals(message, callbackMessages.get(0).getMessage()); + assertEquals(channel, callbackMessages.get(0).getChannel()); + assertTrue(callbackMessages.get(0).getPattern().isEmpty()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_sharded_max_size_message_callback(boolean standalone) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(512 * 1024 * 1024)); // 512MB + + ArrayList callbackMessages = new ArrayList<>(); + final MessageCallback callback = + (pubSubMessage, context) -> { + ArrayList receivedMessages = (ArrayList) context; + receivedMessages.add(pubSubMessage); + }; + + Map> subscriptions = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + + var listener = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + var sender = createClient(standalone); + clients.addAll(Arrays.asList(listener, sender)); + + assertEquals(OK, ((RedisClusterClient) sender).publish(message, channel, true).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + assertEquals(1, callbackMessages.size()); + assertEquals(message, callbackMessages.get(0).getMessage()); + } + + /** Test the behavior if the callback supplied to a subscription throws an uncaught exception. */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_test_callback_exception(boolean standalone) { + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message1 = gs("message1"); + final GlideString message2 = gs("message2"); + final GlideString errorMsg = gs("errorMsg"); + final GlideString message3 = gs("message3"); + + ArrayList callbackMessages = new ArrayList<>(); + final MessageCallback callback = + (pubSubMessage, context) -> { + if (pubSubMessage.getMessage().equals(errorMsg)) { + throw new RuntimeException("Test callback error message"); + } + ArrayList receivedMessages = (ArrayList) context; + receivedMessages.add(pubSubMessage); + }; + + Map> subscriptions = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(channel)) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + + var listener = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + var sender = createClient(standalone); + clients.addAll(Arrays.asList(listener, sender)); + + assertEquals(OK, sender.publish(message1, channel).get()); + assertEquals(OK, sender.publish(message2, channel).get()); + assertEquals(OK, sender.publish(errorMsg, channel).get()); + assertEquals(OK, sender.publish(message3, channel).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + assertEquals(3, callbackMessages.size()); + assertEquals(message1, callbackMessages.get(0).getMessage()); + assertEquals(channel, callbackMessages.get(0).getChannel()); + assertTrue(callbackMessages.get(0).getPattern().isEmpty()); + + assertEquals(message2, callbackMessages.get(1).getMessage()); + assertEquals(channel, callbackMessages.get(1).getChannel()); + assertTrue(callbackMessages.get(1).getPattern().isEmpty()); + + // Ensure we can receive message 3 which is after the message that triggers a throw. + assertEquals(message3, callbackMessages.get(2).getMessage()); + assertEquals(channel, callbackMessages.get(2).getChannel()); + assertTrue(callbackMessages.get(2).getPattern().isEmpty()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_with_binary(boolean standalone) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + + GlideString channel = gs(new byte[] {(byte) 0xE2, 0x28, (byte) 0xA1}); + var message = + new PubSubMessage(gs(new byte[] {(byte) 0xF0, 0x28, (byte) 0x8C, (byte) 0xBC}), channel); + + ArrayList callbackMessages = new ArrayList<>(); + final MessageCallback callback = + (pubSubMessage, context) -> { + ArrayList receivedMessages = (ArrayList) context; + receivedMessages.add(pubSubMessage); + }; + + Map> subscriptions = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(channel)) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + var listener2 = + createClientWithSubscriptions( + standalone, subscriptions, Optional.of(callback), Optional.of(callbackMessages)); + var sender = createClient(standalone); + clients.addAll(Arrays.asList(listener, listener2, sender)); + + assertEquals(OK, sender.publish(message.getMessage(), channel).get()); + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + assertEquals(message, listener.tryGetPubSubMessage()); + assertEquals(1, callbackMessages.size()); + assertEquals(message, callbackMessages.get(0)); + } } diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 07c354f7ac..49eaf1bd30 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -1219,10 +1219,10 @@ private static Object[] bitmapCommands(BaseTransaction transaction) { } private static Object[] pubsubCommands(BaseTransaction transaction) { - transaction.publish("Tchannel", "message"); + transaction.publish("message", "Tchannel"); return new Object[] { - 0L, // publish("Tchannel", "message") + 0L, // publish("message", "Tchannel") }; } } diff --git a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java index cd31fe7bb6..5b855b5a4b 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java @@ -280,7 +280,7 @@ public void unwatch() { @SneakyThrows public void spublish() { assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); - ClusterTransaction transaction = new ClusterTransaction().spublish("Schannel", "message"); + ClusterTransaction transaction = new ClusterTransaction().publish("messagae", "Schannel", true); assertArrayEquals(new Object[] {0L}, clusterClient.exec(transaction).get()); }