diff --git a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/config/CacheAnnotationParser.java b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/config/CacheAnnotationParser.java index db356e383..6ba6149d3 100644 --- a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/config/CacheAnnotationParser.java +++ b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/config/CacheAnnotationParser.java @@ -19,19 +19,27 @@ import org.w3c.dom.Element; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; /** * @author huangli */ public class CacheAnnotationParser implements BeanDefinitionParser { + private final ReentrantLock reentrantLock = new ReentrantLock(); + @Override public BeanDefinition parse(Element element, ParserContext parserContext) { - doParse(element, parserContext); + reentrantLock.lock(); + try { + doParse(element, parserContext); + }finally { + reentrantLock.unlock(); + } return null; } - private synchronized void doParse(Element element, ParserContext parserContext) { + private void doParse(Element element, ParserContext parserContext) { String[] basePackages = StringUtils.tokenizeToStringArray(element.getAttribute("base-package"), ",; \t\n"); AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element); if (!parserContext.getRegistry().containsBeanDefinition(CacheAdvisor.CACHE_ADVISOR_BEAN_NAME)) { diff --git a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/field/CreateCacheAnnotationBeanPostProcessor.java b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/field/CreateCacheAnnotationBeanPostProcessor.java index 169faf7ae..92640ec87 100644 --- a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/field/CreateCacheAnnotationBeanPostProcessor.java +++ b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/field/CreateCacheAnnotationBeanPostProcessor.java @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; /** * Created on 2016/12/9. @@ -38,6 +39,7 @@ public class CreateCacheAnnotationBeanPostProcessor extends AutowiredAnnotationB private ConfigurableListableBeanFactory beanFactory; private final Map injectionMetadataCache = new ConcurrentHashMap(); + private final ReentrantLock reentrantLock = new ReentrantLock(); @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { @@ -77,7 +79,8 @@ private InjectionMetadata findAutowiringMetadata(String beanName, Class clazz // Quick check on the concurrent map first, with minimal locking. InjectionMetadata metadata = this.injectionMetadataCache.get(cacheKey); if (InjectionMetadata.needsRefresh(metadata, clazz)) { - synchronized (this.injectionMetadataCache) { + reentrantLock.lock(); + try{ metadata = this.injectionMetadataCache.get(cacheKey); if (InjectionMetadata.needsRefresh(metadata, clazz)) { if (metadata != null) { @@ -91,6 +94,8 @@ private InjectionMetadata findAutowiringMetadata(String beanName, Class clazz "] for autowiring metadata: could not find class that it depends on", err); } } + }finally { + reentrantLock.unlock(); } } return metadata; diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java b/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java index 800ea231b..a24c28978 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; @@ -36,13 +37,17 @@ public abstract class AbstractCache implements Cache { private volatile ConcurrentHashMap loaderMap; protected volatile boolean closed; + private static final ReentrantLock reentrantLock = new ReentrantLock(); ConcurrentHashMap initOrGetLoaderMap() { if (loaderMap == null) { - synchronized (this) { + reentrantLock.lock(); + try { if (loaderMap == null) { loaderMap = new ConcurrentHashMap<>(); } + }finally { + reentrantLock.unlock(); } } return loaderMap; diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/embedded/AbstractEmbeddedCache.java b/jetcache-core/src/main/java/com/alicp/jetcache/embedded/AbstractEmbeddedCache.java index 5133fa6ca..1bdfd31da 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/embedded/AbstractEmbeddedCache.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/embedded/AbstractEmbeddedCache.java @@ -3,19 +3,14 @@ */ package com.alicp.jetcache.embedded; -import com.alicp.jetcache.AbstractCache; -import com.alicp.jetcache.CacheConfig; -import com.alicp.jetcache.CacheGetResult; -import com.alicp.jetcache.CacheResult; -import com.alicp.jetcache.CacheResultCode; -import com.alicp.jetcache.CacheValueHolder; -import com.alicp.jetcache.MultiGetResult; +import com.alicp.jetcache.*; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -28,6 +23,8 @@ public abstract class AbstractEmbeddedCache extends AbstractCache { protected abstract InnerMap createAreaCache(); + private final ReentrantLock lock = new ReentrantLock(); + public AbstractEmbeddedCache(EmbeddedCacheConfig config) { this.config = config; innerMap = createAreaCache(); @@ -61,7 +58,8 @@ protected CacheGetResult parseHolderResult(CacheValueHolder holder) { } else if (now >= holder.getExpireTime()) { return CacheGetResult.EXPIRED_WITHOUT_MSG; } else { - synchronized (holder) { + lock.lock(); + try{ long accessTime = holder.getAccessTime(); if (config.isExpireAfterAccess()) { long expireAfterAccess = config.getExpireAfterAccessInMillis(); @@ -70,6 +68,8 @@ protected CacheGetResult parseHolderResult(CacheValueHolder holder) { } } holder.setAccessTime(now); + }finally { + lock.unlock(); } return new CacheGetResult(CacheResultCode.SUCCESS, null, holder); diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/embedded/Cleaner.java b/jetcache-core/src/main/java/com/alicp/jetcache/embedded/Cleaner.java index f9d4df125..5f426ded4 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/embedded/Cleaner.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/embedded/Cleaner.java @@ -7,6 +7,7 @@ import java.util.LinkedList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * Created on 2017/2/28. @@ -16,6 +17,7 @@ class Cleaner { static LinkedList> linkedHashMapCaches = new LinkedList<>(); + private static final ReentrantLock reentrantLock = new ReentrantLock(); static { ScheduledExecutorService executorService = JetCacheExecutor.defaultExecutor(); @@ -23,13 +25,17 @@ class Cleaner { } static void add(LinkedHashMapCache cache) { - synchronized (linkedHashMapCaches) { + reentrantLock.lock(); + try{ linkedHashMapCaches.add(new WeakReference<>(cache)); + }finally { + reentrantLock.unlock(); } } static void run() { - synchronized (linkedHashMapCaches) { + reentrantLock.lock(); + try{ Iterator> it = linkedHashMapCaches.iterator(); while (it.hasNext()) { WeakReference ref = it.next(); @@ -40,6 +46,8 @@ static void run() { c.cleanExpiredEntry(); } } + }finally { + reentrantLock.unlock(); } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/embedded/LinkedHashMapCache.java b/jetcache-core/src/main/java/com/alicp/jetcache/embedded/LinkedHashMapCache.java index 6c654637e..e5de6473e 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/embedded/LinkedHashMapCache.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/embedded/LinkedHashMapCache.java @@ -9,6 +9,8 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author huangli @@ -28,7 +30,7 @@ protected void addToCleaner() { @Override protected InnerMap createAreaCache() { - return new LRUMap(config.getLimit(), this); + return new LRUMap(config.getLimit()); } @Override @@ -46,12 +48,13 @@ public void cleanExpiredEntry() { final class LRUMap extends LinkedHashMap implements InnerMap { private final int max; - private Object lock; +// private final Object lockObj; + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - public LRUMap(int max, Object lock) { + public LRUMap(int max) { super((int) (max * 1.4f), 0.75f, true); this.max = max; - this.lock = lock; +// this.lockObj = lockObj; } @Override @@ -60,7 +63,9 @@ protected boolean removeEldestEntry(Map.Entry eldest) { } void cleanExpiredEntry() { - synchronized (lock) { + Lock lock = readWriteLock.writeLock(); + lock.lock(); + try{ for (Iterator it = entrySet().iterator(); it.hasNext();) { Map.Entry en = (Map.Entry) it.next(); Object value = en.getValue(); @@ -78,67 +83,95 @@ void cleanExpiredEntry() { } } } + }finally { + lock.unlock(); } } @Override public Object getValue(Object key) { - synchronized (lock) { + Lock lock = readWriteLock.readLock(); + lock.lock(); + try{ return get(key); + }finally { + lock.unlock(); } } @Override public Map getAllValues(Collection keys) { + Lock lock = readWriteLock.readLock(); + lock.lock(); Map values = new HashMap(); - synchronized (lock) { + try{ for (Object key : keys) { Object v = get(key); if (v != null) { values.put(key, v); } } + }finally { + lock.unlock(); } return values; } @Override public void putValue(Object key, Object value) { - synchronized (lock) { + Lock lock = readWriteLock.writeLock(); + lock.lock(); + try{ put(key, value); + }finally { + lock.unlock(); } } @Override public void putAllValues(Map map) { - synchronized (lock) { + Lock lock = readWriteLock.writeLock(); + lock.lock(); + try{ Set set = map.entrySet(); for (Map.Entry en : set) { put(en.getKey(), en.getValue()); } + }finally { + lock.unlock(); } } @Override public boolean removeValue(Object key) { - synchronized (lock) { + Lock lock = readWriteLock.writeLock(); + lock.lock(); + try{ return remove(key) != null; + }finally { + lock.unlock(); } } @Override public void removeAllValues(Collection keys) { - synchronized (lock) { + Lock lock = readWriteLock.writeLock(); + lock.lock(); + try{ for (Object k : keys) { remove(k); } + }finally { + lock.unlock(); } } @Override @SuppressWarnings("unchecked") public boolean putIfAbsentValue(Object key, Object value) { - synchronized (lock) { + Lock lock = readWriteLock.writeLock(); + lock.lock(); + try{ CacheValueHolder h = (CacheValueHolder) get(key); if (h == null || parseHolderResult(h).getResultCode() == CacheResultCode.EXPIRED) { put(key, value); @@ -146,6 +179,8 @@ public boolean putIfAbsentValue(Object key, Object value) { } else { return false; } + }finally { + lock.unlock(); } } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java index dea14e948..5d492a3a4 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/DecoderMap.java @@ -6,6 +6,7 @@ import com.alicp.jetcache.anno.SerialPolicy; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; /** * @author huangli @@ -14,6 +15,7 @@ public class DecoderMap { private final ConcurrentHashMap decoderMap = new ConcurrentHashMap<>(); private volatile boolean inited = false; + private final ReentrantLock reentrantLock = new ReentrantLock(); private static final DecoderMap instance = new DecoderMap(); @@ -42,7 +44,8 @@ public void initDefaultDecoder() { if (inited) { return; } - synchronized (this) { + reentrantLock.lock(); + try { if (inited) { return; } @@ -51,6 +54,8 @@ public void initDefaultDecoder() { register(SerialPolicy.IDENTITY_NUMBER_KRYO5, Kryo5ValueDecoder.INSTANCE); // register(SerialPolicy.IDENTITY_NUMBER_FASTJSON2, Fastjson2ValueDecoder.INSTANCE); inited = true; + }finally { + reentrantLock.unlock(); } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java index 1187ba84f..d9e41d56e 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java @@ -5,6 +5,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; /** * Created on 2017/5/3. @@ -14,6 +15,7 @@ public class JetCacheExecutor { protected volatile static ScheduledExecutorService defaultExecutor; protected volatile static ScheduledExecutorService heavyIOExecutor; + private static final ReentrantLock reentrantLock = new ReentrantLock(); private static AtomicInteger threadCount = new AtomicInteger(0); @@ -35,7 +37,8 @@ public static ScheduledExecutorService defaultExecutor() { if (defaultExecutor != null) { return defaultExecutor; } - synchronized (JetCacheExecutor.class) { + reentrantLock.lock(); + try{ if (defaultExecutor == null) { ThreadFactory tf = r -> { Thread t = new Thread(r, "JetCacheDefaultExecutor"); @@ -45,6 +48,8 @@ public static ScheduledExecutorService defaultExecutor() { defaultExecutor = new ScheduledThreadPoolExecutor( 1, tf, new ThreadPoolExecutor.DiscardPolicy()); } + }finally { + reentrantLock.unlock(); } return defaultExecutor; } @@ -53,7 +58,8 @@ public static ScheduledExecutorService heavyIOExecutor() { if (heavyIOExecutor != null) { return heavyIOExecutor; } - synchronized (JetCacheExecutor.class) { + reentrantLock.lock(); + try { if (heavyIOExecutor == null) { ThreadFactory tf = r -> { Thread t = new Thread(r, "JetCacheHeavyIOExecutor" + threadCount.getAndIncrement()); @@ -63,6 +69,8 @@ public static ScheduledExecutorService heavyIOExecutor() { heavyIOExecutor = new ScheduledThreadPoolExecutor( 10, tf, new ThreadPoolExecutor.DiscardPolicy()); } + }finally { + reentrantLock.unlock(); } return heavyIOExecutor; } diff --git a/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/AbstractCacheAutoInit.java b/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/AbstractCacheAutoInit.java index a33134866..a3d2c9b31 100644 --- a/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/AbstractCacheAutoInit.java +++ b/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/AbstractCacheAutoInit.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; /** * Created on 2016/11/29. @@ -31,6 +32,8 @@ public abstract class AbstractCacheAutoInit implements InitializingBean { @Autowired protected AutoConfigureBeans autoConfigureBeans; + private final ReentrantLock reentrantLock = new ReentrantLock(); + protected String[] typeNames; private volatile boolean inited = false; @@ -44,12 +47,15 @@ public AbstractCacheAutoInit(String... cacheTypes) { @Override public void afterPropertiesSet() { if (!inited) { - synchronized (this) { + reentrantLock.lock(); + try{ if (!inited) { process("jetcache.local.", autoConfigureBeans.getLocalCacheBuilders(), true); process("jetcache.remote.", autoConfigureBeans.getRemoteCacheBuilders(), false); inited = true; } + }finally { + reentrantLock.unlock(); } } } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java index 63340a6ab..d19cf9e8c 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java @@ -8,7 +8,13 @@ import com.alicp.jetcache.CacheResultCode; import org.junit.Assert; import org.junit.Test; +import org.springframework.cglib.core.ReflectUtils; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -27,6 +33,44 @@ public void test() throws Exception { super.test(100, true); } + @Test + public void mutilTest() throws InterruptedException{ + cache = EmbeddedCacheBuilder.createEmbeddedCacheBuilder() + .buildFunc(getBuildFunc()).expireAfterWrite(5000, TimeUnit.MILLISECONDS).limit(10240).buildCache(); + ExecutorService executorService = null; + try { + Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); + if (method != null) { + System.out.println("use newVirtualThreadPerTaskExecutor"); + executorService = (ExecutorService) method.invoke(null); + } else { + System.out.println("use newFixedThreadPool"); + executorService = Executors.newFixedThreadPool(3); + } + }catch (Exception e){ + executorService = Executors.newFixedThreadPool(3); + } + executorService.awaitTermination(10,TimeUnit.SECONDS); + executorService.submit(() -> { + for (int i = 0; i < 20480; i+=2) { + cache.putIfAbsent("K" + i, "V" + i); + } + }); + executorService.submit(() -> { + for (int i = 1; i < 20480; i+=2) { + cache.remove("K" + i); + } + }); + executorService.submit(() -> { + for (int i = 0; i < 20480; i++) { + Object value = cache.get("K" + i); + if(!Objects.isNull(value)) + Assert.assertEquals(("V"+i),value); + } + }); + executorService.shutdown(); + } + @Test public void cleanTest() throws Exception { cache = EmbeddedCacheBuilder.createEmbeddedCacheBuilder() @@ -38,4 +82,5 @@ public void cleanTest() throws Exception { Assert.assertEquals(CacheResultCode.NOT_EXISTS, cache.GET("K1").getResultCode()); } + }