diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java index 5d492a3a4..7c9dc5a40 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java @@ -30,14 +30,24 @@ public AbstractValueDecoder getDecoder(int identityNumber) { return decoderMap.get(identityNumber); } - public synchronized void register(int identityNumber, AbstractValueDecoder decoder) { - decoderMap.put(identityNumber, decoder); - inited = true; + public void register(int identityNumber, AbstractValueDecoder decoder) { + reentrantLock.lock(); + try { + decoderMap.put(identityNumber, decoder); + inited = true; + }finally { + reentrantLock.unlock(); + } } - public synchronized void clear() { - decoderMap.clear(); - inited = true; + public void clear() { + reentrantLock.lock(); + try { + decoderMap.clear(); + inited = true; + }finally { + reentrantLock.unlock(); + } } public void initDefaultDecoder() { diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java index 7b10c7e5d..1e94f7576 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; /** * Created on 2016/10/27. @@ -28,6 +29,7 @@ public class DefaultCacheMonitor implements CacheMonitor { private static final Logger logger = LoggerFactory.getLogger(DefaultCacheMonitor.class); + private final ReentrantLock reentrantLock = new ReentrantLock(); protected CacheStat cacheStat; private String cacheName; @@ -43,44 +45,59 @@ public String getCacheName() { return cacheName; } - public synchronized void resetStat() { - cacheStat = new CacheStat(); - cacheStat.setStatStartTime(System.currentTimeMillis()); - cacheStat.setCacheName(cacheName); + public void resetStat() { + reentrantLock.lock(); + try { + cacheStat = new CacheStat(); + cacheStat.setStatStartTime(System.currentTimeMillis()); + cacheStat.setCacheName(cacheName); + }finally { + reentrantLock.unlock(); + } } - public synchronized CacheStat getCacheStat() { - CacheStat stat = cacheStat.clone(); - stat.setStatEndTime(System.currentTimeMillis()); - return stat; + public CacheStat getCacheStat() { + reentrantLock.lock(); + try { + CacheStat stat = cacheStat.clone(); + stat.setStatEndTime(System.currentTimeMillis()); + return stat; + }finally { + reentrantLock.unlock(); + } } @Override - public synchronized void afterOperation(CacheEvent event) { - if (event instanceof CacheGetEvent) { - CacheGetEvent e = (CacheGetEvent) event; - afterGet(e.getMillis(), e.getKey(), e.getResult()); - } else if (event instanceof CachePutEvent) { - CachePutEvent e = (CachePutEvent) event; - afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult()); - } else if (event instanceof CacheRemoveEvent) { - CacheRemoveEvent e = (CacheRemoveEvent) event; - afterRemove(e.getMillis(), e.getKey(), e.getResult()); - } else if (event instanceof CacheLoadEvent) { - CacheLoadEvent e = (CacheLoadEvent) event; - afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess()); - } else if (event instanceof CacheGetAllEvent) { - CacheGetAllEvent e = (CacheGetAllEvent) event; - afterGetAll(e.getMillis(), e.getKeys(), e.getResult()); - } else if (event instanceof CacheLoadAllEvent) { - CacheLoadAllEvent e = (CacheLoadAllEvent) event; - afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess()); - } else if (event instanceof CachePutAllEvent) { - CachePutAllEvent e = (CachePutAllEvent) event; - afterPutAll(e.getMillis(), e.getMap(), e.getResult()); - } else if (event instanceof CacheRemoveAllEvent) { - CacheRemoveAllEvent e = (CacheRemoveAllEvent) event; - afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult()); + public void afterOperation(CacheEvent event) { + reentrantLock.lock(); + try { + if (event instanceof CacheGetEvent) { + CacheGetEvent e = (CacheGetEvent) event; + afterGet(e.getMillis(), e.getKey(), e.getResult()); + } else if (event instanceof CachePutEvent) { + CachePutEvent e = (CachePutEvent) event; + afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult()); + } else if (event instanceof CacheRemoveEvent) { + CacheRemoveEvent e = (CacheRemoveEvent) event; + afterRemove(e.getMillis(), e.getKey(), e.getResult()); + } else if (event instanceof CacheLoadEvent) { + CacheLoadEvent e = (CacheLoadEvent) event; + afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess()); + } else if (event instanceof CacheGetAllEvent) { + CacheGetAllEvent e = (CacheGetAllEvent) event; + afterGetAll(e.getMillis(), e.getKeys(), e.getResult()); + } else if (event instanceof CacheLoadAllEvent) { + CacheLoadAllEvent e = (CacheLoadAllEvent) event; + afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess()); + } else if (event instanceof CachePutAllEvent) { + CachePutAllEvent e = (CachePutAllEvent) event; + afterPutAll(e.getMillis(), e.getMap(), e.getResult()); + } else if (event instanceof CacheRemoveAllEvent) { + CacheRemoveAllEvent e = (CacheRemoveAllEvent) event; + afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult()); + } + }finally { + reentrantLock.unlock(); } } diff --git a/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java b/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java index db6f0b7ea..c5b99f83a 100644 --- a/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java +++ b/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java @@ -20,6 +20,8 @@ import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.ReentrantLock; + /** * @author huangli */ @@ -35,6 +37,8 @@ public class LettuceBroadcastManager extends BroadcastManager { private final LettuceConnectionManager lettuceConnectionManager; private final BaseRedisAsyncCommands stringAsyncCommands; + private final ReentrantLock reentrantLock = new ReentrantLock(); + public LettuceBroadcastManager(CacheManager cacheManager, RedisLettuceCacheConfig config) { super(cacheManager); @@ -72,20 +76,25 @@ public CacheResult publish(CacheMessage cacheMessage) { } @Override - public synchronized void startSubscribe() { - if (subscribeThreadStart) { - throw new IllegalStateException("startSubscribe has invoked"); - } - this.pubSubAdapter = new RedisPubSubAdapter() { - @Override - public void message(byte[] channel, byte[] message) { - processNotification(message, config.getValueDecoder()); + public void startSubscribe() { + reentrantLock.lock(); + try { + if (subscribeThreadStart) { + throw new IllegalStateException("startSubscribe has invoked"); } - }; - config.getPubSubConnection().addListener(this.pubSubAdapter); - RedisPubSubAsyncCommands asyncCommands = config.getPubSubConnection().async(); - asyncCommands.subscribe(channel); - this.subscribeThreadStart = true; + this.pubSubAdapter = new RedisPubSubAdapter() { + @Override + public void message(byte[] channel, byte[] message) { + processNotification(message, config.getValueDecoder()); + } + }; + config.getPubSubConnection().addListener(this.pubSubAdapter); + RedisPubSubAsyncCommands asyncCommands = config.getPubSubConnection().async(); + asyncCommands.subscribe(channel); + this.subscribeThreadStart = true; + }finally { + reentrantLock.unlock(); + } } @Override diff --git a/jetcache-support/jetcache-redis-springdata/src/main/java/com/alicp/jetcache/redis/springdata/SpringDataBroadcastManager.java b/jetcache-support/jetcache-redis-springdata/src/main/java/com/alicp/jetcache/redis/springdata/SpringDataBroadcastManager.java index 9040b0d00..2837e2e71 100644 --- a/jetcache-support/jetcache-redis-springdata/src/main/java/com/alicp/jetcache/redis/springdata/SpringDataBroadcastManager.java +++ b/jetcache-support/jetcache-redis-springdata/src/main/java/com/alicp/jetcache/redis/springdata/SpringDataBroadcastManager.java @@ -19,6 +19,7 @@ import org.springframework.data.redis.listener.Topic; import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.ReentrantLock; /** * @author huangli @@ -32,6 +33,8 @@ public class SpringDataBroadcastManager extends BroadcastManager { private final byte[] channel; private volatile RedisMessageListenerContainer listenerContainer; + private final ReentrantLock reentrantLock = new ReentrantLock(); + public SpringDataBroadcastManager(CacheManager cacheManager, RedisSpringDataCacheConfig config) { super(cacheManager); this.config = config; @@ -65,23 +68,28 @@ public CacheResult publish(CacheMessage cacheMessage) { } @Override - public synchronized void startSubscribe() { - if (this.listenerContainer != null) { - throw new IllegalStateException("subscribe thread is started"); - } - Topic topic = new ChannelTopic(config.getBroadcastChannel()); - if (config.getListenerContainer() == null) { - RedisMessageListenerContainer c = new RedisMessageListenerContainer(); - c.setConnectionFactory(config.getConnectionFactory()); - c.afterPropertiesSet(); - c.start(); - this.listenerContainer = c; - logger.info("create RedisMessageListenerContainer instance"); - } else { - this.listenerContainer = config.getListenerContainer(); + public void startSubscribe() { + reentrantLock.lock(); + try { + if (this.listenerContainer != null) { + throw new IllegalStateException("subscribe thread is started"); + } + Topic topic = new ChannelTopic(config.getBroadcastChannel()); + if (config.getListenerContainer() == null) { + RedisMessageListenerContainer c = new RedisMessageListenerContainer(); + c.setConnectionFactory(config.getConnectionFactory()); + c.afterPropertiesSet(); + c.start(); + this.listenerContainer = c; + logger.info("create RedisMessageListenerContainer instance"); + } else { + this.listenerContainer = config.getListenerContainer(); + } + this.listenerContainer.addMessageListener(listener, topic); + logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel()); + }finally { + reentrantLock.unlock(); } - this.listenerContainer.addMessageListener(listener, topic); - logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel()); } private void onMessage(Message message, byte[] pattern) { @@ -89,13 +97,18 @@ private void onMessage(Message message, byte[] pattern) { } @Override - public synchronized void close() throws Exception { - if (this.listenerContainer != null) { - this.listenerContainer.removeMessageListener(listener); - if (this.config.getListenerContainer() == null) { - this.listenerContainer.destroy(); + public void close() throws Exception { + reentrantLock.lock(); + try { + if (this.listenerContainer != null) { + this.listenerContainer.removeMessageListener(listener); + if (this.config.getListenerContainer() == null) { + this.listenerContainer.destroy(); + } } + this.listenerContainer = null; + }finally { + reentrantLock.unlock(); } - this.listenerContainer = null; } } diff --git a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java index b6c09134f..3f41b9e2e 100644 --- a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java +++ b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java @@ -13,6 +13,7 @@ import redis.clients.jedis.UnifiedJedis; import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.ReentrantLock; /** * Created on 2022-05-03 @@ -32,6 +33,8 @@ public class RedisBroadcastManager extends BroadcastManager { private volatile boolean subscribe; private boolean subscribeThreadStart; + private final ReentrantLock reentrantLock = new ReentrantLock(); + public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig config) { super(cacheManager); this.channelStr = config.getBroadcastChannel(); @@ -48,16 +51,21 @@ public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig config) { super(cacheManager); checkConfig(config); @@ -33,24 +36,34 @@ public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonC } @Override - public synchronized void startSubscribe() { - if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) { - this.subscribeId = this.client.getTopic(this.channel) - .addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder())); + public void startSubscribe() { + reentrantLock.lock(); + try { + if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) { + this.subscribeId = this.client.getTopic(this.channel) + .addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder())); + } + }finally { + reentrantLock.unlock(); } } @Override - public synchronized void close() { - final int id; - if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) { - this.subscribeId = 0; - try { - this.client.getTopic(this.channel).removeListener(id); - } catch (Throwable e) { - logger.warn("unsubscribe {} fail", this.channel, e); + public void close() { + reentrantLock.lock(); + try { + final int id; + if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) { + this.subscribeId = 0; + try { + this.client.getTopic(this.channel).removeListener(id); + } catch (Throwable e) { + logger.warn("unsubscribe {} fail", this.channel, e); + } } + }finally { + reentrantLock.unlock(); } }