From 897aff83febfa8144291e3e7fa603abc39bfa1ad Mon Sep 17 00:00:00 2001 From: Calvin Date: Sat, 13 Sep 2014 17:13:39 +0800 Subject: [PATCH] #393 add race condition when pool init and switch --- .../nosql/redis/pool/JedisSentinelPool.java | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java index 07c599837..250dd3cb0 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java @@ -2,8 +2,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; @@ -11,6 +9,7 @@ import org.springside.modules.nosql.redis.JedisTemplate; import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.utils.Threads; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; @@ -33,7 +32,7 @@ public final class JedisSentinelPool extends JedisPool { private String masterName; private JedisPoolConfig masterPoolConfig; private ConnectionInfo masterConnectionInfo; - private CountDownLatch poolInitLock = new CountDownLatch(1); + private AtomicBoolean poolInit = new AtomicBoolean(false); /** * Creates a new instance of JedisSentinelPool. @@ -43,7 +42,7 @@ public final class JedisSentinelPool extends JedisPool { * @param sentinelAddresses Array of HostAndPort to sentinel instances. * @param masterName One sentinel can monitor several redis master-slave pair, use master name to identify them. * @param masterConnectionInfo The the other information like password,timeout. - * @param masterPoolConfig Config of redis pool. + * @param masterPoolConfig Configuration of redis pool. * */ public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, ConnectionInfo masterConnectionInfo, @@ -71,20 +70,32 @@ public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, Con // Start MasterSwitchListener thread ,internal poll will be start in the thread masterSwitchListener = new MasterSwitchListener(); masterSwitchListener.start(); - try { - boolean result = poolInitLock.await(10, TimeUnit.SECONDS); - if (!result) { - logger.warn("jedis pool can't not init after 10 seconds"); - } - } catch (InterruptedException e) { - return; - } + + waitForPoolInit(5000); } public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, JedisPoolConfig masterPoolConfig) { this(sentinelAddresses, masterName, new ConnectionInfo(), masterPoolConfig); } + @Override + public Jedis getResource() { + if (!poolInit.get()) { + waitForPoolInit(1000); + } + return super.getResource(); + } + + private void waitForPoolInit(long mills) { + long startTime = System.currentTimeMillis(); + while (!poolInit.get() && ((System.currentTimeMillis() - startTime) < mills)) { + Threads.sleep(100); + } + if (!poolInit.get()) { + logger.warn("Wait for pool init but timeout"); + } + } + @Override public void destroy() { // shutdown the listener thread @@ -109,7 +120,8 @@ public void destroy() { } protected void destroyInternelPool() { - super.destroy(); + closeInternalPool(); + address = null; connectionInfo = null; internalPool = null; @@ -173,14 +185,15 @@ public void run() { if ((internalPool != null) && isAddressChange(masterAddress)) { logger.info("The internalPool {} had changed, destroy it now.", previousMasterAddress); + poolInit.set(false); destroyInternelPool(); } - if ((internalPool == null) || isAddressChange(masterAddress)) { + if (internalPool == null) { logger.info("The internalPool {} is not init or the address had changed, init it now.", masterAddress); initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig); - poolInitLock.countDown(); + poolInit.set(true); } previousMasterAddress = masterAddress; @@ -281,14 +294,14 @@ public void onMessage(String channel, String message) { String[] switchMasterMsg = message.split(" "); // if the master name equals my master name, destroy the old pool and init a new pool if (masterName.equals(switchMasterMsg[0])) { - destroyInternelPool(); HostAndPort masterAddress = new HostAndPort(switchMasterMsg[3], Integer.parseInt(switchMasterMsg[4])); logger.info("Switch master to " + masterAddress); - + poolInit.set(false); + destroyInternelPool(); initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig); - + poolInit.set(true); previousMasterAddress = masterAddress; } }