Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce JedisBroadcast to broadcast commands #3194

Merged
merged 13 commits into from
Dec 7, 2022
25 changes: 25 additions & 0 deletions src/main/java/redis/clients/jedis/BroadcastResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package redis.clients.jedis;

import java.util.function.Supplier;

public class BroadcastResponse<T> implements Supplier<T> {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

private T response = null;
private RuntimeException exception = null;

public BroadcastResponse(T response) {
this.response = response;
}

public BroadcastResponse(RuntimeException exception) {
this.exception = exception;
}

@Override
public T get() {
if (exception != null) {
throw exception;
}
return response;
}
}
130 changes: 130 additions & 0 deletions src/main/java/redis/clients/jedis/JedisBroadcast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package redis.clients.jedis;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.search.FTCreateParams;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Schema;
import redis.clients.jedis.search.SearchProtocol.SearchCommand;
import redis.clients.jedis.search.SearchProtocol.SearchKeyword;
import redis.clients.jedis.search.schemafields.SchemaField;
import redis.clients.jedis.util.Pool;

public class JedisBroadcast {

private final ConnectionProvider provider;

public JedisBroadcast(UnifiedJedis jedis) {
this(jedis.provider);
}

public JedisBroadcast(ConnectionProvider provider) {
if (provider == null) {
throw new NullPointerException("ConnectionProvider is null.");
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
}
this.provider = provider;
}

public final <T> Map<?, BroadcastResponse<T>> broadcastCommand(CommandObject<T> commandObject) {
Map<?, ?> connectionMap = provider.getConnectionMap();
Map<Object, BroadcastResponse<T>> responseMap = new HashMap<>(connectionMap.size(), 1f);
for (Map.Entry<? extends Object, ? extends Object> entry : connectionMap.entrySet()) {
Object key = entry.getKey();
Object connection = entry.getValue();
try {
responseMap.put(key, new BroadcastResponse<>(executeCommand(connection, commandObject)));
} catch (RuntimeException re) {
responseMap.put(key, new BroadcastResponse<>(re));
}
}
return responseMap;
}

private <T> T executeCommand(Object connection, CommandObject<T> commandObject) {
if (connection instanceof Connection) {
return ((Connection) connection).executeCommand(commandObject);
}
if (connection instanceof Pool) {
try (Connection _conn = ((Pool<Connection>) connection).getResource()) {
return _conn.executeCommand(commandObject);
}
}
throw new IllegalStateException(connection.getClass() + "is not supported.");
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
}

public Map<?, BroadcastResponse<List<Boolean>>> scriptExists(String... sha1) {
CommandObject<List<Boolean>> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<List<Boolean>>> scriptExists(byte[]... sha1) {
CommandObject<List<Boolean>> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<String>> scriptLoad(String script) {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.LOAD).add(script), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<byte[]>> scriptLoad(byte[] script) {
CommandObject<byte[]> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.LOAD).add(script), BuilderFactory.BINARY);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<String>> scriptFlush() {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.FLUSH), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<String>> scriptFlush(FlushMode flushMode) {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.FLUSH).add(flushMode), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<String>> scriptKill() {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.KILL), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, BroadcastResponse<String>> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) {
CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName)
.addParams(indexOptions).add(SearchKeyword.SCHEMA);
schema.fields.forEach(field -> args.addParams(field));
return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING));
}

public Map<?, BroadcastResponse<String>> ftCreate(String indexName, SchemaField... schemaFields) {
return ftCreate(indexName, Arrays.asList(schemaFields));
}

public Map<?, BroadcastResponse<String>> ftCreate(String indexName, FTCreateParams createParams, SchemaField... schemaFields) {
return ftCreate(indexName, createParams, Arrays.asList(schemaFields));
}

public Map<?, BroadcastResponse<String>> ftCreate(String indexName, Iterable<SchemaField> schemaFields) {
return ftCreate(indexName, FTCreateParams.createParams(), schemaFields);
}

