Skip to content

Commit

Permalink
Java: Address pubsub PR comments (valkey-io#1773)
Browse files Browse the repository at this point in the history
* Remove spublish and provide two overloads for publish

* Change publish argument order to match Python

* Explicitly prevent the context being non-null and the callback being null

* Parameterize three_publishing_clients_same_name_with_sharded

* Address PR feedback

* Add GlideString versions of pubsub commands

Also add GlideString version of OK constant (named TOK from Python)

* PubSubMessages should use GlideStrings instead of String

* Spotless

* Address PR comments

* Use GlideString for MessageHandler responses
* Have GlideString overloads for PubSub comamnds return String instead of GlideString OKs.

* Have GlideString log byte array hash

* Add pubsub MaxSize integration tests (disabled)

Add the following tests from Python:
* pubsub_exact_max_size_message
* pubsub_sharded_max_size_message
* pubsub_exact_max_size_message_callback
* pubsub_sharded_max_size_message_callback

* Improve exception logging.

Supply a message with the exception being logged and fix the output when printing an exception

* Add exception-from-callback test

* Make the callback exception error handling test pass

* PubSub Test fixes

* Senders do not need subscriptions/callbacks.
* Correct validating the pattern by checking that the optional is empty instead of null.
* Use 512MB messages for max-size tests
* Make client cleanup match the other tests

* Add pub-sub testing with non-UTF-8 data

* Simplify the non-String PubSub test

* Remove stale TODOs

* Change pubsub callback exception handling to pass through Netty

Exceptions from the callback now escape the netty handler.
This will get Netty to log it.
We also explicitly log the the Glide log and System.err
  • Loading branch information
jduo authored and cyip10 committed Jul 16, 2024
1 parent 669bdd7 commit af7ae4e
Show file tree
Hide file tree
Showing 26 changed files with 674 additions and 276 deletions.
19 changes: 16 additions & 3 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,12 @@ public void close() throws ExecutionException {

protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) {
if (config.getSubscriptionConfiguration() == null) {
return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver);
return new MessageHandler(Optional.empty(), Optional.empty(), binaryResponseResolver);
}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),
responseResolver);
binaryResponseResolver);
}

