Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update spring, support jedis cluster pipeline #869

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion jetcache-starter/jetcache-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.alicp.jetcache.external.ExternalCacheBuilder;
import com.alicp.jetcache.redisson.RedissonCacheBuilder;
import org.redisson.api.RedissonClient;
import org.redisson.codec.KryoCodec;
import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand Down Expand Up @@ -70,4 +72,9 @@ public void setApplicationContext(final ApplicationContext context) throws Beans
this.context = context;
}
}

@Bean("redissonCodecCustomizer")
public RedissonAutoConfigurationCustomizer redissonCodecCustomizer() {
return config -> config.setCodec(new KryoCodec());
}
}
2 changes: 1 addition & 1 deletion jetcache-starter/jetcache-starter-redisson/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@
<version>${redisson.version}</version>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
import com.alicp.jetcache.external.AbstractExternalCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Connection;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.Response;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.commands.KeyBinaryCommands;
Expand Down Expand Up @@ -143,7 +142,7 @@ static int randomIndex(int[] weights) {
int x = 0;
for (int i = 0; i < weights.length; i++) {
x += weights[i];
if(r < x){
if (r < x) {
return i;
}
}
Expand Down Expand Up @@ -276,36 +275,30 @@ protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit tim
@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
StringBinaryCommands commands = null;
Connection connection = null;
PipelineBase pipeline = null;
try {
commands = (StringBinaryCommands) writeCommands();
int failCount = 0;
if(commands instanceof Jedis || commands instanceof JedisPooled) {
List<Response<String>> responses = new ArrayList<>();
Pipeline pipeline = null;
if(commands instanceof JedisPooled) {
connection = ((JedisPooled) commands).getPool().getResource();
pipeline = new Pipeline(connection);
} else {
pipeline = new Pipeline((Jedis) commands);
}
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
pipeline.sync();
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
List<Response<String>> responses = new ArrayList<>();
if (commands instanceof Jedis) {
// FIXME 这里的 Connection 能关吗?
pipeline = ((Jedis) commands).pipelined();
} else if (commands instanceof JedisPooled) {
pipeline = ((JedisPooled) commands).pipelined();
} else if (commands instanceof JedisCluster) {
pipeline = ((JedisCluster) commands).pipelined();
} else {
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheResult r = do_PUT(en.getKey(), en.getValue(), expireAfterWrite, timeUnit);
if (!r.isSuccess()) {
failCount++;
}
throw new IllegalArgumentException("unrecognized redis client type");
}
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
pipeline.sync();
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
Expand All @@ -315,7 +308,7 @@ protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireA
return new CacheResult(ex);
} finally {
closeJedis(commands);
close(connection);
close(pipeline);
}
}

Expand Down
5 changes: 1 addition & 4 deletions jetcache-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.12.13</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
Expand Down Expand Up @@ -134,12 +133,10 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.mvel</groupId>
Expand Down Expand Up @@ -179,4 +176,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.alicp.jetcache.support.DefaultCacheMonitor;
import com.alicp.jetcache.support.StatInfo;
import com.alicp.jetcache.support.StatInfoLogger;
import com.alicp.jetcache.test.anno.TestUtil;
import com.alicp.jetcache.test.support.DynamicQuery;
import org.junit.Assert;
import org.slf4j.Logger;
Expand All @@ -34,6 +33,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static com.alicp.jetcache.test.support.Tick.tick;
Expand Down Expand Up @@ -920,42 +920,83 @@ private static void penetrationProtectReEntryTest(Cache cache) {

private static void penetrationProtectTimeoutTest(Cache cache) throws Exception {
String keyPrefix = "penetrationProtectTimeoutTest_";
AtomicInteger loadSuccess = new AtomicInteger(0);
Function loader = k -> {
final Duration SHORT_PROTECT_DURATION = Duration.ofMillis(1);
final Duration LONG_PROTECT_DURATION = Duration.ofSeconds(60);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

穿透保护相关的单测涉及到线程执行效率都比较 flaky,我重写了一版,应该不会改变语义

cache.config().setPenetrationProtectTimeout(SHORT_PROTECT_DURATION);
final AtomicInteger loadSuccess = new AtomicInteger(0);
final CountDownLatch firstBarrier = new CountDownLatch(1);
final Function normalLoader = k -> {
loadSuccess.incrementAndGet();
return k + "_V";
};
// 1. 第一个 Loader 需要等待 firstBarrier 释放, 并记录线程等待的时间
AtomicReference<Duration> duration = new AtomicReference<>();
Function firstBarrierLoader = k -> {
long start = System.nanoTime();
try {
Thread.sleep(tick(75));
firstBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
loadSuccess.incrementAndGet();
return k + "_V";
duration.set(Duration.ofNanos(System.nanoTime() - start));
return normalLoader.apply(k);
};

cache.config().setPenetrationProtectTimeout(Duration.ofMillis(1));
Runnable runnable = () -> cache.computeIfAbsent(keyPrefix + 1, loader);
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
Thread t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, firstBarrierLoader));
Thread t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, normalLoader));
// 先启动线程 1,验证 barrierLoader 阻塞线程 1
t1.start();
Thread.sleep(tick(25));
Assert.assertEquals(0, loadSuccess.intValue());
// 启动线程 2 再延迟一段时间,验证线程 2 过了保护时间后可以成功加载
t2.start();
Thread.sleep(tick(25));
Assert.assertEquals(1, loadSuccess.intValue());
// 释放线程 1,验证两个都加载好了, 并且线程 1 等待时间超过保护时间
firstBarrier.countDown();
t1.join();
t2.join();
Assert.assertEquals(2, loadSuccess.intValue());
Assert.assertTrue(SHORT_PROTECT_DURATION.compareTo(duration.get()) < 0);

cache.config().setPenetrationProtectTimeout(Duration.ofSeconds(60));
// 设置长保护时间, 重置 loadSuccess 和 duration
cache.config().setPenetrationProtectTimeout(LONG_PROTECT_DURATION);
CountDownLatch secondBarrier = new CountDownLatch(1);
loadSuccess.set(0);
runnable = () -> cache.computeIfAbsent(keyPrefix + 2, loader);
t1 = new Thread(runnable);
t2 = new Thread(runnable);
Thread t3 = new Thread(runnable);
duration.set(Duration.ZERO);
Function secondBarrierLoader = k -> {
long start = System.nanoTime();
try {
secondBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
duration.set(Duration.ofNanos(System.nanoTime() - start));
return normalLoader.apply(k);
};
// 设置线程 1,3 为长时间等待,线程 2 为正常加载
t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader));
t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, normalLoader));
Thread t3 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader));
// 延迟启动 1、2、3
t1.start();
Thread.sleep(tick(25));
t2.start();
Thread.sleep(tick(25));
t3.start();
Thread.sleep(tick(25));
t3.interrupt();
// 验证延迟一段之间后,仍然是 0,即使有普通加载的 t2 线程
Assert.assertEquals(0, loadSuccess.intValue());
// 先中断线程 2,然后延迟一段时间释放 barrier,验证线程 1、3 都加载成功
t2.interrupt();
Thread.sleep(tick(25));
secondBarrier.countDown();
t1.join();
t2.join();
t3.join();
// 验证只有两个线程加载成功,线程 2 加载失败
Assert.assertEquals(2, loadSuccess.intValue());
// 验证时间没超过保护时间
Assert.assertTrue(LONG_PROTECT_DURATION.compareTo(duration.get()) > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.alicp.jetcache.test.spring.SpringTest;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.spring.starter.RedissonAutoConfigurationV2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
Expand All @@ -27,7 +28,7 @@
* @author huangli
*/
@Configuration
@EnableAutoConfiguration
@EnableAutoConfiguration(exclude = {RedissonAutoConfigurationV2.class})
@ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableCreateCacheAnnotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import com.alicp.jetcache.anno.CreateCache;
import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation;
import com.alicp.jetcache.anno.config.EnableMethodCache;
import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheTest;
import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheConfig;
import com.alicp.jetcache.support.FastjsonKeyConvertor;
import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheTest;
import com.alicp.jetcache.test.beans.MyFactoryBean;
import com.alicp.jetcache.test.spring.SpringTest;
import io.lettuce.core.RedisClient;
Expand All @@ -20,6 +19,7 @@
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.spring.starter.RedissonAutoConfigurationV2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
Expand All @@ -37,7 +37,7 @@
* @author huangli
*/
@Configuration
@EnableAutoConfiguration
@EnableAutoConfiguration(exclude = {RedissonAutoConfigurationV2.class})
@ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableCreateCacheAnnotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.alicp.jetcache.test.spring.SpringTest;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.spring.starter.RedissonAutoConfigurationV2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
Expand All @@ -35,7 +36,7 @@
* @author huangli
*/
@Configuration
@EnableAutoConfiguration
@EnableAutoConfiguration(exclude = RedissonAutoConfigurationV2.class)
@ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableCreateCacheAnnotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.alicp.jetcache.test.spring.SpringTest;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.spring.starter.RedissonAutoConfigurationV2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,7 +27,7 @@
* @author huangli
*/
@Configuration
@EnableAutoConfiguration
@EnableAutoConfiguration(exclude = RedissonAutoConfigurationV2.class)
@ComponentScan(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableMethodCache(basePackages = {"com.alicp.jetcache.test.beans", "com.alicp.jetcache.anno.inittestbeans"})
@EnableCreateCacheAnnotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.KryoCodec;
import org.redisson.config.Config;

import java.util.Random;
Expand All @@ -27,6 +28,8 @@ public class RedissonBroadcastManagerTest extends AbstractBroadcastManagerTest {
public void initRedissonClient() {
final Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
// 兼容 kryo4
config.setCodec(new KryoCodec());
this.client = Redisson.create(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.KryoCodec;
import org.redisson.config.Config;

import java.util.Random;
Expand All @@ -27,6 +28,7 @@ public class RedissonCacheTest extends AbstractExternalCacheTest {
public void redissonTest() throws Exception {
final Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
config.setCodec(new KryoCodec());
doTest(Redisson.create(config));
}

Expand Down
Loading
Loading