Skip to content

Commit

Permalink
Add pubsub MaxSize integration tests (disabled)
Browse files Browse the repository at this point in the history
Add the following tests from Python:
* pubsub_exact_max_size_message
* pubsub_sharded_max_size_message
* pubsub_exact_max_size_message_callback
* pubsub_sharded_max_size_message_callback
  • Loading branch information
jduo committed Jul 4, 2024
1 parent e1c033e commit d66b9c6
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;

import static glide.api.models.GlideString.gs;

import glide.api.logging.Logger;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
Expand Down Expand Up @@ -49,7 +51,7 @@ public void handle(Response response) {
}
@SuppressWarnings("unchecked")
Map<GlideString, Object> push = (Map<GlideString, Object>) data;
PushKind pushType = Enum.valueOf(PushKind.class, ((GlideString) push.get("kind")).getString());
PushKind pushType = Enum.valueOf(PushKind.class, push.get("kind").toString());
Object[] values = (Object[]) push.get("values");

switch (pushType) {
Expand All @@ -62,11 +64,11 @@ public void handle(Response response) {
case PMessage:
handle(
new PubSubMessage(
(GlideString) values[2], (GlideString) values[1], (GlideString) values[0]));
gs((byte[]) values[2]), gs((byte[]) values[1]), gs((byte[]) values[0])));
return;
case Message:
case SMessage:
handle(new PubSubMessage((GlideString) values[1], (GlideString) values[0]));
handle(new PubSubMessage(gs((byte[]) values[1]), gs((byte[]) values[0])));
return;
case Subscribe:
case PSubscribe:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Logger.log(ERROR, "read handler", () -> "=== exceptionCaught " + ctx + " " + cause);
Logger.log(ERROR, "read handler", cause);

callbackDispatcher.distributeClosingException(
"An unhandled error while reading from UDS channel: " + cause);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.ffi.resolvers;

import glide.api.models.GlideString;
import response.ResponseOuterClass.Response;

