diff --git a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java index 11da11954f..8c2aa7a77e 100644 --- a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java @@ -1,6 +1,8 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.connectors.handlers; +import static glide.api.models.GlideString.gs; + import glide.api.logging.Logger; import glide.api.models.GlideString; import glide.api.models.PubSubMessage; @@ -49,7 +51,7 @@ public void handle(Response response) { } @SuppressWarnings("unchecked") Map push = (Map) data; - PushKind pushType = Enum.valueOf(PushKind.class, ((GlideString) push.get("kind")).getString()); + PushKind pushType = Enum.valueOf(PushKind.class, push.get("kind").toString()); Object[] values = (Object[]) push.get("values"); switch (pushType) { @@ -62,11 +64,11 @@ public void handle(Response response) { case PMessage: handle( new PubSubMessage( - (GlideString) values[2], (GlideString) values[1], (GlideString) values[0])); + gs((byte[]) values[2]), gs((byte[]) values[1]), gs((byte[]) values[0]))); return; case Message: case SMessage: - handle(new PubSubMessage((GlideString) values[1], (GlideString) values[0])); + handle(new PubSubMessage(gs((byte[]) values[1]), gs((byte[]) values[0]))); return; case Subscribe: case PSubscribe: diff --git a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java index d78d144cea..0a6e08610b 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java @@ -33,6 +33,7 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Logger.log(ERROR, "read handler", () -> "=== exceptionCaught " + ctx + " " + cause); + Logger.log(ERROR, "read handler", cause); callbackDispatcher.distributeClosingException( "An unhandled error while reading from UDS channel: " + cause); diff --git a/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java b/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java index bf9bee28ea..b38b1a910c 100644 --- a/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java +++ b/java/client/src/main/java/glide/ffi/resolvers/RedisValueResolver.java @@ -1,7 +1,6 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.ffi.resolvers; -import glide.api.models.GlideString; import response.ResponseOuterClass.Response; public class RedisValueResolver { @@ -27,7 +26,8 @@ public class RedisValueResolver { /** * Resolve a value received from Redis using given C-style pointer. This method does not assume - * that strings are valid UTF-8 encoded strings and will expose this data as {@link GlideString}. + * that strings are valid UTF-8 encoded strings and will expose this data as a byte[] + * . * * @param pointer A memory pointer from {@link Response} * @return A RESP3 value diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 8335037170..553c477e7e 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -4,10 +4,12 @@ import static glide.TestConfiguration.REDIS_VERSION; import static glide.TestUtilities.commonClientConfig; import static glide.TestUtilities.commonClusterClientConfig; +import static glide.api.BaseClient.OK; import static glide.api.models.GlideString.gs; import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -38,11 +40,14 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -70,7 +75,10 @@ private BaseClient createClientWithSubscriptions( subConfigBuilder.callback(callback.get(), context.get()); } return RedisClient.CreateClient( - commonClientConfig().subscriptionConfiguration(subConfigBuilder.build()).build()) + commonClientConfig() + .requestTimeout(5000) + .subscriptionConfiguration(subConfigBuilder.build()) + .build()) .get(); } else { var subConfigBuilder = @@ -83,6 +91,7 @@ private BaseClient createClientWithSubscriptions( return RedisClusterClient.CreateClient( commonClusterClientConfig() + .requestTimeout(5000) .subscriptionConfiguration(subConfigBuilder.build()) .build()) .get(); @@ -870,4 +879,293 @@ public void transaction_with_all_types_of_PubsubMessages( Pair.of(1, exactMessage), Pair.of(1, patternMessage), Pair.of(1, shardedMessage)); verifyReceivedPubsubMessages(expected, listener, useCallback); } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_exact_max_size_message(boolean standalone) { + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(1 << 25)); // 33MB + final GlideString message2 = gs("2".repeat(1 << 25)); // 3MB + + Map> subscriptions = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(channel)) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + var listener = createClientWithSubscriptions(standalone, subscriptions); + var sender = createClientWithSubscriptions(standalone, subscriptions); + + try { + assertEquals(OK, sender.publish(message, channel).get()); + assertEquals(OK, sender.publish(message2, channel).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + PubSubMessage asyncMessage = listener.getPubSubMessage().get(); + PubSubMessage syncMessage = listener.tryGetPubSubMessage(); + assertEquals(message, asyncMessage.getMessage()); + assertEquals(channel, asyncMessage.getChannel()); + assertTrue(asyncMessage.getPattern().isEmpty()); + + assertEquals(message2, syncMessage.getMessage()); + assertEquals(channel, syncMessage.getChannel()); + assertTrue(syncMessage.getPattern().isEmpty()); + + // Assert there are no more messages to read. + assertThrows( + TimeoutException.class, + () -> { + listener.getPubSubMessage().get(3, TimeUnit.SECONDS); + }); + assertNull(listener.tryGetPubSubMessage()); + } finally { + if (!standalone) { + // Since all tests run on the same cluster, when closing the client, garbage collector can + // be called + // after another test will start running. + // In cluster mode, we check how many subscriptions received the message. + // So to avoid flakiness, we make sure to unsubscribe from the channels. + ((RedisClusterClient) listener) + .customCommand(new String[] {"UNSUBSCRIBE", channel.toString()}); + } + } + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_sharded_max_size_message(boolean standalone) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(1 << 25)); // 33MB + final GlideString message2 = gs("2".repeat(1 << 25)); // 3MB + + Map> subscriptions = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + var listener = createClientWithSubscriptions(standalone, subscriptions); + var sender = createClientWithSubscriptions(standalone, subscriptions); + + try { + assertEquals(OK, sender.publish(message, channel).get()); + assertEquals(OK, sender.publish(message2, channel).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + PubSubMessage asyncMessage = listener.getPubSubMessage().get(); + PubSubMessage syncMessage = listener.tryGetPubSubMessage(); + assertEquals(message, asyncMessage.getMessage()); + assertEquals(channel, asyncMessage.getChannel()); + assertTrue(asyncMessage.getPattern().isEmpty()); + + assertEquals(message2, syncMessage.getMessage()); + assertEquals(channel, syncMessage.getChannel()); + assertTrue(syncMessage.getPattern().isEmpty()); + + // Assert there are no more messages to read. + assertThrows( + TimeoutException.class, + () -> { + listener.getPubSubMessage().get(3, TimeUnit.SECONDS); + }); + + assertNull(listener.tryGetPubSubMessage()); + } finally { + if (!standalone) { + // Since all tests run on the same cluster, when closing the client, garbage collector can + // be called + // after another test will start running. + // In cluster mode, we check how many subscriptions received the message. + // So to avoid flakiness, we make sure to unsubscribe from the channels. + ((RedisClusterClient) listener) + .customCommand(new String[] {"UNSUBSCRIBE", channel.toString()}); + } + } + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_exact_max_size_message_callback(boolean standalone) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(1 << 25)); // 33MB + + ArrayList callbackMessages = new ArrayList<>(); + final MessageCallback callback = + (pubSubMessage, context) -> { + ArrayList receivedMessages = (ArrayList) context; + receivedMessages.add(pubSubMessage); + }; + + Map> subscriptions = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(channel)) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + + var listener = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + var sender = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + + try { + assertEquals(OK, sender.publish(message, channel).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + assertEquals(1, callbackMessages.size()); + assertEquals(message, callbackMessages.get(0).getMessage()); + assertEquals(channel, callbackMessages.get(0).getChannel()); + assertNull(callbackMessages.get(0).getPattern()); + } finally { + if (!standalone) { + // Since all tests run on the same cluster, when closing the client, garbage collector can + // be called + // after another test will start running. + // In cluster mode, we check how many subscriptions received the message. + // So to avoid flakiness, we make sure to unsubscribe from the channels. + ((RedisClusterClient) listener) + .customCommand(new String[] {"UNSUBSCRIBE", channel.toString()}); + } + } + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {false}) + @Disabled( + "No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649") + public void pubsub_sharded_max_size_message_callback(boolean standalone) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + + final GlideString channel = gs(UUID.randomUUID().toString()); + final GlideString message = gs("1".repeat(1 << 25)); // 33MB + + ArrayList callbackMessages = new ArrayList<>(); + final MessageCallback callback = + (pubSubMessage, context) -> { + ArrayList receivedMessages = (ArrayList) context; + receivedMessages.add(pubSubMessage); + }; + + Map> subscriptions = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + + var listener = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + var sender = + createClientWithSubscriptions( + standalone, + subscriptions, + Optional.ofNullable(callback), + Optional.of(callbackMessages)); + + try { + assertEquals(OK, ((RedisClusterClient) sender).publish(message, channel, true).get()); + + // Allow the message to propagate. + Thread.sleep(MESSAGE_DELIVERY_DELAY); + + assertEquals(1, callbackMessages.size()); + assertEquals(message, callbackMessages.get(0).getMessage()); + } finally { + if (!standalone) { + // Since all tests run on the same cluster, when closing the client, garbage collector can + // be called + // after another test will start running. + // In cluster mode, we check how many subscriptions received the message. + // So to avoid flakiness, we make sure to unsubscribe from the channels. + ((RedisClusterClient) listener) + .customCommand(new String[] {"UNSUBSCRIBE", channel.toString()}); + } + } + } + + // @pytest.mark.parametrize("cluster_mode", [True]) + // async def test_pubsub_sharded_max_size_message_callback( + // self, request, cluster_mode: bool + // ): + // """ + // Tests publishing and receiving maximum size messages in sharded PUBSUB with callback + // method. + // + // This test verifies that very large messages (512MB - BulkString max size) can be + // published and received + // correctly. It ensures that the PUBSUB system + // can handle maximum size messages without errors and that the callback message + // retrieval method works as expected. + // + // The test covers the following scenarios: + // - Setting up PUBSUB subscription for a specific sharded channel with a callback. + // - Publishing a maximum size message to the channel. + // - Verifying that the message is received correctly using the callback method. + // """ + // channel = get_random_string(10) + // message = get_random_string(512 * 1024 * 1024) + // publish_response = 1 if cluster_mode else OK + // + // callback_messages: List[CoreCommands.PubSubMsg] = [] + // callback, context = new_message, callback_messages + // + // pub_sub = create_pubsub_subscription( + // cluster_mode, + // {ClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + // {}, + // callback=callback, + // context=context, + // ) + // + // publishing_client, listening_client = await create_two_clients( + // request, cluster_mode, pub_sub + // ) + // + // # (Redis version > 7) + // if await check_if_server_version_lt(publishing_client, "7.0.0"): + // pytest.skip("Redis version required >= 7.0.0") + // + // try: + // assert ( + // await cast(GlideClusterClient, publishing_client).publish( + // message, channel, sharded=True + // ) + // == publish_response + // ) + // # allow the message to propagate + // await asyncio.sleep(5) + // + // assert len(callback_messages) == 1 + // + // assert callback_messages[0].message == message + // assert callback_messages[0].channel == channel + // assert callback_messages[0].pattern is None + // + // finally: + // if cluster_mode: + // # Since all tests run on the same cluster, when closing the client, garbage collector can be + // called after another test will start running + // # In cluster mode, we check how many subscriptions received the message + // # So to avoid flakiness, we make sure to unsubscribe from the channels + // await listening_client.custom_command(["UNSUBSCRIBE", channel]) }