diff --git a/src/main/java/redis/clients/jedis/CommandListener.java b/src/main/java/redis/clients/jedis/CommandListener.java new file mode 100644 index 0000000000..712fc2a8f1 --- /dev/null +++ b/src/main/java/redis/clients/jedis/CommandListener.java @@ -0,0 +1,9 @@ +package redis.clients.jedis; + +import redis.clients.jedis.commands.ProtocolCommand; + +public interface CommandListener { + + void afterCommand(Connection conn, ProtocolCommand cmd, final byte[][] args); + +} diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 28966aaf71..0aadf3d709 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -37,6 +37,7 @@ public class Connection implements Closeable { private SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; + private CommandListener commandListener; public Connection() { } @@ -108,6 +109,14 @@ public void rollbackTimeout() { } } + public void setCommandListener(CommandListener listener) { + this.commandListener = listener; + } + + public void clearCommandListener() { + this.commandListener = null; + } + public Connection sendCommand(final ProtocolCommand cmd, final String... args) { final byte[][] bargs = new byte[args.length][]; for (int i = 0; i < args.length; i++) { @@ -124,6 +133,12 @@ public Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { connect(); Protocol.sendCommand(outputStream, cmd, args); + + // Record commands sent, for retrying in cluster pipeline. + if (commandListener != null) { + commandListener.afterCommand(this, cmd, args); + } + return this; } catch (JedisConnectionException ex) { /* diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 6a110ef023..6bba752624 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -105,7 +105,11 @@ public JedisCluster(Set jedisClusterNode, int connectionTimeout, in public JedisCluster(Set jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig); -} + } + + public JedisClusterPipeline pipelined() { + return new JedisClusterPipeline(connectionHandler); + } @Override public String set(final String key, final String value) { diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index ca764ecd5a..6317bc9559 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -34,6 +34,14 @@ public Map getNodes() { return cache.getNodes(); } + public HostAndPort getSlotNode(int slot) { + return cache.getSlotNode(slot); + } + + public void assignSlotToNode(int slot, HostAndPort targetNode) { + cache.assignSlotToNode(slot, targetNode); + } + private void initializeSlotsCache(Set startNodes, GenericObjectPoolConfig poolConfig, String password, String clientName) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index 462a83d643..1565081729 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -21,6 +21,7 @@ public class JedisClusterInfoCache { private final Map nodes = new HashMap(); private final Map slots = new HashMap(); + private Map mapping = new HashMap(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); @@ -123,6 +124,7 @@ public void renewClusterSlots(Jedis jedis) { private void discoverClusterSlots(Jedis jedis) { List slots = jedis.clusterSlots(); this.slots.clear(); + this.mapping.clear(); for (Object slotInfoObj : slots) { List slotInfo = (List) slotInfoObj; @@ -205,6 +207,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) { try { JedisPool targetPool = setupNodeIfNotExist(targetNode); slots.put(slot, targetPool); + mapping.put(slot, targetNode); } finally { w.unlock(); } @@ -216,6 +219,7 @@ public void assignSlotsToNode(List targetSlots, HostAndPort targetNode) JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); + mapping.put(slot, targetNode); } } finally { w.unlock(); @@ -277,6 +281,7 @@ public void reset() { } nodes.clear(); slots.clear(); + mapping.clear(); } finally { w.unlock(); } @@ -302,4 +307,13 @@ private List getAssignedSlotArray(List slotInfo) { } return slotNums; } + + public HostAndPort getSlotNode(int slot) { + r.lock(); + try { + return mapping.get(slot); + } finally { + r.unlock(); + } + } } diff --git a/src/main/java/redis/clients/jedis/JedisClusterPipeline.java b/src/main/java/redis/clients/jedis/JedisClusterPipeline.java new file mode 100644 index 0000000000..cc24122a1c --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisClusterPipeline.java @@ -0,0 +1,333 @@ +package redis.clients.jedis; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import redis.clients.jedis.commands.ProtocolCommand; +import redis.clients.jedis.exceptions.JedisAskDataException; +import redis.clients.jedis.exceptions.JedisClusterException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.exceptions.JedisMovedDataException; +import redis.clients.util.JedisClusterCRC16; + +/** + * This implementation is based on the facts that class "redis.clients.jedis.PipelineBase" is + * implemented in this way. + * + *
+ * public Response get(String key) {
+ *   getClient(key).get(key);
+ *   return getResponse(BuilderFactory.STRING);
+ * }
+ * 
+ * + * All operations defined in "PipelineBase" will call "getClient" and then "getResponse". So we keep + * states in class members and share among these methods. The operation is recorded and will be + * replayed when retrying. + *

+ * Adding cluster pipelining support need just small changes on other classes. + */ +public class JedisClusterPipeline extends PipelineBase implements CommandListener { + + private static final int MAX_PIPELINE_RETRIES = 20; + + private JedisClusterConnectionHandler connectionHandler; + private Map connectionCacheByNode = new HashMap(); + + private List cmds = new ArrayList(); + private List responses = Collections.emptyList(); + + private boolean isRecording = true; + private PipelineCommand lastCommand = null; + private Jedis lastConnection = null; + private HostAndPort lastSlotNode = null; + private int lastSlot = -1; + + private int counterOfAsking = 0; + private int counterOfMoving = 0; + + public int getCounterOfAsking() { + return counterOfAsking; + } + + public int getCounterOfMoving() { + return counterOfMoving; + } + + public JedisClusterPipeline(JedisClusterConnectionHandler connectionHandler) { + this.connectionHandler = connectionHandler; + } + + @Override + public void afterCommand(Connection conn, ProtocolCommand cmd, byte[][] args) { + if (isRecording) { + if (lastConnection.getClient() != conn) { + throw new JedisClusterException("Pipeline state error"); + } + + // Cache commands sent, re-send them if needed. + PipelineCommand operation = new PipelineCommand(); + operation.setConnection(lastConnection); + operation.setClient((Client) conn); + operation.setNode(lastSlotNode); + operation.setCommand(cmd); + operation.setArgs(args); + + // Use the slot in last call on getClient, because every operation requires a client + // for example: getClient(key).decr(key); + operation.setSlot(lastSlot); + lastCommand = operation; + } + } + + @Override + protected Client getClient(String key) { + if (lastConnection != null) { + throw new JedisClusterException("Pipeline state error"); + } + + int slot = JedisClusterCRC16.getSlot(key); + HostAndPort node = connectionHandler.getSlotNode(slot); + Jedis connection = connectionCacheByNode.get(node); + if (connection == null) { + connection = connectionHandler.getConnectionFromNode(node); + connectionCacheByNode.put(node, connection); + } + + lastConnection = connection; + lastSlotNode = node; + lastSlot = slot; + + Client result = connection.getClient(); + result.setCommandListener(this); + return result; + } + + @Override + protected Client getClient(byte[] key) { + if (lastConnection != null) { + throw new JedisClusterException("Pipeline state error"); + } + + int slot = JedisClusterCRC16.getSlot(key); + HostAndPort node = connectionHandler.getSlotNode(slot); + Jedis connection = connectionCacheByNode.get(node); + if (connection == null) { + connection = connectionHandler.getConnectionFromNode(node); + connectionCacheByNode.put(node, connection); + } + + lastConnection = connection; + lastSlotNode = node; + lastSlot = slot; + + Client result = connection.getClient(); + result.setCommandListener(this); + return result; + } + + private Jedis getConnection(HostAndPort node) { + Jedis connection = connectionCacheByNode.get(node); + if (connection == null) { + connection = connectionHandler.getConnectionFromNode(node); + connectionCacheByNode.put(node, connection); + } + + Client result = connection.getClient(); + result.setCommandListener(this); + return connection; + } + + @Override + protected void clean() { + throw new UnsupportedOperationException(); + } + + @Override + protected Response generateResponse(Object data) { + throw new UnsupportedOperationException(); + } + + @Override + protected Response getResponse(Builder builder) { + Response result = new Response(builder); + + if (isRecording && lastCommand != null) { + PipelineCommand cmd = lastCommand; + cmd.setResp(result); + cmds.add(cmd); + + lastCommand = null; + lastConnection = null; + lastSlotNode = null; + lastSlot = -1; + + return result; + } + + throw new IllegalStateException("Must be called after sendCommand"); + } + + @Override + protected boolean hasPipelinedResponse() { + return !cmds.isEmpty(); + } + + @Override + protected int getPipelinedResponseLength() { + return cmds.size(); + } + + protected void syncImpl() { + List retries = new ArrayList<>(cmds.size()); + List operations = new ArrayList<>(cmds.size()); + + operations.addAll(cmds); + + int retriedTimes = 0; + while (retriedTimes < MAX_PIPELINE_RETRIES) { + retriedTimes++; + + // All commands have been sent + for (PipelineCommand operation : operations) { + Client client = operation.getClient(); + + try { + if (operation.isAsking()) { + // Read response of asking, should be always "OK" + client.getOne(); + } + + // Reset asking flag + operation.setAsking(false); + + // Read responses for each command + Object data = client.getOne(); + operation.getResp().set(data); + } catch (JedisMovedDataException e) { + counterOfMoving++; + + // if moved message received, update slots mapping + int slot = e.getSlot(); + HostAndPort node = e.getTargetNode(); + + Jedis connection = getConnection(node); + operation.setClient(connection.getClient()); + operation.setConnection(connection); + operation.setNode(node); + + // update slot-node mapping + connectionHandler.assignSlotToNode(slot, node); + + // Will retry later + retries.add(operation); + } catch (JedisAskDataException e) { + counterOfAsking++; + + // if asked message received, send asking before next retrying, + // but do not cache and update slots mapping + operation.setAsking(true); + HostAndPort node = e.getTargetNode(); + + Jedis connection = getConnection(node); + operation.setClient(connection.getClient()); + operation.setConnection(connection); + operation.setNode(node); + + // Will retry later + retries.add(operation); + } catch (JedisException e) { + operation.setError(e); + + // No more retries, connection may corrupt + releaseConnection(operation.getConnection()); + } + } + + // All commands are completed + if (retries.isEmpty()) { + break; + } + + // Re-send commands for redirection + for (PipelineCommand operation : retries) { + Client client = operation.getClient(); + if (operation.isAsking()) { + client.asking(); + } + + ProtocolCommand cmd = operation.getCommand(); + byte[][] args = operation.getArgs(); + client.sendCommand(cmd, args); + } + + operations.clear(); + operations.addAll(retries); + retries.clear(); + } + } + + private void releaseConnection(Jedis connection) { + if (connection != null) { + Client client = connection.getClient(); + client.clearCommandListener(); + connection.close(); + } + } + + /** + * Release all used connections. + */ + private void reset() { + Collection connectionList = connectionCacheByNode.values(); + for (Jedis connection : connectionList) { + releaseConnection(connection); + } + + cmds = new ArrayList(); + connectionCacheByNode.clear(); + lastCommand = null; + lastConnection = null; + lastSlot = -1; + } + + /** + * Synchronise pipeline by reading all responses. This operation closes the pipeline. In order to + * get return values from pipelined commands, capture the different Response<?> of the + * commands you execute. + */ + public void sync() { + try { + isRecording = false; + counterOfMoving = 0; + counterOfAsking = 0; + + syncImpl(); + } finally { + // Pipeline could be used more than once. + responses = cmds; + isRecording = true; + reset(); + } + } + + public List getResults() { + List result = new ArrayList(); + if (responses != null) { + for (PipelineCommand cmd : responses) { + result.add(cmd.getResp().get()); + } + } + + return result; + } + + public List syncAndReturnAll() { + sync(); + return getResults(); + } +} diff --git a/src/main/java/redis/clients/jedis/PipelineCommand.java b/src/main/java/redis/clients/jedis/PipelineCommand.java new file mode 100644 index 0000000000..ff8dbd413e --- /dev/null +++ b/src/main/java/redis/clients/jedis/PipelineCommand.java @@ -0,0 +1,98 @@ +package redis.clients.jedis; + +import redis.clients.jedis.commands.ProtocolCommand; +import redis.clients.jedis.exceptions.JedisException; + +public class PipelineCommand { + + private Jedis connection; + + private Client client; + + private Response resp; + + private ProtocolCommand cmd; + + private byte[][] args; + + private HostAndPort node; + + private boolean asking = false; + + private int slot = -1; + + private JedisException error; + + public Jedis getConnection() { + return connection; + } + + public void setConnection(Jedis connection) { + this.connection = connection; + } + + public Client getClient() { + return client; + } + + public void setClient(Client client) { + this.client = client; + } + + public Response getResp() { + return resp; + } + + public void setResp(Response resp) { + this.resp = resp; + } + + public ProtocolCommand getCommand() { + return cmd; + } + + public void setCommand(ProtocolCommand cmd) { + this.cmd = cmd; + } + + public HostAndPort getNode() { + return node; + } + + public void setNode(HostAndPort node) { + this.node = node; + } + + public byte[][] getArgs() { + return args; + } + + public void setArgs(byte[][] args) { + this.args = args; + } + + public boolean isAsking() { + return asking; + } + + public void setAsking(boolean asking) { + this.asking = asking; + } + + public int getSlot() { + return slot; + } + + public void setSlot(int slot) { + this.slot = slot; + } + + public JedisException getError() { + return error; + } + + public void setError(JedisException error) { + this.error = error; + } + +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterPipelineTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterPipelineTest.java new file mode 100644 index 0000000000..a383ff8cc1 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterPipelineTest.java @@ -0,0 +1,308 @@ +package redis.clients.jedis.tests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.logging.Logger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisCluster.Reset; +import redis.clients.jedis.JedisClusterPipeline; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.Response; +import redis.clients.jedis.Tuple; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.tests.utils.JedisClusterTestUtil; + +public class JedisClusterPipelineTest { + private static Jedis node1; + private static Jedis node2; + private static Jedis node3; + private static Jedis node4; + private static Jedis nodeSlave2; + private String localHost = "127.0.0.1"; + + private static final int DEFAULT_TIMEOUT = 2000; + private static final int DEFAULT_REDIRECTIONS = 5; + private static final JedisPoolConfig DEFAULT_CONFIG = new JedisPoolConfig(); + + private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0); + private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1); + private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2); + private HostAndPort nodeInfo4 = HostAndPortUtil.getClusterServers().get(3); + private HostAndPort nodeInfoSlave2 = HostAndPortUtil.getClusterServers().get(4); + protected Logger log = Logger.getLogger(getClass().getName()); + + @Before + public void setUp() throws Exception { + node1 = new Jedis(nodeInfo1.getHost(), nodeInfo1.getPort()); + node1.auth("cluster"); + node1.flushAll(); + + node2 = new Jedis(nodeInfo2.getHost(), nodeInfo2.getPort()); + node2.auth("cluster"); + node2.flushAll(); + + node3 = new Jedis(nodeInfo3.getHost(), nodeInfo3.getPort()); + node3.auth("cluster"); + node3.flushAll(); + + node4 = new Jedis(nodeInfo4.getHost(), nodeInfo4.getPort()); + node4.auth("cluster"); + node4.flushAll(); + + nodeSlave2 = new Jedis(nodeInfoSlave2.getHost(), nodeInfoSlave2.getPort()); + nodeSlave2.auth("cluster"); + nodeSlave2.flushAll(); + // ---- configure cluster + + // add nodes to cluster + node1.clusterMeet(localHost, nodeInfo2.getPort()); + node1.clusterMeet(localHost, nodeInfo3.getPort()); + + // split available slots across the three nodes + int slotsPerNode = JedisCluster.HASHSLOTS / 3; + int[] node1Slots = new int[slotsPerNode]; + int[] node2Slots = new int[slotsPerNode + 1]; + int[] node3Slots = new int[slotsPerNode]; + for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0; i < JedisCluster.HASHSLOTS; i++) { + if (i < slotsPerNode) { + node1Slots[slot1++] = i; + } else if (i > slotsPerNode * 2) { + node3Slots[slot3++] = i; + } else { + node2Slots[slot2++] = i; + } + } + + node1.clusterAddSlots(node1Slots); + node2.clusterAddSlots(node2Slots); + node3.clusterAddSlots(node3Slots); + + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3); + } + + @AfterClass + public static void cleanUp() { + node1.flushDB(); + node2.flushDB(); + node3.flushDB(); + node4.flushDB(); + node1.clusterReset(Reset.SOFT); + node2.clusterReset(Reset.SOFT); + node3.clusterReset(Reset.SOFT); + node4.clusterReset(Reset.SOFT); + } + + @After + public void tearDown() throws InterruptedException { + cleanUp(); + } + + @Test + public void pipeline() throws UnsupportedEncodingException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + String clientName = "myAppName"; + JedisCluster jedis = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_REDIRECTIONS, "cluster", clientName, DEFAULT_CONFIG); + + try { + JedisClusterPipeline p = jedis.pipelined(); + p.set("foo", "bar"); + p.get("foo"); + List results = p.syncAndReturnAll(); + + assertEquals(2, results.size()); + assertEquals("OK", results.get(0)); + assertEquals("bar", results.get(1)); + } finally { + jedis.close(); + } + } + + @Test + public void pipelineResponse() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + String clientName = "myAppName"; + JedisCluster jedis = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_REDIRECTIONS, "cluster", clientName, DEFAULT_CONFIG); + + try { + jedis.set("string", "foo"); + jedis.lpush("list", "foo"); + jedis.hset("hash", "foo", "bar"); + jedis.zadd("zset", 1, "foo"); + jedis.sadd("set", "foo"); + + JedisClusterPipeline p = jedis.pipelined(); + Response string = p.get("string"); + Response del = p.del("string"); + Response emptyString = p.get("string"); + Response list = p.lpop("list"); + Response hash = p.hget("hash", "foo"); + Response> zset = p.zrange("zset", 0, -1); + Response set = p.spop("set"); + Response blist = p.exists("list"); + Response zincrby = p.zincrby("zset", 1, "foo"); + Response zcard = p.zcard("zset"); + p.lpush("list", "bar"); + Response> lrange = p.lrange("list", 0, -1); + Response> hgetAll = p.hgetAll("hash"); + p.sadd("set", "foo"); + Response> smembers = p.smembers("set"); + Response> zrangeWithScores = p.zrangeWithScores("zset", 0, -1); + p.sync(); + + assertEquals("foo", string.get()); + assertEquals(Long.valueOf(1), del.get()); + assertNull(emptyString.get()); + assertEquals("foo", list.get()); + assertEquals("bar", hash.get()); + assertEquals("foo", zset.get().iterator().next()); + assertEquals("foo", set.get()); + assertFalse(blist.get()); + assertEquals(Double.valueOf(2), zincrby.get()); + assertEquals(Long.valueOf(1), zcard.get()); + assertEquals(1, lrange.get().size()); + assertNotNull(hgetAll.get().get("foo")); + assertEquals(1, smembers.get().size()); + assertEquals(1, zrangeWithScores.get().size()); + } finally { + jedis.close(); + } + } + + @Test(expected = JedisDataException.class) + public void pipelineResponseWithinPipeline() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + String clientName = "myAppName"; + JedisCluster jedis = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_REDIRECTIONS, "cluster", clientName, DEFAULT_CONFIG); + + try { + jedis.set("string", "foo"); + + JedisClusterPipeline p = jedis.pipelined(); + Response string = p.get("string"); + string.get(); + p.sync(); + } finally { + jedis.close(); + } + } + + @Test + public void canRetrieveUnsetKey() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + String clientName = "myAppName"; + JedisCluster jedis = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_REDIRECTIONS, "cluster", clientName, DEFAULT_CONFIG); + + try { + JedisClusterPipeline p = jedis.pipelined(); + Response shouldNotExist = p.get(UUID.randomUUID().toString()); + p.sync(); + assertNull(shouldNotExist.get()); + } finally { + jedis.close(); + } + } + + @Test + public void testSyncWithNoCommandQueued() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + String clientName = "myAppName"; + JedisCluster jedis = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_REDIRECTIONS, "cluster", clientName, DEFAULT_CONFIG); + + try { + JedisClusterPipeline pipeline = jedis.pipelined(); + pipeline.sync(); + + pipeline = jedis.pipelined(); + List resp = pipeline.syncAndReturnAll(); + assertTrue(resp.isEmpty()); + } finally { + jedis.close(); + } + } + + /** + * slot->nodes 15363 node3 e + */ + @Test + public void testMigrating() { + log.info("test migrate slot"); + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(nodeInfo1); + JedisCluster jedis = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_REDIRECTIONS, "cluster", DEFAULT_CONFIG); + + try { + JedisClusterPipeline pipeline = jedis.pipelined(); + + // Ensure the key "e" does not exist + jedis.del("e"); + + String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes()); + String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes()); + node3.clusterSetSlotMigrating(15363, node2Id); + node2.clusterSetSlotImporting(15363, node3Id); + + pipeline.set("e", "e1"); + pipeline.sync(); + assertEquals(1, pipeline.getCounterOfAsking()); + + Response e1Response = pipeline.get("e"); + pipeline.sync(); + assertEquals("e1", e1Response.get()); + assertEquals(1, pipeline.getCounterOfAsking()); + + pipeline.del("e"); + pipeline.sync(); + assertEquals(1, pipeline.getCounterOfAsking()); + + // Ensure the key "e" does not exist + jedis.del("e"); + + node2.clusterSetSlotNode(15363, node2Id); + node3.clusterSetSlotNode(15363, node2Id); + + pipeline.set("e", "e2"); + pipeline.sync(); + assertEquals(1, pipeline.getCounterOfMoving()); + assertEquals(0, pipeline.getCounterOfAsking()); + + // Then the slot mapping cache has been updated + Response e2Response = pipeline.get("e"); + pipeline.sync(); + assertEquals("e2", e2Response.get()); + assertEquals(0, pipeline.getCounterOfMoving()); + assertEquals(0, pipeline.getCounterOfAsking()); + } finally { + jedis.close(); + } + } + +}