public class RedisValueResolver {
Expand All @@ -27,7 +26,8 @@ public class RedisValueResolver {

/**
* Resolve a value received from Redis using given C-style pointer. This method does not assume
* that strings are valid UTF-8 encoded strings and will expose this data as {@link GlideString}.
* that strings are valid UTF-8 encoded strings and will expose this data as a <code>byte[]</code>
* .
*
* @param pointer A memory pointer from {@link Response}
* @return A RESP3 value
Expand Down
300 changes: 299 additions & 1 deletion java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import static glide.TestConfiguration.REDIS_VERSION;
import static glide.TestUtilities.commonClientConfig;
import static glide.TestUtilities.commonClusterClientConfig;
import static glide.api.BaseClient.OK;
import static glide.api.models.GlideString.gs;
import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -38,11 +40,14 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -70,7 +75,10 @@ private <M extends ChannelMode> BaseClient createClientWithSubscriptions(
subConfigBuilder.callback(callback.get(), context.get());
}
return RedisClient.CreateClient(
commonClientConfig().subscriptionConfiguration(subConfigBuilder.build()).build())
commonClientConfig()
.requestTimeout(5000)
.subscriptionConfiguration(subConfigBuilder.build())
.build())
.get();
} else {
var subConfigBuilder =
Expand All @@ -83,6 +91,7 @@ private <M extends ChannelMode> BaseClient createClientWithSubscriptions(

return RedisClusterClient.CreateClient(
commonClusterClientConfig()
.requestTimeout(5000)
.subscriptionConfiguration(subConfigBuilder.build())
.build())
.get();
Expand Down Expand Up @@ -870,4 +879,293 @@ public void transaction_with_all_types_of_PubsubMessages(
Pair.of(1, exactMessage), Pair.of(1, patternMessage), Pair.of(1, shardedMessage));
verifyReceivedPubsubMessages(expected, listener, useCallback);
}

@SneakyThrows
@ParameterizedTest(name = "standalone = {0}")
@ValueSource(booleans = {true, false})
@Disabled(
"No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649")
public void pubsub_exact_max_size_message(boolean standalone) {
final GlideString channel = gs(UUID.randomUUID().toString());
final GlideString message = gs("1".repeat(1 << 25)); // 33MB
final GlideString message2 = gs("2".repeat(1 << 25)); // 3MB

Map<? extends ChannelMode, Set<GlideString>> subscriptions =
standalone
? Map.of(PubSubChannelMode.EXACT, Set.of(channel))
: Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel));
var listener = createClientWithSubscriptions(standalone, subscriptions);
var sender = createClientWithSubscriptions(standalone, subscriptions);

try {
assertEquals(OK, sender.publish(message, channel).get());
assertEquals(OK, sender.publish(message2, channel).get());

// Allow the message to propagate.
Thread.sleep(MESSAGE_DELIVERY_DELAY);

PubSubMessage asyncMessage = listener.getPubSubMessage().get();
PubSubMessage syncMessage = listener.tryGetPubSubMessage();
assertEquals(message, asyncMessage.getMessage());
assertEquals(channel, asyncMessage.getChannel());
assertTrue(asyncMessage.getPattern().isEmpty());

assertEquals(message2, syncMessage.getMessage());
assertEquals(channel, syncMessage.getChannel());
assertTrue(syncMessage.getPattern().isEmpty());

// Assert there are no more messages to read.
assertThrows(
TimeoutException.class,
() -> {
listener.getPubSubMessage().get(3, TimeUnit.SECONDS);
});
assertNull(listener.tryGetPubSubMessage());
} finally {
if (!standalone) {
// Since all tests run on the same cluster, when closing the client, garbage collector can
// be called
// after another test will start running.
// In cluster mode, we check how many subscriptions received the message.
// So to avoid flakiness, we make sure to unsubscribe from the channels.
((RedisClusterClient) listener)
.customCommand(new String[] {"UNSUBSCRIBE", channel.toString()});
}
}
}

@SneakyThrows
@ParameterizedTest(name = "standalone = {0}")
@ValueSource(booleans = {false})
@Disabled(
"No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649")
public void pubsub_sharded_max_size_message(boolean standalone) {
assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7");

final GlideString channel = gs(UUID.randomUUID().toString());
final GlideString message = gs("1".repeat(1 << 25)); // 33MB
final GlideString message2 = gs("2".repeat(1 << 25)); // 3MB

Map<? extends ChannelMode, Set<GlideString>> subscriptions =
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel));
var listener = createClientWithSubscriptions(standalone, subscriptions);
var sender = createClientWithSubscriptions(standalone, subscriptions);

try {
assertEquals(OK, sender.publish(message, channel).get());
assertEquals(OK, sender.publish(message2, channel).get());

// Allow the message to propagate.
Thread.sleep(MESSAGE_DELIVERY_DELAY);

PubSubMessage asyncMessage = listener.getPubSubMessage().get();
PubSubMessage syncMessage = listener.tryGetPubSubMessage();
assertEquals(message, asyncMessage.getMessage());
assertEquals(channel, asyncMessage.getChannel());
assertTrue(asyncMessage.getPattern().isEmpty());

assertEquals(message2, syncMessage.getMessage());
assertEquals(channel, syncMessage.getChannel());
assertTrue(syncMessage.getPattern().isEmpty());

// Assert there are no more messages to read.
assertThrows(
TimeoutException.class,
() -> {
listener.getPubSubMessage().get(3, TimeUnit.SECONDS);
});

assertNull(listener.tryGetPubSubMessage());
} finally {
if (!standalone) {
// Since all tests run on the same cluster, when closing the client, garbage collector can
// be called
// after another test will start running.
// In cluster mode, we check how many subscriptions received the message.
// So to avoid flakiness, we make sure to unsubscribe from the channels.
((RedisClusterClient) listener)
.customCommand(new String[] {"UNSUBSCRIBE", channel.toString()});
}
}
}

@SneakyThrows
@ParameterizedTest(name = "standalone = {0}")
@ValueSource(booleans = {true, false})
@Disabled(
"No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649")
public void pubsub_exact_max_size_message_callback(boolean standalone) {
assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7");

final GlideString channel = gs(UUID.randomUUID().toString());
final GlideString message = gs("1".repeat(1 << 25)); // 33MB

ArrayList<PubSubMessage> callbackMessages = new ArrayList<>();
final MessageCallback callback =
(pubSubMessage, context) -> {
ArrayList<PubSubMessage> receivedMessages = (ArrayList<PubSubMessage>) context;
receivedMessages.add(pubSubMessage);
};

Map<? extends ChannelMode, Set<GlideString>> subscriptions =
standalone
? Map.of(PubSubChannelMode.EXACT, Set.of(channel))
: Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel));

var listener =
createClientWithSubscriptions(
standalone,
subscriptions,
Optional.ofNullable(callback),
Optional.of(callbackMessages));
var sender =
createClientWithSubscriptions(
standalone,
subscriptions,
Optional.ofNullable(callback),
Optional.of(callbackMessages));

try {
assertEquals(OK, sender.publish(message, channel).get());

// Allow the message to propagate.
Thread.sleep(MESSAGE_DELIVERY_DELAY);

assertEquals(1, callbackMessages.size());
assertEquals(message, callbackMessages.get(0).getMessage());
assertEquals(channel, callbackMessages.get(0).getChannel());
assertNull(callbackMessages.get(0).getPattern());
} finally {
if (!standalone) {
// Since all tests run on the same cluster, when closing the client, garbage collector can
// be called
// after another test will start running.
// In cluster mode, we check how many subscriptions received the message.
// So to avoid flakiness, we make sure to unsubscribe from the channels.
((RedisClusterClient) listener)
.customCommand(new String[] {"UNSUBSCRIBE", channel.toString()});
}
}
}

@SneakyThrows
@ParameterizedTest(name = "standalone = {0}")
@ValueSource(booleans = {false})
@Disabled(
"No way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649")
public void pubsub_sharded_max_size_message_callback(boolean standalone) {
assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7");

final GlideString channel = gs(UUID.randomUUID().toString());
final GlideString message = gs("1".repeat(1 << 25)); // 33MB

ArrayList<PubSubMessage> callbackMessages = new ArrayList<>();
final MessageCallback callback =
(pubSubMessage, context) -> {
ArrayList<PubSubMessage> receivedMessages = (ArrayList<PubSubMessage>) context;
receivedMessages.add(pubSubMessage);
};

Map<? extends ChannelMode, Set<GlideString>> subscriptions =
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel));

var listener =
createClientWithSubscriptions(
standalone,
subscriptions,
Optional.ofNullable(callback),
Optional.of(callbackMessages));
var sender =
createClientWithSubscriptions(
standalone,
subscriptions,
Optional.ofNullable(callback),
Optional.of(callbackMessages));

try {
assertEquals(OK, ((RedisClusterClient) sender).publish(message, channel, true).get());

// Allow the message to propagate.
Thread.sleep(MESSAGE_DELIVERY_DELAY);

assertEquals(1, callbackMessages.size());
assertEquals(message, callbackMessages.get(0).getMessage());
} finally {
if (!standalone) {
// Since all tests run on the same cluster, when closing the client, garbage collector can
// be called
// after another test will start running.
// In cluster mode, we check how many subscriptions received the message.
// So to avoid flakiness, we make sure to unsubscribe from the channels.
((RedisClusterClient) listener)
.customCommand(new String[] {"UNSUBSCRIBE", channel.toString()});
}
}
}

// @pytest.mark.parametrize("cluster_mode", [True])
// async def test_pubsub_sharded_max_size_message_callback(
// self, request, cluster_mode: bool
// ):
// """
// Tests publishing and receiving maximum size messages in sharded PUBSUB with callback
// method.
//
// This test verifies that very large messages (512MB - BulkString max size) can be
// published and received
// correctly. It ensures that the PUBSUB system
// can handle maximum size messages without errors and that the callback message
// retrieval method works as expected.
//
// The test covers the following scenarios:
// - Setting up PUBSUB subscription for a specific sharded channel with a callback.
// - Publishing a maximum size message to the channel.
// - Verifying that the message is received correctly using the callback method.
// """
// channel = get_random_string(10)
// message = get_random_string(512 * 1024 * 1024)
// publish_response = 1 if cluster_mode else OK
//
// callback_messages: List[CoreCommands.PubSubMsg] = []
// callback, context = new_message, callback_messages
//
// pub_sub = create_pubsub_subscription(
// cluster_mode,
// {ClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}},
// {},
// callback=callback,
// context=context,
// )
//
// publishing_client, listening_client = await create_two_clients(
// request, cluster_mode, pub_sub
// )
//
// # (Redis version > 7)
// if await check_if_server_version_lt(publishing_client, "7.0.0"):
// pytest.skip("Redis version required >= 7.0.0")
//
// try:
// assert (
// await cast(GlideClusterClient, publishing_client).publish(
// message, channel, sharded=True
// )
// == publish_response
// )
// # allow the message to propagate
// await asyncio.sleep(5)
//
// assert len(callback_messages) == 1
//
// assert callback_messages[0].message == message
// assert callback_messages[0].channel == channel
// assert callback_messages[0].pattern is None
//
// finally:
// if cluster_mode:
// # Since all tests run on the same cluster, when closing the client, garbage collector can be
// called after another test will start running
// # In cluster mode, we check how many subscriptions received the message
// # So to avoid flakiness, we make sure to unsubscribe from the channels
// await listening_client.custom_command(["UNSUBSCRIBE", channel])
}

0 comments on commit d66b9c6

Please sign in to comment.