Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Address pubsub PR comments #1773

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,12 @@ public void close() throws ExecutionException {

protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) {
if (config.getSubscriptionConfiguration() == null) {
return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver);
return new MessageHandler(Optional.empty(), Optional.empty(), binaryResponseResolver);
}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),
responseResolver);
binaryResponseResolver);
}

protected static ChannelHandler buildChannelHandler(
Expand Down Expand Up @@ -3792,7 +3792,7 @@ public CompletableFuture<Map<String, Object>> lcsIdxWithMatchLen(
}

@Override
public CompletableFuture<String> publish(@NonNull String channel, @NonNull String message) {
public CompletableFuture<String> publish(@NonNull String message, @NonNull String channel) {
return commandManager.submitNewCommand(
Publish,
new String[] {channel, message},
Expand All @@ -3803,6 +3803,19 @@ public CompletableFuture<String> publish(@NonNull String channel, @NonNull Strin
});
}

@Override
public CompletableFuture<String> publish(
@NonNull GlideString message, @NonNull GlideString channel) {
return commandManager.submitNewCommand(
Publish,
new GlideString[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
Expand Down
50 changes: 35 additions & 15 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static glide.api.BaseClient.OK;
import static glide.api.commands.ServerManagementCommands.VERSION_REDIS_API;
import static glide.api.models.GlideString.gs;
import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING;
Expand Down Expand Up @@ -989,6 +988,39 @@ public CompletableFuture<ClusterValue<Map<String, Map<String, Object>>>> functio
response -> handleFunctionStatsBinaryResponse(response, route instanceof SingleNodeRoute));
}

public CompletableFuture<String> publish(
@NonNull String message, @NonNull String channel, boolean sharded) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
if (!sharded) {
return publish(message, channel);
}

return commandManager.submitNewCommand(
SPublish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> publish(
@NonNull GlideString message, @NonNull GlideString channel, boolean sharded) {
if (!sharded) {
return publish(message, channel);
}

return commandManager.submitNewCommand(
SPublish,
new GlideString[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> unwatch(@NonNull Route route) {
return commandManager.submitNewCommand(
Expand Down Expand Up @@ -1040,18 +1072,6 @@ public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions op
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
SPublish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down Expand Up @@ -1165,8 +1185,8 @@ private void internalClose() {
Logger.log(
Logger.Level.ERROR,
"ClusterScanCursor",
() -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage());
Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex);
() -> "Error releasing cursor " + cursorHandle,
ex);
} finally {
// Mark the cursor as closed to avoid double-free (if close() gets called more than once).
isClosed = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -14,14 +15,29 @@ public interface PubSubBaseCommands {
* Publishes message on pubsub channel.
*
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("The cat said 'meow'!", "announcements").get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String message, String channel);

/**
* Publishes message on pubsub channel.
*
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("announcements", "The cat said 'meow'!").get();
* String response = client.publish(gs("The cat said 'meow'!"), gs("announcements")).get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String channel, String message);
CompletableFuture<String> publish(GlideString message, GlideString channel);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -11,18 +12,38 @@
public interface PubSubClusterCommands {

/**
* Publishes message on pubsub channel in sharded mode.
* Publishes message on pubsub channel.
*
* @since Redis 7.0 and above.
jduo marked this conversation as resolved.
Show resolved Hide resolved
* @see <a href="https://valkey.io/commands/spublish/">valkey.io</a> for details.
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("The cat said 'meow'!", "announcements", true).get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String message, String channel, boolean sharded);

/**
* Publishes message on pubsub channel.
*
* @since Redis 7.0 and above.
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.spublish("announcements", "The cat said 'meow'!").get();
* String response = client.publish(gs("The cat said 'meow'!"), gs("announcements"), true).get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> spublish(String channel, String message);
CompletableFuture<String> publish(GlideString message, GlideString channel, boolean sharded);
}
52 changes: 36 additions & 16 deletions java/client/src/main/java/glide/api/logging/Logger.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import static glide.ffi.resolvers.LoggerResolver.initInternal;
import static glide.ffi.resolvers.LoggerResolver.logInternal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -178,24 +178,44 @@ public static void log(
*
* @param level The log level of the provided message.
* @param logIdentifier The log identifier should give the log a context.
* @param message The message to log with the exception.
* @param throwable The exception or error to log.
*/
public static void log(
@NonNull Level level, @NonNull String logIdentifier, @NonNull Throwable throwable) {
@NonNull Level level,
@NonNull String logIdentifier,
@NonNull String message,
@NonNull Throwable throwable) {
// TODO: Add the corresponding API to Python and Node.js.
log(level, logIdentifier, () -> message + ": " + prettyPrintException(throwable));
}

/**
* Logs the provided exception or error if the provided log level is lower than the logger level.
*
* @param level The log level of the provided message.
* @param logIdentifier The log identifier should give the log a context.
* @param message The message to log with the exception.
* @param throwable The exception or error to log.
*/
public static void log(
@NonNull Level level,
@NonNull String logIdentifier,
@NonNull Supplier<String> message,
@NonNull Throwable throwable) {
// TODO: Add the corresponding API to Python and Node.js.
log(
level,
logIdentifier,
() -> {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(outputStream)) {
throwable.printStackTrace(printStream);
return printStream.toString();
} catch (IOException e) {
// This can't happen with a ByteArrayOutputStream.
return null;
}
});
log(level, logIdentifier, () -> message.get() + ": " + prettyPrintException(throwable));
}

private static String prettyPrintException(@NonNull Throwable throwable) {
try (StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter)) {
throwable.printStackTrace(printWriter);
return stringWriter.toString();
} catch (IOException e) {
// This can't happen with a ByteArrayOutputStream.
return null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5824,11 +5824,11 @@ public <ArgType> T lcsLen(@NonNull ArgType key1, @NonNull ArgType key2) {
* @implNote ArgType is limited to String or GlideString, any other type will throw
* IllegalArgumentException
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param channel The channel to publish the message on.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @return Command response - The number of clients that received the message.
*/
public <ArgType> T publish(@NonNull ArgType channel, @NonNull ArgType message) {
public <ArgType> T publish(@NonNull ArgType message, @NonNull ArgType channel) {
checkTypeOrThrow(channel);
protobufTransaction.addCommands(
buildCommand(Publish, newArgsBuilder().add(channel).add(message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ protected ClusterTransaction getThis() {
* @implNote ArgType is limited to String or GlideString, any other type will throw
* IllegalArgumentException
* @see <a href="https://valkey.io/commands/publish/">valkey.io</a> for details.
* @param channel The channel to publish the message on.
* @param message The message to publish.
* @param channel The channel to publish the message on.
* @param sharded Indicates that this should be run in sharded mode. Setting <code>sharded</code>
* to <code>true</code> is only applicable with Redis 7.0+.
* @return Command response - The number of clients that received the message.
*/
public <ArgType> ClusterTransaction spublish(@NonNull ArgType channel, @NonNull ArgType message) {
public <ArgType> ClusterTransaction publish(
@NonNull ArgType message, @NonNull ArgType channel, boolean sharded) {
if (!sharded) {
return super.publish(message, channel);
}
checkTypeOrThrow(channel);
protobufTransaction.addCommands(
buildCommand(SPublish, newArgsBuilder().add(channel).add(message)));
Expand Down
6 changes: 4 additions & 2 deletions java/client/src/main/java/glide/api/models/GlideString.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public String getString() {
return string;
}

assert canConvertToString() : "Value cannot be represented as a string";
return string;
if (canConvertToString()) {
return string;
}
return String.format("Value not convertible to string: byte[] %d", Arrays.hashCode(bytes));
}

public int compareTo(GlideString o) {
Expand Down
10 changes: 5 additions & 5 deletions java/client/src/main/java/glide/api/models/PubSubMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
@EqualsAndHashCode
public class PubSubMessage {
/** An incoming message received. */
private final String message;
private final GlideString message;

/** A name of the originating channel. */
private final String channel;
private final GlideString channel;

/** A pattern matched to the channel name. */
private final Optional<String> pattern;
private final Optional<GlideString> pattern;

public PubSubMessage(String message, String channel, String pattern) {
public PubSubMessage(GlideString message, GlideString channel, GlideString pattern) {
this.message = message;
this.channel = channel;
this.pattern = Optional.ofNullable(pattern);
}

public PubSubMessage(String message, String channel) {
public PubSubMessage(GlideString message, GlideString channel) {
this.message = message;
this.channel = channel;
this.pattern = Optional.empty();
Expand Down
Loading
Loading