From d9de023863b0b3ab147da263565920e8d7c6b495 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 22 Mar 2024 16:22:06 +0800 Subject: [PATCH 1/6] feat: update spring, support jedis cluster pipeline --- .../jetcache-starter-redisson/pom.xml | 2 +- .../com/alicp/jetcache/redis/RedisCache.java | 49 +++++----- jetcache-test/pom.xml | 5 +- pom.xml | 89 ++++++++++++++----- 4 files changed, 89 insertions(+), 56 deletions(-) diff --git a/jetcache-starter/jetcache-starter-redisson/pom.xml b/jetcache-starter/jetcache-starter-redisson/pom.xml index c7bd6a0f..f616f91b 100644 --- a/jetcache-starter/jetcache-starter-redisson/pom.xml +++ b/jetcache-starter/jetcache-starter-redisson/pom.xml @@ -33,4 +33,4 @@ ${redisson.version} - \ No newline at end of file + diff --git a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java index 6bfd37b2..4146288f 100644 --- a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java +++ b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java @@ -16,6 +16,7 @@ import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPooled; import redis.clients.jedis.Pipeline; +import redis.clients.jedis.PipelineBase; import redis.clients.jedis.Response; import redis.clients.jedis.UnifiedJedis; import redis.clients.jedis.commands.KeyBinaryCommands; @@ -143,7 +144,7 @@ static int randomIndex(int[] weights) { int x = 0; for (int i = 0; i < weights.length; i++) { x += weights[i]; - if(r < x){ + if (r < x) { return i; } } @@ -276,36 +277,28 @@ protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit tim @Override protected CacheResult do_PUT_ALL(Map map, long expireAfterWrite, TimeUnit timeUnit) { StringBinaryCommands commands = null; - Connection connection = null; + PipelineBase pipeline = null; try { commands = (StringBinaryCommands) writeCommands(); int failCount = 0; - if(commands instanceof Jedis || commands instanceof JedisPooled) { - List> responses = new ArrayList<>(); - Pipeline pipeline = null; - if(commands instanceof JedisPooled) { - connection = ((JedisPooled) commands).getPool().getResource(); - pipeline = new Pipeline(connection); - } else { - pipeline = new Pipeline((Jedis) commands); - } - for (Map.Entry en : map.entrySet()) { - CacheValueHolder holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite)); - Response resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder)); - responses.add(resp); - } - pipeline.sync(); - for (Response resp : responses) { - if (!"OK".equals(resp.get())) { - failCount++; - } - } + List> responses = new ArrayList<>(); + if (commands instanceof JedisPooled) { + Connection connection = ((JedisPooled) commands).getPool().getResource(); + pipeline = new Pipeline(connection); + } else if (commands instanceof JedisCluster) { + pipeline = ((JedisCluster) commands).pipelined(); } else { - for (Map.Entry en : map.entrySet()) { - CacheResult r = do_PUT(en.getKey(), en.getValue(), expireAfterWrite, timeUnit); - if (!r.isSuccess()) { - failCount++; - } + pipeline = new Pipeline((Jedis) commands); + } + for (Map.Entry en : map.entrySet()) { + CacheValueHolder holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite)); + Response resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder)); + responses.add(resp); + } + pipeline.sync(); + for (Response resp : responses) { + if (!"OK".equals(resp.get())) { + failCount++; } } return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG : @@ -315,7 +308,7 @@ protected CacheResult do_PUT_ALL(Map map, long expireA return new CacheResult(ex); } finally { closeJedis(commands); - close(connection); + close(pipeline); } } diff --git a/jetcache-test/pom.xml b/jetcache-test/pom.xml index fc3b84ec..c5316e0f 100644 --- a/jetcache-test/pom.xml +++ b/jetcache-test/pom.xml @@ -103,7 +103,6 @@ net.bytebuddy byte-buddy - 1.12.13 org.springframework @@ -134,12 +133,10 @@ com.fasterxml.jackson.core jackson-core - ${jackson.version} com.fasterxml.jackson.core jackson-annotations - ${jackson.version} org.mvel @@ -179,4 +176,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index b36eba86..bf344fd7 100644 --- a/pom.xml +++ b/pom.xml @@ -168,6 +168,40 @@ + + org.apache.maven.plugins + maven-enforcer-plugin + 3.4.1 + + + + dependency-convergence + validate + + enforce + + + + + + + + + + ban-duplicate + validate + + enforce + + + + + + + + + @@ -200,19 +234,26 @@ 6.1.10.RELEASE --> + + + 6.1.0 + 3.2.0 + 6.3.0.RELEASE + + 5.0.2 + 3.25.0 - 2.15.2 + 2.15.3 5.9.3 2.0.9 1.4.11 @@ -244,30 +285,37 @@ - org.springframework - spring-core - ${spring.framework.version} + com.fasterxml.jackson + jackson-bom + ${jackson.version} + pom + import + - org.springframework - spring-context - ${spring.framework.version} + org.springframework.boot + spring-boot-dependencies + ${spring.boot.version} + pom + import + - org.springframework - spring-test - ${spring.framework.version} + redis.clients + jedis + ${jedis.version} - org.springframework.boot - spring-boot-autoconfigure - ${spring.boot.version} + org.objenesis + objenesis + 3.3 + - org.springframework.boot - spring-boot-starter - ${spring.boot.version} + net.bytebuddy + byte-buddy + 1.12.18 @@ -280,11 +328,6 @@ fastjson2 2.0.45 - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - org.mvel mvel2 From 3ce46169256b0b8855ca59e1cf9aa7af19f0af5b Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 22 Mar 2024 16:24:13 +0800 Subject: [PATCH 2/6] chore: reduce change --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index bf344fd7..31f6343c 100644 --- a/pom.xml +++ b/pom.xml @@ -253,7 +253,7 @@ 2.2.5.RELEASE --> - 2.15.3 + 2.15.2 5.9.3 2.0.9 1.4.11 From 62423d2c50c3f4773ff52e26778086c468960f4b Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 22 Mar 2024 16:31:26 +0800 Subject: [PATCH 3/6] fix: type check issue --- .../java/com/alicp/jetcache/redis/RedisCache.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java index 4146288f..0cdb5e18 100644 --- a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java +++ b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java @@ -11,11 +11,9 @@ import com.alicp.jetcache.external.AbstractExternalCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.Connection; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPooled; -import redis.clients.jedis.Pipeline; import redis.clients.jedis.PipelineBase; import redis.clients.jedis.Response; import redis.clients.jedis.UnifiedJedis; @@ -282,13 +280,14 @@ protected CacheResult do_PUT_ALL(Map map, long expireA commands = (StringBinaryCommands) writeCommands(); int failCount = 0; List> responses = new ArrayList<>(); - if (commands instanceof JedisPooled) { - Connection connection = ((JedisPooled) commands).getPool().getResource(); - pipeline = new Pipeline(connection); + if (commands instanceof Jedis) { + pipeline = ((Jedis) commands).pipelined(); + } else if (commands instanceof JedisPooled) { + pipeline = ((JedisPooled) commands).pipelined(); } else if (commands instanceof JedisCluster) { pipeline = ((JedisCluster) commands).pipelined(); } else { - pipeline = new Pipeline((Jedis) commands); + throw new IllegalArgumentException("unrecognized redis client type"); } for (Map.Entry en : map.entrySet()) { CacheValueHolder holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite)); From 418a70d3a6acef5d7fde866c07c651e031bddedd Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 22 Mar 2024 16:50:36 +0800 Subject: [PATCH 4/6] fix: compatibility kryo4 --- jetcache-starter/jetcache-autoconfigure/pom.xml | 8 +++++++- .../jetcache/autoconfigure/RedissonAutoConfiguration.java | 7 +++++++ .../com/alicp/jetcache/autoconfigure/MockStarterTest.java | 3 ++- .../jetcache/autoconfigure/RedisLettuceStarterTest.java | 6 +++--- .../alicp/jetcache/autoconfigure/RedisStarterTest.java | 3 ++- .../autoconfigure/SpringDataRedisStarterTest.java | 3 ++- .../jetcache/redisson/RedissonBroadcastManagerTest.java | 3 +++ .../com/alicp/jetcache/redisson/RedissonCacheTest.java | 2 ++ .../java/com/alicp/jetcache/support/SyncLocalTest.java | 2 ++ 9 files changed, 30 insertions(+), 7 deletions(-) diff --git a/jetcache-starter/jetcache-autoconfigure/pom.xml b/jetcache-starter/jetcache-autoconfigure/pom.xml index 0f65532d..0f8cb5cf 100644 --- a/jetcache-starter/jetcache-autoconfigure/pom.xml +++ b/jetcache-starter/jetcache-autoconfigure/pom.xml @@ -41,10 +41,16 @@ true + + org.redisson + redisson-spring-boot-starter + ${redisson.version} + + org.springframework.boot spring-boot-starter - \ No newline at end of file + diff --git a/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedissonAutoConfiguration.java b/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedissonAutoConfiguration.java index e89db50c..609aefb0 100644 --- a/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedissonAutoConfiguration.java +++ b/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedissonAutoConfiguration.java @@ -5,6 +5,8 @@ import com.alicp.jetcache.external.ExternalCacheBuilder; import com.alicp.jetcache.redisson.RedissonCacheBuilder; import org.redisson.api.RedissonClient; +import org.redisson.codec.KryoCodec; +import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -70,4 +72,9 @@ public void setApplicationContext(final ApplicationContext context) throws Beans this.context = context; } } + + @Bean("redissonCodecCustomizer") + public RedissonAutoConfigurationCustomizer redissonCodecCustomizer() { + return config -> config.setCodec(new KryoCodec()); + } } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/MockStarterTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/MockStarterTest.java index 59bf83ec..456f11b0 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/MockStarterTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/MockStarterTest.java @@ -11,6 +11,7 @@ import com.alicp.jetcache.test.spring.SpringTest; import org.junit.Assert; import org.junit.Test; +import org.redisson.spring.starter.RedissonAutoConfigurationV2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; @@ -27,7 +28,7 @@ * @author huangli */ @Configuration -@EnableAutoConfiguration +@EnableAutoConfiguration(exclude = {RedissonAutoConfigurationV2.class}) @ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableCreateCacheAnnotation diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisLettuceStarterTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisLettuceStarterTest.java index b8c8e9fa..a95c67b6 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisLettuceStarterTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisLettuceStarterTest.java @@ -4,9 +4,8 @@ import com.alicp.jetcache.anno.CreateCache; import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation; import com.alicp.jetcache.anno.config.EnableMethodCache; -import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheTest; import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheConfig; -import com.alicp.jetcache.support.FastjsonKeyConvertor; +import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheTest; import com.alicp.jetcache.test.beans.MyFactoryBean; import com.alicp.jetcache.test.spring.SpringTest; import io.lettuce.core.RedisClient; @@ -20,6 +19,7 @@ import io.lettuce.core.cluster.api.sync.RedisClusterCommands; import org.junit.Assert; import org.junit.Test; +import org.redisson.spring.starter.RedissonAutoConfigurationV2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -37,7 +37,7 @@ * @author huangli */ @Configuration -@EnableAutoConfiguration +@EnableAutoConfiguration(exclude = {RedissonAutoConfigurationV2.class}) @ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableCreateCacheAnnotation diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisStarterTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisStarterTest.java index 455965b2..71c898f3 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisStarterTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/RedisStarterTest.java @@ -12,6 +12,7 @@ import com.alicp.jetcache.test.spring.SpringTest; import org.junit.Assert; import org.junit.Test; +import org.redisson.spring.starter.RedissonAutoConfigurationV2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -35,7 +36,7 @@ * @author huangli */ @Configuration -@EnableAutoConfiguration +@EnableAutoConfiguration(exclude = RedissonAutoConfigurationV2.class) @ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableCreateCacheAnnotation diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/SpringDataRedisStarterTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/SpringDataRedisStarterTest.java index 2a0ab54c..6608caa4 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/SpringDataRedisStarterTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/autoconfigure/SpringDataRedisStarterTest.java @@ -8,6 +8,7 @@ import com.alicp.jetcache.test.spring.SpringTest; import org.junit.Assert; import org.junit.Test; +import org.redisson.spring.starter.RedissonAutoConfigurationV2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; @@ -26,7 +27,7 @@ * @author huangli */ @Configuration -@EnableAutoConfiguration +@EnableAutoConfiguration(exclude = RedissonAutoConfigurationV2.class) @ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"}) @EnableCreateCacheAnnotation diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonBroadcastManagerTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonBroadcastManagerTest.java index e4691050..25b8c453 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonBroadcastManagerTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonBroadcastManagerTest.java @@ -11,6 +11,7 @@ import org.junit.Test; import org.redisson.Redisson; import org.redisson.api.RedissonClient; +import org.redisson.codec.KryoCodec; import org.redisson.config.Config; import java.util.Random; @@ -27,6 +28,8 @@ public class RedissonBroadcastManagerTest extends AbstractBroadcastManagerTest { public void initRedissonClient() { final Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); + // 兼容 kryo4 + config.setCodec(new KryoCodec()); this.client = Redisson.create(config); } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonCacheTest.java index 3692c7d7..b7caf321 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/redisson/RedissonCacheTest.java @@ -11,6 +11,7 @@ import org.junit.Test; import org.redisson.Redisson; import org.redisson.api.RedissonClient; +import org.redisson.codec.KryoCodec; import org.redisson.config.Config; import java.util.Random; @@ -27,6 +28,7 @@ public class RedissonCacheTest extends AbstractExternalCacheTest { public void redissonTest() throws Exception { final Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); + config.setCodec(new KryoCodec()); doTest(Redisson.create(config)); } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/SyncLocalTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/SyncLocalTest.java index e8ed82a7..18e33af2 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/SyncLocalTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/SyncLocalTest.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.Test; import org.redisson.Redisson; import org.redisson.api.RedissonClient; +import org.redisson.codec.KryoCodec; import org.redisson.config.Config; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.UnifiedJedis; @@ -78,6 +79,7 @@ public void testLettuce() throws Exception { public void testRedisson() throws Exception { final String keyPrefix = getClass().getSimpleName() + "testRedisson"; final Config config = new Config(); + config.setCodec(new KryoCodec()); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); RedissonClient client1 = Redisson.create(config); RedissonClient client2 = Redisson.create(config); From 0056c4a8d2a77b528ce63039176c04deb0f8973f Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 22 Mar 2024 19:32:08 +0800 Subject: [PATCH 5/6] fix: flaky unit test --- .../jetcache/test/AbstractCacheTest.java | 73 +++++++++++++++---- 1 file changed, 57 insertions(+), 16 deletions(-) diff --git a/jetcache-test/src/main/java/com/alicp/jetcache/test/AbstractCacheTest.java b/jetcache-test/src/main/java/com/alicp/jetcache/test/AbstractCacheTest.java index da362251..98a576c1 100644 --- a/jetcache-test/src/main/java/com/alicp/jetcache/test/AbstractCacheTest.java +++ b/jetcache-test/src/main/java/com/alicp/jetcache/test/AbstractCacheTest.java @@ -13,7 +13,6 @@ import com.alicp.jetcache.support.DefaultCacheMonitor; import com.alicp.jetcache.support.StatInfo; import com.alicp.jetcache.support.StatInfoLogger; -import com.alicp.jetcache.test.anno.TestUtil; import com.alicp.jetcache.test.support.DynamicQuery; import org.junit.Assert; import org.slf4j.Logger; @@ -34,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static com.alicp.jetcache.test.support.Tick.tick; @@ -920,42 +920,83 @@ private static void penetrationProtectReEntryTest(Cache cache) { private static void penetrationProtectTimeoutTest(Cache cache) throws Exception { String keyPrefix = "penetrationProtectTimeoutTest_"; - AtomicInteger loadSuccess = new AtomicInteger(0); - Function loader = k -> { + final Duration SHORT_PROTECT_DURATION = Duration.ofMillis(1); + final Duration LONG_PROTECT_DURATION = Duration.ofSeconds(60); + cache.config().setPenetrationProtectTimeout(SHORT_PROTECT_DURATION); + final AtomicInteger loadSuccess = new AtomicInteger(0); + final CountDownLatch firstBarrier = new CountDownLatch(1); + final Function normalLoader = k -> { + loadSuccess.incrementAndGet(); + return k + "_V"; + }; + // 1. 第一个 Loader 需要等待 firstBarrier 释放, 并记录线程等待的时间 + AtomicReference duration = new AtomicReference<>(); + Function firstBarrierLoader = k -> { + long start = System.nanoTime(); try { - Thread.sleep(tick(75)); + firstBarrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } - loadSuccess.incrementAndGet(); - return k + "_V"; + duration.set(Duration.ofNanos(System.nanoTime() - start)); + return normalLoader.apply(k); }; - cache.config().setPenetrationProtectTimeout(Duration.ofMillis(1)); - Runnable runnable = () -> cache.computeIfAbsent(keyPrefix + 1, loader); - Thread t1 = new Thread(runnable); - Thread t2 = new Thread(runnable); + Thread t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, firstBarrierLoader)); + Thread t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, normalLoader)); + // 先启动线程 1,验证 barrierLoader 阻塞线程 1 t1.start(); + Thread.sleep(tick(25)); + Assert.assertEquals(0, loadSuccess.intValue()); + // 启动线程 2 再延迟一段时间,验证线程 2 过了保护时间后可以成功加载 t2.start(); + Thread.sleep(tick(25)); + Assert.assertEquals(1, loadSuccess.intValue()); + // 释放线程 1,验证两个都加载好了, 并且线程 1 等待时间超过保护时间 + firstBarrier.countDown(); t1.join(); t2.join(); Assert.assertEquals(2, loadSuccess.intValue()); + Assert.assertTrue(SHORT_PROTECT_DURATION.compareTo(duration.get()) < 0); - cache.config().setPenetrationProtectTimeout(Duration.ofSeconds(60)); + // 设置长保护时间, 重置 loadSuccess 和 duration + cache.config().setPenetrationProtectTimeout(LONG_PROTECT_DURATION); + CountDownLatch secondBarrier = new CountDownLatch(1); loadSuccess.set(0); - runnable = () -> cache.computeIfAbsent(keyPrefix + 2, loader); - t1 = new Thread(runnable); - t2 = new Thread(runnable); - Thread t3 = new Thread(runnable); + duration.set(Duration.ZERO); + Function secondBarrierLoader = k -> { + long start = System.nanoTime(); + try { + secondBarrier.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + duration.set(Duration.ofNanos(System.nanoTime() - start)); + return normalLoader.apply(k); + }; + // 设置线程 1,3 为长时间等待,线程 2 为正常加载 + t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader)); + t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, normalLoader)); + Thread t3 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader)); + // 延迟启动 1、2、3 t1.start(); + Thread.sleep(tick(25)); t2.start(); Thread.sleep(tick(25)); t3.start(); Thread.sleep(tick(25)); - t3.interrupt(); + // 验证延迟一段之间后,仍然是 0,即使有普通加载的 t2 线程 + Assert.assertEquals(0, loadSuccess.intValue()); + // 先中断线程 2,然后延迟一段时间释放 barrier,验证线程 1、3 都加载成功 + t2.interrupt(); + Thread.sleep(tick(25)); + secondBarrier.countDown(); t1.join(); t2.join(); t3.join(); + // 验证只有两个线程加载成功,线程 2 加载失败 Assert.assertEquals(2, loadSuccess.intValue()); + // 验证时间没超过保护时间 + Assert.assertTrue(LONG_PROTECT_DURATION.compareTo(duration.get()) > 0); } } From b84aefb8032b35e5ccf53f9bc6abf270e0584331 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 26 Mar 2024 18:20:20 +0800 Subject: [PATCH 6/6] add todo task --- .../src/main/java/com/alicp/jetcache/redis/RedisCache.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java index 0cdb5e18..5acab434 100644 --- a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java +++ b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java @@ -281,6 +281,7 @@ protected CacheResult do_PUT_ALL(Map map, long expireA int failCount = 0; List> responses = new ArrayList<>(); if (commands instanceof Jedis) { + // FIXME 这里的 Connection 能关吗? pipeline = ((Jedis) commands).pipelined(); } else if (commands instanceof JedisPooled) { pipeline = ((JedisPooled) commands).pipelined();