Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster pipelining #1455

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/CommandListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package redis.clients.jedis;

import redis.clients.jedis.commands.ProtocolCommand;

public interface CommandListener {

void afterCommand(Connection conn, ProtocolCommand cmd, final byte[][] args);

}
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class Connection implements Closeable {
private SSLSocketFactory sslSocketFactory;
private SSLParameters sslParameters;
private HostnameVerifier hostnameVerifier;
private CommandListener commandListener;

public Connection() {
}
Expand Down Expand Up @@ -108,6 +109,14 @@ public void rollbackTimeout() {
}
}

public void setCommandListener(CommandListener listener) {
this.commandListener = listener;
}

public void clearCommandListener() {
this.commandListener = null;
}

public Connection sendCommand(final ProtocolCommand cmd, final String... args) {
final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) {
Expand All @@ -124,6 +133,12 @@ public Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
connect();
Protocol.sendCommand(outputStream, cmd, args);

// Record commands sent, for retrying in cluster pipeline.
if (commandListener != null) {
commandListener.afterCommand(this, cmd, args);
}

return this;
} catch (JedisConnectionException ex) {
/*
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, in
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig) {
super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig);
}
}

public JedisClusterPipeline pipelined() {
return new JedisClusterPipeline(connectionHandler);
}

@Override
public String set(final String key, final String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ public Map<String, JedisPool> getNodes() {
return cache.getNodes();
}

public HostAndPort getSlotNode(int slot) {
return cache.getSlotNode(slot);
}

public void assignSlotToNode(int slot, HostAndPort targetNode) {
cache.assignSlotToNode(slot, targetNode);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password, String clientName) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class JedisClusterInfoCache {
private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
private Map<Integer, HostAndPort> mapping = new HashMap<Integer, HostAndPort>();

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
Expand Down Expand Up @@ -123,6 +124,7 @@ public void renewClusterSlots(Jedis jedis) {
private void discoverClusterSlots(Jedis jedis) {
List<Object> slots = jedis.clusterSlots();
this.slots.clear();
this.mapping.clear();

for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
Expand Down Expand Up @@ -205,6 +207,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) {
try {
JedisPool targetPool = setupNodeIfNotExist(targetNode);
slots.put(slot, targetPool);
mapping.put(slot, targetNode);
} finally {
w.unlock();
}
Expand All @@ -216,6 +219,7 @@ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode)
JedisPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
mapping.put(slot, targetNode);
}
} finally {
w.unlock();
Expand Down Expand Up @@ -277,6 +281,7 @@ public void reset() {
}
nodes.clear();
slots.clear();
mapping.clear();
} finally {
w.unlock();
}
Expand All @@ -302,4 +307,13 @@ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
}
return slotNums;
}

public HostAndPort getSlotNode(int slot) {
r.lock();
try {
return mapping.get(slot);
} finally {
r.unlock();
}
}
}
Loading