protected static ChannelHandler buildChannelHandler(
Expand Down Expand Up @@ -3846,7 +3846,7 @@ public CompletableFuture<Map<String, Object>> lcsIdxWithMatchLen(
}

@Override
public CompletableFuture<String> publish(@NonNull String channel, @NonNull String message) {
public CompletableFuture<String> publish(@NonNull String message, @NonNull String channel) {
return commandManager.submitNewCommand(
Publish,
new String[] {channel, message},
Expand All @@ -3857,6 +3857,19 @@ public CompletableFuture<String> publish(@NonNull String channel, @NonNull Strin
});
}

@Override
public CompletableFuture<String> publish(
@NonNull GlideString message, @NonNull GlideString channel) {
return commandManager.submitNewCommand(
Publish,
new GlideString[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
Expand Down
50 changes: 35 additions & 15 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static glide.api.BaseClient.OK;
import static glide.api.commands.ServerManagementCommands.VERSION_REDIS_API;
import static glide.api.models.GlideString.gs;
import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING;
Expand Down Expand Up @@ -989,6 +988,39 @@ public CompletableFuture<ClusterValue<Map<String, Map<String, Object>>>> functio
response -> handleFunctionStatsBinaryResponse(response, route instanceof SingleNodeRoute));
}

public CompletableFuture<String> publish(
@NonNull String message, @NonNull String channel, boolean sharded) {
if (!sharded) {
return publish(message, channel);
}

return commandManager.submitNewCommand(
SPublish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> publish(
@NonNull GlideString message, @NonNull GlideString channel, boolean sharded) {
if (!sharded) {
return publish(message, channel);
}

return commandManager.submitNewCommand(
SPublish,
new GlideString[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> unwatch(@NonNull Route route) {
return commandManager.submitNewCommand(
Expand Down Expand Up @@ -1040,18 +1072,6 @@ public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions op
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
SPublish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down Expand Up @@ -1165,8 +1185,8 @@ private void internalClose() {
Logger.log(
Logger.Level.ERROR,
"ClusterScanCursor",
() -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage());
Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex);
() -> "Error releasing cursor " + cursorHandle,
ex);
} finally {
// Mark the cursor as closed to avoid double-free (if close() gets called more than once).
isClosed = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -14,14 +15,29 @@ public interface PubSubBaseCommands {
* Publishes message on pubsub channel.
*
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("The cat said 'meow'!", "announcements").get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String message, String channel);

/**
* Publishes message on pubsub channel.
*
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("announcements", "The cat said 'meow'!").get();
* String response = client.publish(gs("The cat said 'meow'!"), gs("announcements")).get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String channel, String message);
CompletableFuture<String> publish(GlideString message, GlideString channel);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -11,18 +12,38 @@
public interface PubSubClusterCommands {

/**
* Publishes message on pubsub channel in sharded mode.
* Publishes message on pubsub channel.
*
* @since Redis 7.0 and above.
* @see <a href="https://valkey.io/commands/spublish/">valkey.io</a> for details.
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("The cat said 'meow'!", "announcements", true).get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String message, String channel, boolean sharded);

/**
* Publishes message on pubsub channel.
*
* @since Redis 7.0 and above.
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.spublish("announcements", "The cat said 'meow'!").get();
* String response = client.publish(gs("The cat said 'meow'!"), gs("announcements"), true).get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> spublish(String channel, String message);
CompletableFuture<String> publish(GlideString message, GlideString channel, boolean sharded);
}
52 changes: 36 additions & 16 deletions java/client/src/main/java/glide/api/logging/Logger.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import static glide.ffi.resolvers.LoggerResolver.initInternal;
import static glide.ffi.resolvers.LoggerResolver.logInternal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -178,24 +178,44 @@ public static void log(
*
* @param level The log level of the provided message.
* @param logIdentifier The log identifier should give the log a context.
* @param message The message to log with the exception.
* @param throwable The exception or error to log.
*/
public static void log(
@NonNull Level level, @NonNull String logIdentifier, @NonNull Throwable throwable) {
@NonNull Level level,
@NonNull String logIdentifier,
@NonNull String message,
@NonNull Throwable throwable) {
// TODO: Add the corresponding API to Python and Node.js.
log(level, logIdentifier, () -> message + ": " + prettyPrintException(throwable));
}

/**
* Logs the provided exception or error if the provided log level is lower than the logger level.
*
* @param level The log level of the provided message.
* @param logIdentifier The log identifier should give the log a context.
* @param message The message to log with the exception.
* @param throwable The exception or error to log.
*/
public static void log(
@NonNull Level level,
@NonNull String logIdentifier,
@NonNull Supplier<String> message,
@NonNull Throwable throwable) {
// TODO: Add the corresponding API to Python and Node.js.
log(
level,
logIdentifier,
() -> {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(outputStream)) {
throwable.printStackTrace(printStream);
return printStream.toString();
} catch (IOException e) {
// This can't happen with a ByteArrayOutputStream.
return null;
}
});
log(level, logIdentifier, () -> message.get() + ": " + prettyPrintException(throwable));
}

private static String prettyPrintException(@NonNull Throwable throwable) {
try (StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter)) {
throwable.printStackTrace(printWriter);
return stringWriter.toString();
} catch (IOException e) {
// This can't happen with a ByteArrayOutputStream.
return null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5937,11 +5937,11 @@ public <ArgType> T lcsLen(@NonNull ArgType key1, @NonNull ArgType key2) {
* @implNote ArgType is limited to String or GlideString, any other type will throw
* IllegalArgumentException
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param channel The channel to publish the message on.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @return Command response - The number of clients that received the message.
*/
public <ArgType> T publish(@NonNull ArgType channel, @NonNull ArgType message) {
public <ArgType> T publish(@NonNull ArgType message, @NonNull ArgType channel) {
checkTypeOrThrow(channel);
protobufTransaction.addCommands(
buildCommand(Publish, newArgsBuilder().add(channel).add(message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ protected ClusterTransaction getThis() {
* @implNote ArgType is limited to String or GlideString, any other type will throw
* IllegalArgumentException
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param channel The channel to publish the message on.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return Command response - The number of clients that received the message.
*/
public <ArgType> ClusterTransaction spublish(@NonNull ArgType channel, @NonNull ArgType message) {
public <ArgType> ClusterTransaction publish(
@NonNull ArgType message, @NonNull ArgType channel, boolean sharded) {
if (!sharded) {
return super.publish(message, channel);
}
checkTypeOrThrow(channel);
protobufTransaction.addCommands(
buildCommand(SPublish, newArgsBuilder().add(channel).add(message)));
Expand Down
6 changes: 4 additions & 2 deletions java/client/src/main/java/glide/api/models/GlideString.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public String getString() {
return string;
}

assert canConvertToString() : "Value cannot be represented as a string";
return string;
if (canConvertToString()) {
return string;
}
return String.format("Value not convertible to string: byte[] %d", Arrays.hashCode(bytes));
}

public int compareTo(GlideString o) {
Expand Down
10 changes: 5 additions & 5 deletions java/client/src/main/java/glide/api/models/PubSubMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
@EqualsAndHashCode
public class PubSubMessage {
/** An incoming message received. */
private final String message;
private final GlideString message;

/** A name of the originating channel. */
private final String channel;
private final GlideString channel;

/** A pattern matched to the channel name. */
private final Optional<String> pattern;
private final Optional<GlideString> pattern;

public PubSubMessage(String message, String channel, String pattern) {
public PubSubMessage(GlideString message, GlideString channel, GlideString pattern) {
this.message = message;
this.channel = channel;
this.pattern = Optional.ofNullable(pattern);
}

public PubSubMessage(String message, String channel) {
public PubSubMessage(GlideString message, GlideString channel) {
this.message = message;
this.channel = channel;
this.pattern = Optional.empty();
Expand Down
Loading

0 comments on commit af7ae4e

Please sign in to comment.