public Map<?, BroadcastResponse<String>> ftCreate(String indexName, FTCreateParams createParams,
Iterable<SchemaField> schemaFields) {
CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName)
.addParams(createParams).add(SearchKeyword.SCHEMA);
schemaFields.forEach(field -> args.addParams(field));
return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,9 @@ public Connection getConnectionFromSlot(int slot) {
}
}
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package redis.clients.jedis.providers;

import java.util.Collections;
import java.util.Map;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;

Expand All @@ -8,4 +10,9 @@ public interface ConnectionProvider extends AutoCloseable {
Connection getConnection();

Connection getConnection(CommandArguments args);

default Map<?, ?> getConnectionMap() {
final Connection c = getConnection();
return Collections.singletonMap(c.toString(), c);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package redis.clients.jedis.providers;

import java.util.Collections;
import java.util.Map;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

Expand All @@ -14,18 +16,22 @@
public class PooledConnectionProvider implements ConnectionProvider {

private final Pool<Connection> pool;
private Object connectionMapKey = "";

public PooledConnectionProvider(HostAndPort hostAndPort) {
this(new ConnectionFactory(hostAndPort));
this.connectionMapKey = hostAndPort;
}

public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionPool(hostAndPort, clientConfig));
this.connectionMapKey = hostAndPort;
}

public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
this.connectionMapKey = hostAndPort;
}

public PooledConnectionProvider(PooledObjectFactory<Connection> factory) {
Expand Down Expand Up @@ -59,4 +65,9 @@ public Connection getConnection() {
public Connection getConnection(CommandArguments args) {
return pool.getResource();
}

@Override
public Map<?, Pool<Connection>> getConnectionMap() {
return Collections.singletonMap("", pool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,9 @@ private HostAndPort getNodeFromHash(Long hash) {
}
return tail.get(tail.firstKey());
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(resources);
}
}
14 changes: 14 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -780,4 +781,17 @@ private boolean isAnyNodeHandshaking(Jedis node) {
}
return false;
}

@Test
public void broadcast() {
try (JedisCluster cluster = new JedisCluster(Collections.singleton(new HostAndPort(LOCAL_IP, 7379)),
DefaultJedisClientConfig.builder().password("cluster").build())) {
JedisBroadcast broadcast = new JedisBroadcast(cluster);

Map<?, BroadcastResponse<String>> replies = broadcast.broadcastCommand(new CommandObject<>(
new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING));
assertEquals(3, replies.size());
replies.values().forEach(reply -> assertEquals("PONG", reply.get()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis.clients.jedis.modules.search;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static redis.clients.jedis.util.AssertUtil.assertOK;

import java.util.Map;
import org.junit.BeforeClass;
import org.junit.Test;

import redis.clients.jedis.BroadcastResponse;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisBroadcast;
import redis.clients.jedis.modules.RedisModuleCommandsTestBase;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.search.schemafields.TextField;

public class BroadcastTest extends RedisModuleCommandsTestBase {

private static final String index = "broadcast";

@BeforeClass
public static void prepare() {
RedisModuleCommandsTestBase.prepare();
}
//
// @AfterClass
// public static void tearDown() {
//// RedisModuleCommandsTestBase.tearDown();
// }

@Test
public void broadcast() throws Exception {
final Connection conn = new Connection(hnp);
try (ConnectionProvider provider = new ConnectionProvider() {
@Override
public Connection getConnection() {
return conn;
}

@Override
public Connection getConnection(CommandArguments args) {
return getConnection();
}

@Override
public void close() throws Exception {
conn.close();
}
}) {
JedisBroadcast broadcast = new JedisBroadcast(provider);
Map<?, BroadcastResponse<String>> reply = broadcast.ftCreate(index, TextField.of("t"));
assertEquals(1, reply.size());
assertOK(reply.values().stream().findAny().get().get());
}
assertFalse(conn.isConnected());
}
}