From f8f7e9d9b964244ec1c2e7dfb860067da731caa7 Mon Sep 17 00:00:00 2001 From: zt9788 Date: Thu, 5 Oct 2023 15:21:53 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E4=BC=98=E5=8C=96threadlocal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jetcache/anno/support/CacheContext.java | 47 ++++++++++++++- .../jetcache/support/Kryo5ValueDecoder.java | 12 +++- .../jetcache/support/Kryo5ValueEncoder.java | 60 ++++++++++++------- .../com/alicp/jetcache/VirtualThreadUtil.java | 28 +++++++++ .../anno/support/CacheContextTest.java | 30 ++++++++++ .../embedded/LinkedHashMapCacheTest.java | 17 ++---- .../jetcache/support/Kryo5EncoderTest.java | 50 ++++++++++++++++ 7 files changed, 207 insertions(+), 37 deletions(-) create mode 100644 jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java diff --git a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java index e005c5c6b..8bf4cab3f 100644 --- a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java +++ b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java @@ -12,7 +12,9 @@ import com.alicp.jetcache.template.QuickConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cglib.core.ReflectUtils; +import java.lang.reflect.Method; import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -134,6 +136,8 @@ public static T enableCache(Supplier callback) { return callback.get(); } finally { var.setEnabledCount(var.getEnabledCount() - 1); + if(var.getEnabledCount() <= 0) + clear(); } } @@ -145,10 +149,51 @@ protected static void enable() { protected static void disable() { CacheThreadLocal var = cacheThreadLocal.get(); var.setEnabledCount(var.getEnabledCount() - 1); + if(var.getEnabledCount() <= 0) + clear(); + } + + /** + * copy snippet from com.alibaba.fastjson2.util.JDKUtils + */ + private static int jdkVersion(){ + try { + String javaSpecVer = System.getProperty("java.specification.version"); + if (javaSpecVer.startsWith("1.")) { + javaSpecVer = javaSpecVer.substring(2); + } + if (javaSpecVer.indexOf('.') == -1) { + return Integer.parseInt(javaSpecVer); + } + } catch (Throwable ignored) { + return 8; + } + return 8; + } + + /** + * when jdkVersion >= 19 and current Thread is virtual , we need to clear the cacheThreadLocal + */ + protected static void clear(){ + try { + if(jdkVersion() >= 19) { + Thread current = Thread.currentThread(); + Method method = ReflectUtils.findDeclaredMethod(Thread.class, "isVirtual", new Class[0]); + Boolean isVirtual = (Boolean) method.invoke(current); + if (isVirtual) { + cacheThreadLocal.remove(); + } + } + } catch (Exception ignored) { + + } } protected static boolean isEnabled() { - return cacheThreadLocal.get().getEnabledCount() > 0; + if(cacheThreadLocal.get() != null) + return cacheThreadLocal.get().getEnabledCount() > 0; + else + return false; } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java index 12924b886..4f9799a75 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java @@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.kryo5.io.Input; import java.io.ByteArrayInputStream; +import java.lang.ref.WeakReference; /** * Created on 2016/10/4. @@ -27,7 +28,16 @@ public Object doApply(byte[] buffer) { in = new ByteArrayInputStream(buffer); } Input input = new Input(in); - Kryo kryo = (Kryo) Kryo5ValueEncoder.kryoThreadLocal.get()[0]; + WeakReference weakReference = Kryo5ValueEncoder.kryoThreadLocal.get(); + Kryo5ValueEncoder.KryoCache kryoCache = null; + if(weakReference == null || weakReference.get() == null){ + kryoCache = new Kryo5ValueEncoder.KryoCache(); + weakReference = new WeakReference<>(kryoCache); + Kryo5ValueEncoder.kryoThreadLocal.set(weakReference); + }else{ + kryoCache = weakReference.get(); + } + Kryo kryo = (Kryo) kryoCache.getKryo();//Kryo5ValueEncoder.kryoThreadLocal.get()[0]; ClassLoader classLoader = Kryo5ValueDecoder.class.getClassLoader(); Thread t = Thread.currentThread(); if (t != null) { diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java index 8f7279eba..8fe0d9f0c 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java @@ -18,16 +18,31 @@ public class Kryo5ValueEncoder extends AbstractValueEncoder { private static int INIT_BUFFER_SIZE = 256; - static ThreadLocal kryoThreadLocal = ThreadLocal.withInitial(() -> { - Kryo kryo = new Kryo(); - kryo.setDefaultSerializer(CompatibleFieldSerializer.class); - kryo.setRegistrationRequired(false); + static ThreadLocal> kryoThreadLocal = ThreadLocal.withInitial(() -> { + KryoCache kryo = new KryoCache(); + WeakReference ref = new WeakReference<>(kryo); + return ref; + }); - Output output = new Output(INIT_BUFFER_SIZE, -1); + static class KryoCache{ + final Output output; + final Kryo kryo; + public KryoCache(){ + kryo = new Kryo(); + kryo.setDefaultSerializer(CompatibleFieldSerializer.class); + kryo.setRegistrationRequired(false); + output = new Output(INIT_BUFFER_SIZE, -1); + } - WeakReference ref = new WeakReference<>(output); - return new Object[]{kryo, ref}; - }); + public Output getOutput(){ + return output; + } + + public Kryo getKryo(){ + return kryo; + } + + } public Kryo5ValueEncoder(boolean useIdentityNumber) { super(useIdentityNumber); @@ -35,28 +50,27 @@ public Kryo5ValueEncoder(boolean useIdentityNumber) { @Override public byte[] apply(Object value) { + KryoCache kryoCache = null; try { - Object[] kryoAndBuffer = kryoThreadLocal.get(); - Kryo kryo = (Kryo) kryoAndBuffer[0]; - WeakReference ref = (WeakReference) kryoAndBuffer[1]; - Output output = ref.get(); - if (output == null) { - output = new Output(INIT_BUFFER_SIZE, -1); + WeakReference weakRef = kryoThreadLocal.get(); + if(weakRef == null || weakRef.get() == null){ + kryoCache = new KryoCache(); + weakRef = new WeakReference<>(kryoCache); + kryoThreadLocal.set(weakRef); + }else{ + kryoCache = weakRef.get(); } - try { if (useIdentityNumber) { - writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO5); + writeInt(kryoCache.getOutput(), SerialPolicy.IDENTITY_NUMBER_KRYO5); } - kryo.reset(); - kryo.writeClassAndObject(output, value); - return output.toBytes(); + kryoCache.getKryo().reset(); + kryoCache.getKryo().writeClassAndObject(kryoCache.getOutput(), value); + return kryoCache.getOutput().toBytes(); } finally { //reuse buffer if possible - output.reset(); - if (ref.get() == null) { - ref = new WeakReference<>(output); - kryoAndBuffer[1] = ref; + if(kryoCache != null) { + kryoCache.getOutput().reset(); } } } catch (Exception e) { diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java new file mode 100644 index 000000000..3222b28fd --- /dev/null +++ b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java @@ -0,0 +1,28 @@ +package com.alicp.jetcache; + +import org.springframework.cglib.core.ReflectUtils; + +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @Description + * @author: zhangtong + * @create: 2023/10/5 11:44 AM + */ +public class VirtualThreadUtil { + public static ExecutorService createExecuteor(){ + ExecutorService executorService = null; + try { + Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); + if (method != null) { + System.out.println("use newVirtualThreadPerTaskExecutor"); + executorService = (ExecutorService) method.invoke(null); + } + }catch (Exception e){ + return null; + } + return executorService; + } +} diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java index 9d10e553c..3125af5f8 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java @@ -3,13 +3,43 @@ */ package com.alicp.jetcache.anno.support; +import com.alicp.jetcache.VirtualThreadUtil; import org.junit.Assert; import org.junit.Test; +import org.springframework.cglib.core.ReflectUtils; + +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * @author huangli */ public class CacheContextTest { + + @Test + public void testVirtualThreadTL() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::test); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test + public void testFixThreadTL() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + for (int i = 0; i < 10; i++) { + executorService.submit(this::test); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test public void test() { CacheContext.enable(); diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java index d19cf9e8c..9268e5318 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java @@ -6,6 +6,7 @@ import com.alicp.jetcache.Cache; import com.alicp.jetcache.CacheConfig; import com.alicp.jetcache.CacheResultCode; +import com.alicp.jetcache.VirtualThreadUtil; import org.junit.Assert; import org.junit.Test; import org.springframework.cglib.core.ReflectUtils; @@ -37,20 +38,11 @@ public void test() throws Exception { public void mutilTest() throws InterruptedException{ cache = EmbeddedCacheBuilder.createEmbeddedCacheBuilder() .buildFunc(getBuildFunc()).expireAfterWrite(5000, TimeUnit.MILLISECONDS).limit(10240).buildCache(); - ExecutorService executorService = null; - try { - Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); - if (method != null) { - System.out.println("use newVirtualThreadPerTaskExecutor"); - executorService = (ExecutorService) method.invoke(null); - } else { - System.out.println("use newFixedThreadPool"); - executorService = Executors.newFixedThreadPool(3); - } - }catch (Exception e){ + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null){ executorService = Executors.newFixedThreadPool(3); } - executorService.awaitTermination(10,TimeUnit.SECONDS); + executorService.submit(() -> { for (int i = 0; i < 20480; i+=2) { cache.putIfAbsent("K" + i, "V" + i); @@ -69,6 +61,7 @@ public void mutilTest() throws InterruptedException{ } }); executorService.shutdown(); + executorService.awaitTermination(10,TimeUnit.SECONDS); } @Test diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java index 18d0e29fb..aa7315f01 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java @@ -1,8 +1,13 @@ package com.alicp.jetcache.support; +import com.alicp.jetcache.VirtualThreadUtil; import com.alicp.jetcache.anno.SerialPolicy; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -11,6 +16,7 @@ * @author huangli */ public class Kryo5EncoderTest extends AbstractEncoderTest { + @Test public void test() { encoder = Kryo5ValueEncoder.INSTANCE; @@ -66,4 +72,48 @@ public void gcTest() { super.gcTest(); } + + @Test + public void testVirtualThreadTL() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::test); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test + public void testVirtualThreadGC() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::gcTest); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test + public void testFixThreadTL() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + for (int i = 0; i < 10; i++) { + executorService.submit(this::test); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test + public void testFixThreadGC() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + for (int i = 0; i < 10; i++) { + executorService.submit(this::gcTest); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + } From cd3bb3b64866d31f99de8f5649bd442d3fc4eb62 Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 10:23:30 +0800 Subject: [PATCH 2/9] use simple objectpool replace Cache of Threadlocal --- .../jetcache/anno/support/CacheContext.java | 47 +----------- .../java/com/alicp/jetcache/ObjectPool.java | 52 ++++++++++++++ .../jetcache/support/Kryo5ValueDecoder.java | 36 +++++----- .../jetcache/support/Kryo5ValueEncoder.java | 55 +++++++------- .../jetcache/support/KryoValueDecoder.java | 26 ++++--- .../jetcache/support/KryoValueEncoder.java | 71 +++++++++++-------- .../jetcache/support/Kryo5EncoderTest.java | 4 +- 7 files changed, 154 insertions(+), 137 deletions(-) create mode 100644 jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java diff --git a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java index 8bf4cab3f..e005c5c6b 100644 --- a/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java +++ b/jetcache-anno/src/main/java/com/alicp/jetcache/anno/support/CacheContext.java @@ -12,9 +12,7 @@ import com.alicp.jetcache.template.QuickConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.cglib.core.ReflectUtils; -import java.lang.reflect.Method; import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -136,8 +134,6 @@ public static T enableCache(Supplier callback) { return callback.get(); } finally { var.setEnabledCount(var.getEnabledCount() - 1); - if(var.getEnabledCount() <= 0) - clear(); } } @@ -149,51 +145,10 @@ protected static void enable() { protected static void disable() { CacheThreadLocal var = cacheThreadLocal.get(); var.setEnabledCount(var.getEnabledCount() - 1); - if(var.getEnabledCount() <= 0) - clear(); - } - - /** - * copy snippet from com.alibaba.fastjson2.util.JDKUtils - */ - private static int jdkVersion(){ - try { - String javaSpecVer = System.getProperty("java.specification.version"); - if (javaSpecVer.startsWith("1.")) { - javaSpecVer = javaSpecVer.substring(2); - } - if (javaSpecVer.indexOf('.') == -1) { - return Integer.parseInt(javaSpecVer); - } - } catch (Throwable ignored) { - return 8; - } - return 8; - } - - /** - * when jdkVersion >= 19 and current Thread is virtual , we need to clear the cacheThreadLocal - */ - protected static void clear(){ - try { - if(jdkVersion() >= 19) { - Thread current = Thread.currentThread(); - Method method = ReflectUtils.findDeclaredMethod(Thread.class, "isVirtual", new Class[0]); - Boolean isVirtual = (Boolean) method.invoke(current); - if (isVirtual) { - cacheThreadLocal.remove(); - } - } - } catch (Exception ignored) { - - } } protected static boolean isEnabled() { - if(cacheThreadLocal.get() != null) - return cacheThreadLocal.get().getEnabledCount() > 0; - else - return false; + return cacheThreadLocal.get().getEnabledCount() > 0; } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java b/jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java new file mode 100644 index 000000000..4a2cb951a --- /dev/null +++ b/jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java @@ -0,0 +1,52 @@ +package com.alicp.jetcache; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * @Description + * @author: zhangtong + * @create: 2023/10/6 3:27 PM + */ +public class ObjectPool { + private final ArrayBlockingQueue queue; + private final int size; + private final ObjectFactory factory; + private static final Logger logger = LoggerFactory.getLogger(ObjectPool.class); + + public ObjectPool(int size, ObjectFactory factory) { + this.size = size; + this.factory = factory; + queue = new ArrayBlockingQueue<>(size); + for (int i = 0; i < size; i++) { + queue.add(factory.create()); + } + logger.debug("Init the object pool with size {}", size); + } + + public T borrowObject() { + T t = queue.poll(); + if(t == null) { + logger.debug("The pool is not enough, create a new object"); + return factory.create(); + } + return t; + } + + public void returnObject(T obj) { + if (obj == null) { + return; + } + factory.reset(obj); + queue.offer(obj); + } + + public interface ObjectFactory { + T create(); + void reset(T obj); + } +} diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java index 4f9799a75..b549a20c2 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueDecoder.java @@ -4,7 +4,6 @@ import com.esotericsoftware.kryo.kryo5.io.Input; import java.io.ByteArrayInputStream; -import java.lang.ref.WeakReference; /** * Created on 2016/10/4. @@ -28,25 +27,24 @@ public Object doApply(byte[] buffer) { in = new ByteArrayInputStream(buffer); } Input input = new Input(in); - WeakReference weakReference = Kryo5ValueEncoder.kryoThreadLocal.get(); - Kryo5ValueEncoder.KryoCache kryoCache = null; - if(weakReference == null || weakReference.get() == null){ - kryoCache = new Kryo5ValueEncoder.KryoCache(); - weakReference = new WeakReference<>(kryoCache); - Kryo5ValueEncoder.kryoThreadLocal.set(weakReference); - }else{ - kryoCache = weakReference.get(); - } - Kryo kryo = (Kryo) kryoCache.getKryo();//Kryo5ValueEncoder.kryoThreadLocal.get()[0]; - ClassLoader classLoader = Kryo5ValueDecoder.class.getClassLoader(); - Thread t = Thread.currentThread(); - if (t != null) { - ClassLoader ctxClassLoader = t.getContextClassLoader(); - if (ctxClassLoader != null) { - classLoader = ctxClassLoader; + Kryo5ValueEncoder.Kryo5Cache kryoCache = null; + try { + kryoCache = Kryo5ValueEncoder.kryoCacheObjectPool.borrowObject(); + Kryo kryo = kryoCache.getKryo(); + ClassLoader classLoader = Kryo5ValueDecoder.class.getClassLoader(); + Thread t = Thread.currentThread(); + if (t != null) { + ClassLoader ctxClassLoader = t.getContextClassLoader(); + if (ctxClassLoader != null) { + classLoader = ctxClassLoader; + } + } + kryo.setClassLoader(classLoader); + return kryo.readClassAndObject(input); + }finally { + if(kryoCache != null){ + Kryo5ValueEncoder.kryoCacheObjectPool.returnObject(kryoCache); } } - kryo.setClassLoader(classLoader); - return kryo.readClassAndObject(input); } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java index 8fe0d9f0c..5538cc695 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java @@ -1,12 +1,11 @@ package com.alicp.jetcache.support; +import com.alicp.jetcache.ObjectPool; import com.alicp.jetcache.anno.SerialPolicy; import com.esotericsoftware.kryo.kryo5.Kryo; import com.esotericsoftware.kryo.kryo5.io.Output; import com.esotericsoftware.kryo.kryo5.serializers.CompatibleFieldSerializer; -import java.lang.ref.WeakReference; - /** * Created on 2016/10/4. * @@ -16,18 +15,26 @@ public class Kryo5ValueEncoder extends AbstractValueEncoder { public static final Kryo5ValueEncoder INSTANCE = new Kryo5ValueEncoder(true); - private static int INIT_BUFFER_SIZE = 256; + private static final int INIT_BUFFER_SIZE = 256; + + //Default size = 1M + static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*1024/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { + @Override + public Kryo5Cache create() { + return new Kryo5Cache(); + } - static ThreadLocal> kryoThreadLocal = ThreadLocal.withInitial(() -> { - KryoCache kryo = new KryoCache(); - WeakReference ref = new WeakReference<>(kryo); - return ref; + @Override + public void reset(Kryo5Cache obj) { + obj.getKryo().reset(); + obj.getOutput().reset(); + } }); - static class KryoCache{ + public static class Kryo5Cache { final Output output; final Kryo kryo; - public KryoCache(){ + public Kryo5Cache(){ kryo = new Kryo(); kryo.setDefaultSerializer(CompatibleFieldSerializer.class); kryo.setRegistrationRequired(false); @@ -50,33 +57,21 @@ public Kryo5ValueEncoder(boolean useIdentityNumber) { @Override public byte[] apply(Object value) { - KryoCache kryoCache = null; + Kryo5Cache kryoCache = null; try { - WeakReference weakRef = kryoThreadLocal.get(); - if(weakRef == null || weakRef.get() == null){ - kryoCache = new KryoCache(); - weakRef = new WeakReference<>(kryoCache); - kryoThreadLocal.set(weakRef); - }else{ - kryoCache = weakRef.get(); - } - try { - if (useIdentityNumber) { - writeInt(kryoCache.getOutput(), SerialPolicy.IDENTITY_NUMBER_KRYO5); - } - kryoCache.getKryo().reset(); - kryoCache.getKryo().writeClassAndObject(kryoCache.getOutput(), value); - return kryoCache.getOutput().toBytes(); - } finally { - //reuse buffer if possible - if(kryoCache != null) { - kryoCache.getOutput().reset(); - } + kryoCache = kryoCacheObjectPool.borrowObject(); + if (useIdentityNumber) { + writeInt(kryoCache.getOutput(), SerialPolicy.IDENTITY_NUMBER_KRYO5); } + kryoCache.getKryo().writeClassAndObject(kryoCache.getOutput(), value); + return kryoCache.getOutput().toBytes(); } catch (Exception e) { StringBuilder sb = new StringBuilder("Kryo Encode error. "); sb.append("msg=").append(e.getMessage()); throw new CacheEncodeException(sb.toString(), e); + }finally { + if(kryoCache != null) + kryoCacheObjectPool.returnObject(kryoCache); } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueDecoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueDecoder.java index fc5705e93..db8c8449a 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueDecoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueDecoder.java @@ -27,16 +27,24 @@ public Object doApply(byte[] buffer) { in = new ByteArrayInputStream(buffer); } Input input = new Input(in); - Kryo kryo = (Kryo) KryoValueEncoder.kryoThreadLocal.get()[0]; - ClassLoader classLoader = KryoValueDecoder.class.getClassLoader(); - Thread t = Thread.currentThread(); - if (t != null) { - ClassLoader ctxClassLoader = t.getContextClassLoader(); - if (ctxClassLoader != null) { - classLoader = ctxClassLoader; + KryoValueEncoder.KryoCache kryoCache = null; + try { + kryoCache = KryoValueEncoder.kryoCacheObjectPool.borrowObject(); + Kryo kryo = kryoCache.getKryo(); + ClassLoader classLoader = KryoValueDecoder.class.getClassLoader(); + Thread t = Thread.currentThread(); + if (t != null) { + ClassLoader ctxClassLoader = t.getContextClassLoader(); + if (ctxClassLoader != null) { + classLoader = ctxClassLoader; + } + } + kryo.setClassLoader(classLoader); + return kryo.readClassAndObject(input); + }finally { + if(kryoCache != null){ + KryoValueEncoder.kryoCacheObjectPool.returnObject(kryoCache); } } - kryo.setClassLoader(classLoader); - return kryo.readClassAndObject(input); } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java index c830108f0..76f0c1734 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java @@ -1,11 +1,13 @@ package com.alicp.jetcache.support; +import com.alicp.jetcache.ObjectPool; import com.alicp.jetcache.anno.SerialPolicy; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer; import java.lang.ref.WeakReference; +import java.util.Arrays; /** * Created on 2016/10/4. @@ -16,53 +18,60 @@ public class KryoValueEncoder extends AbstractValueEncoder { public static final KryoValueEncoder INSTANCE = new KryoValueEncoder(true); - private static int INIT_BUFFER_SIZE = 256; + private static final int INIT_BUFFER_SIZE = 256; - static ThreadLocal kryoThreadLocal = ThreadLocal.withInitial(() -> { - Kryo kryo = new Kryo(); - kryo.setDefaultSerializer(CompatibleFieldSerializer.class); -// kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); -// kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); - - byte[] buffer = new byte[INIT_BUFFER_SIZE]; + //Default size = 1M + static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*1024/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { + @Override + public KryoCache create() { + return new KryoCache(); + } - WeakReference ref = new WeakReference<>(buffer); - return new Object[]{kryo, ref}; + @Override + public void reset(KryoCache obj) { + obj.getKryo().reset(); + Arrays.fill(obj.buffer, (byte) 0); + } }); + public static class KryoCache { + final byte[] buffer; + final Kryo kryo; + public KryoCache(){ + kryo = new Kryo(); + kryo.setDefaultSerializer(CompatibleFieldSerializer.class); + buffer = new byte[INIT_BUFFER_SIZE]; + } + public byte[] getBuffer(){ + return buffer; + } + public Kryo getKryo(){ + return kryo; + } + } + public KryoValueEncoder(boolean useIdentityNumber) { super(useIdentityNumber); } @Override public byte[] apply(Object value) { + KryoCache kryoCache = null; try { - Object[] kryoAndBuffer = kryoThreadLocal.get(); - Kryo kryo = (Kryo) kryoAndBuffer[0]; - WeakReference ref = (WeakReference) kryoAndBuffer[1]; - byte[] buffer = ref.get(); - if (buffer == null) { - buffer = new byte[INIT_BUFFER_SIZE]; - } - Output output = new Output(buffer, -1); - - try { - if (useIdentityNumber) { - writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO4); - } - kryo.writeClassAndObject(output, value); - return output.toBytes(); - } finally { - //reuse buffer if possible - if (ref.get() == null || buffer != output.getBuffer()) { - ref = new WeakReference<>(output.getBuffer()); - kryoAndBuffer[1] = ref; - } + kryoCache = kryoCacheObjectPool.borrowObject(); + Output output = new Output(kryoCache.getBuffer(), -1); + if (useIdentityNumber) { + writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO4); } + kryoCache.getKryo().writeClassAndObject(output, value); + return output.toBytes(); } catch (Exception e) { StringBuilder sb = new StringBuilder("Kryo Encode error. "); sb.append("msg=").append(e.getMessage()); throw new CacheEncodeException(sb.toString(), e); + } finally { + if(kryoCache != null) + kryoCacheObjectPool.returnObject(kryoCache); } } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java index aa7315f01..46a4b6189 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java @@ -74,7 +74,7 @@ public void gcTest() { @Test - public void testVirtualThreadTL() throws InterruptedException { + public void testVirtualThreadPool() throws InterruptedException { ExecutorService executorService = VirtualThreadUtil.createExecuteor(); if(executorService == null) return; for (int i = 0; i < 1000; i++) { @@ -96,7 +96,7 @@ public void testVirtualThreadGC() throws InterruptedException { } @Test - public void testFixThreadTL() throws InterruptedException { + public void testFixThreadPool() throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { executorService.submit(this::test); From b3dbbfee99af6b88ca330dc915ce54fc385a8687 Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 11:24:16 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E7=BC=A9=E5=B0=8Fpool=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E5=A4=A7=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/alicp/jetcache/support/Kryo5ValueEncoder.java | 4 ++-- .../java/com/alicp/jetcache/support/KryoValueEncoder.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java index 5538cc695..d18c34ae7 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java @@ -17,8 +17,8 @@ public class Kryo5ValueEncoder extends AbstractValueEncoder { private static final int INIT_BUFFER_SIZE = 256; - //Default size = 1M - static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*1024/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { + //Default size = 256K + static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*256/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { @Override public Kryo5Cache create() { return new Kryo5Cache(); diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java index 76f0c1734..71819c4e3 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java @@ -20,8 +20,8 @@ public class KryoValueEncoder extends AbstractValueEncoder { private static final int INIT_BUFFER_SIZE = 256; - //Default size = 1M - static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*1024/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { + //Default size = 256K + static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*256/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { @Override public KryoCache create() { return new KryoCache(); From 64941b45286e5248290bebd65c80942b2dfa873f Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 13:44:33 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=B8=80=E4=B8=8Bpool?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F=E5=92=8C=E5=8C=85=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jetcache/support/Kryo5ValueEncoder.java | 7 ++--- .../jetcache/support/KryoValueEncoder.java | 24 ++++++++-------- .../jetcache/{ => support}/ObjectPool.java | 4 +-- .../com/alicp/jetcache/VirtualThreadUtil.java | 6 +++- .../anno/support/CacheContextTest.java | 23 --------------- .../jetcache/support/KryoEncoderTest.java | 28 +++++++++++++++++++ 6 files changed, 50 insertions(+), 42 deletions(-) rename jetcache-core/src/main/java/com/alicp/jetcache/{ => support}/ObjectPool.java (91%) diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java index d18c34ae7..05e334266 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Kryo5ValueEncoder.java @@ -1,6 +1,5 @@ package com.alicp.jetcache.support; -import com.alicp.jetcache.ObjectPool; import com.alicp.jetcache.anno.SerialPolicy; import com.esotericsoftware.kryo.kryo5.Kryo; import com.esotericsoftware.kryo.kryo5.io.Output; @@ -15,10 +14,10 @@ public class Kryo5ValueEncoder extends AbstractValueEncoder { public static final Kryo5ValueEncoder INSTANCE = new Kryo5ValueEncoder(true); - private static final int INIT_BUFFER_SIZE = 256; + private static final int INIT_BUFFER_SIZE = 2048; - //Default size = 256K - static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*256/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { + //Default size = 32K + static ObjectPool kryoCacheObjectPool = new ObjectPool<>(16, new ObjectPool.ObjectFactory() { @Override public Kryo5Cache create() { return new Kryo5Cache(); diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java index 71819c4e3..13fbd1240 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/KryoValueEncoder.java @@ -1,12 +1,10 @@ package com.alicp.jetcache.support; -import com.alicp.jetcache.ObjectPool; import com.alicp.jetcache.anno.SerialPolicy; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer; -import java.lang.ref.WeakReference; import java.util.Arrays; /** @@ -18,10 +16,10 @@ public class KryoValueEncoder extends AbstractValueEncoder { public static final KryoValueEncoder INSTANCE = new KryoValueEncoder(true); - private static final int INIT_BUFFER_SIZE = 256; + private static final int INIT_BUFFER_SIZE = 2048; - //Default size = 256K - static ObjectPool kryoCacheObjectPool = new ObjectPool<>(1024*256/INIT_BUFFER_SIZE, new ObjectPool.ObjectFactory() { + //Default size = 32K + static ObjectPool kryoCacheObjectPool = new ObjectPool<>(16, new ObjectPool.ObjectFactory() { @Override public KryoCache create() { return new KryoCache(); @@ -30,20 +28,22 @@ public KryoCache create() { @Override public void reset(KryoCache obj) { obj.getKryo().reset(); - Arrays.fill(obj.buffer, (byte) 0); + obj.getOutput().clear(); } }); public static class KryoCache { - final byte[] buffer; + final Output output; final Kryo kryo; public KryoCache(){ kryo = new Kryo(); kryo.setDefaultSerializer(CompatibleFieldSerializer.class); - buffer = new byte[INIT_BUFFER_SIZE]; + byte[] buffer = new byte[INIT_BUFFER_SIZE]; + output = new Output(buffer, -1); } - public byte[] getBuffer(){ - return buffer; + + public Output getOutput(){ + return output; } public Kryo getKryo(){ return kryo; @@ -59,7 +59,9 @@ public byte[] apply(Object value) { KryoCache kryoCache = null; try { kryoCache = kryoCacheObjectPool.borrowObject(); - Output output = new Output(kryoCache.getBuffer(), -1); +// Output output = new Output(kryoCache.getBuffer(), -1); +// output.clear(); + Output output = kryoCache.getOutput(); if (useIdentityNumber) { writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO4); } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/ObjectPool.java similarity index 91% rename from jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java rename to jetcache-core/src/main/java/com/alicp/jetcache/support/ObjectPool.java index 4a2cb951a..e7803a661 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/ObjectPool.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/ObjectPool.java @@ -1,11 +1,9 @@ -package com.alicp.jetcache; +package com.alicp.jetcache.support; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; /** * @Description diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java index 3222b28fd..a9a51e39f 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java @@ -1,5 +1,7 @@ package com.alicp.jetcache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.cglib.core.ReflectUtils; import java.lang.reflect.Method; @@ -12,12 +14,14 @@ * @create: 2023/10/5 11:44 AM */ public class VirtualThreadUtil { + + private static Logger logger = LoggerFactory.getLogger(VirtualThreadUtil.class); public static ExecutorService createExecuteor(){ ExecutorService executorService = null; try { Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); if (method != null) { - System.out.println("use newVirtualThreadPerTaskExecutor"); + logger.info("Test Thread start with newVirtualThreadPerTaskExecutor(Virtual)"); executorService = (ExecutorService) method.invoke(null); } }catch (Exception e){ diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java index 3125af5f8..8697ae3cd 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java @@ -17,29 +17,6 @@ * @author huangli */ public class CacheContextTest { - - @Test - public void testVirtualThreadTL() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::test); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); - } - - @Test - public void testFixThreadTL() throws InterruptedException { - ExecutorService executorService = Executors.newFixedThreadPool(2); - for (int i = 0; i < 10; i++) { - executorService.submit(this::test); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); - } - - @Test public void test() { CacheContext.enable(); diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java index 04ad53002..492657c6c 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java @@ -1,8 +1,12 @@ package com.alicp.jetcache.support; +import com.alicp.jetcache.VirtualThreadUtil; import com.alicp.jetcache.anno.SerialPolicy; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -66,4 +70,28 @@ public void gcTest() { super.gcTest(); } + + + @Test + public void testVirtualThreadPool() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::test); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test + public void testVirtualThreadGC() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::gcTest); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + } From 9dbfa201d0c14afcb95cf3279024e0b6c087e767 Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 13:52:48 +0800 Subject: [PATCH 5/9] java encode --- .../jetcache/support/JavaValueEncoder.java | 49 ++++++++++--------- .../jetcache/support/JavaEncoderTest.java | 28 +++++++++++ 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java index dcd483f39..51724b1b5 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java @@ -22,39 +22,40 @@ public JavaValueEncoder(boolean useIdentityNumber) { super(useIdentityNumber); } - private static ThreadLocal> threadLocal = - ThreadLocal.withInitial(() -> new WeakReference<>(new ByteArrayOutputStream(INIT_BUF_SIZE))); + static ObjectPool bosPool = new ObjectPool<>(16, new ObjectPool.ObjectFactory() { + @Override + public ByteArrayOutputStream create() { + return new ByteArrayOutputStream(INIT_BUF_SIZE); + } + + @Override + public void reset(ByteArrayOutputStream obj) { + obj.reset(); + } + }); @Override public byte[] apply(Object value) { + ByteArrayOutputStream bos = null; try { - WeakReference ref = threadLocal.get(); - ByteArrayOutputStream bos = ref.get(); - if (bos == null) { - bos = new ByteArrayOutputStream(INIT_BUF_SIZE); - threadLocal.set(new WeakReference<>(bos)); - } - - try { - if (useIdentityNumber) { - bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 24) & 0xFF); - bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 16) & 0xFF); - bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 8) & 0xFF); - bos.write(SerialPolicy.IDENTITY_NUMBER_JAVA & 0xFF); - } - - - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(value); - oos.flush(); - return bos.toByteArray(); - } finally { - bos.reset(); + bos = bosPool.borrowObject(); + if (useIdentityNumber) { + bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 24) & 0xFF); + bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 16) & 0xFF); + bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 8) & 0xFF); + bos.write(SerialPolicy.IDENTITY_NUMBER_JAVA & 0xFF); } + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(value); + oos.flush(); + return bos.toByteArray(); } catch (IOException e) { StringBuilder sb = new StringBuilder("Java Encode error. "); sb.append("msg=").append(e.getMessage()); throw new CacheEncodeException(sb.toString(), e); + }finally { + if(bos != null) + bosPool.returnObject(bos); } } } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java index 4e5f4e14c..d459ba97d 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java @@ -1,8 +1,12 @@ package com.alicp.jetcache.support; +import com.alicp.jetcache.VirtualThreadUtil; import com.alicp.jetcache.anno.SerialPolicy; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -66,4 +70,28 @@ public void gcTest() { super.gcTest(); } + + @Test + public void testVirtualThreadPool() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::test); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + @Test + public void testVirtualThreadGC() throws InterruptedException { + ExecutorService executorService = VirtualThreadUtil.createExecuteor(); + if(executorService == null) return; + for (int i = 0; i < 1000; i++) { + executorService.submit(this::gcTest); + } + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } + + } From 27cdbc2e83f9df260c502ab19d4136e9ef95e1cd Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 13:54:51 +0800 Subject: [PATCH 6/9] java encode2 --- .../main/java/com/alicp/jetcache/support/JavaValueEncoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java index 51724b1b5..6794d3676 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/JavaValueEncoder.java @@ -16,7 +16,7 @@ public class JavaValueEncoder extends AbstractValueEncoder { public static final JavaValueEncoder INSTANCE = new JavaValueEncoder(true); - private static final int INIT_BUF_SIZE = 256; + private static final int INIT_BUF_SIZE = 2048; public JavaValueEncoder(boolean useIdentityNumber) { super(useIdentityNumber); From 95ca7d06ff903e159fea89db71c9aee17fe02e14 Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 15:56:27 +0800 Subject: [PATCH 7/9] =?UTF-8?q?test=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/alicp/jetcache/VirtualThreadUtil.java | 1 + .../jetcache/support/AbstractEncoderTest.java | 22 +++++++++++ .../jetcache/support/JavaEncoderTest.java | 29 ++++++--------- .../jetcache/support/Kryo5EncoderTest.java | 37 ++----------------- .../jetcache/support/KryoEncoderTest.java | 27 ++++++-------- 5 files changed, 50 insertions(+), 66 deletions(-) diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java index a9a51e39f..27380d269 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java @@ -25,6 +25,7 @@ public static ExecutorService createExecuteor(){ executorService = (ExecutorService) method.invoke(null); } }catch (Exception e){ + logger.warn("JDK version may < 19, The Test will be skip..."); return null; } return executorService; diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java index cbb6fd530..ec6074b53 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java @@ -1,6 +1,7 @@ package com.alicp.jetcache.support; import com.alicp.jetcache.CacheValueHolder; +import com.alicp.jetcache.VirtualThreadUtil; import java.util.ArrayList; import java.util.Arrays; @@ -10,6 +11,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -136,4 +140,22 @@ protected void writeHeader(byte[] buf, int header) { buf[2] = (byte) (header >> 8 & 0xFF); buf[3] = (byte) (header & 0xFF); } + + + public void testByThreadPool(boolean isVirtual, int core, int size, Runnable runnable) throws InterruptedException { + ExecutorService executorService = null; + if(isVirtual) + executorService = VirtualThreadUtil.createExecuteor(); + else + executorService = Executors.newFixedThreadPool(core); + if(executorService == null) { + return; + } + for (int i = 0; i < size; i++) { + executorService.submit(runnable); + } + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java index d459ba97d..eec976c33 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java @@ -1,12 +1,8 @@ package com.alicp.jetcache.support; -import com.alicp.jetcache.VirtualThreadUtil; import com.alicp.jetcache.anno.SerialPolicy; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -73,25 +69,22 @@ public void gcTest() { @Test public void testVirtualThreadPool() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::test); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(true,-1,1000,this::test); } @Test public void testVirtualThreadGC() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::gcTest); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(true,-1,1000,this::gcTest); + } + + @Test + public void testFixThreadPool() throws InterruptedException { + testByThreadPool(false,3,1000,this::test); } + @Test + public void testFixThreadGC() throws InterruptedException { + testByThreadPool(false,3,1000,this::gcTest); + } } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java index 46a4b6189..8f6ff6652 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java @@ -1,13 +1,8 @@ package com.alicp.jetcache.support; -import com.alicp.jetcache.VirtualThreadUtil; import com.alicp.jetcache.anno.SerialPolicy; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -72,47 +67,23 @@ public void gcTest() { super.gcTest(); } - @Test public void testVirtualThreadPool() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::test); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(true,-1,1000,this::test); } @Test public void testVirtualThreadGC() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::gcTest); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(true,-1,1000,this::gcTest); } - @Test public void testFixThreadPool() throws InterruptedException { - ExecutorService executorService = Executors.newFixedThreadPool(2); - for (int i = 0; i < 10; i++) { - executorService.submit(this::test); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(false,3,1000,this::test); } @Test public void testFixThreadGC() throws InterruptedException { - ExecutorService executorService = Executors.newFixedThreadPool(2); - for (int i = 0; i < 10; i++) { - executorService.submit(this::gcTest); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(false,3,1000,this::gcTest); } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java index 492657c6c..0039ea5c1 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java @@ -71,27 +71,24 @@ public void gcTest() { } - @Test public void testVirtualThreadPool() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::test); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(true,-1,1000,this::test); } @Test public void testVirtualThreadGC() throws InterruptedException { - ExecutorService executorService = VirtualThreadUtil.createExecuteor(); - if(executorService == null) return; - for (int i = 0; i < 1000; i++) { - executorService.submit(this::gcTest); - } - executorService.shutdown(); - executorService.awaitTermination(3, TimeUnit.SECONDS); + testByThreadPool(true,-1,1000,this::gcTest); + } + + @Test + public void testFixThreadPool() throws InterruptedException { + testByThreadPool(false,3,1000,this::test); + } + + @Test + public void testFixThreadGC() throws InterruptedException { + testByThreadPool(false,3,1000,this::gcTest); } } From 26f4677db31219502fd6be8cffd889c77cc956b5 Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sat, 7 Oct 2023 16:19:28 +0800 Subject: [PATCH 8/9] retest --- .../com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java index 9268e5318..0c10f8afe 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java @@ -44,17 +44,17 @@ public void mutilTest() throws InterruptedException{ } executorService.submit(() -> { - for (int i = 0; i < 20480; i+=2) { + for (int i = 0; i < 1000; i+=2) { cache.putIfAbsent("K" + i, "V" + i); } }); executorService.submit(() -> { - for (int i = 1; i < 20480; i+=2) { + for (int i = 1; i < 1000; i+=2) { cache.remove("K" + i); } }); executorService.submit(() -> { - for (int i = 0; i < 20480; i++) { + for (int i = 0; i < 1000; i++) { Object value = cache.get("K" + i); if(!Objects.isNull(value)) Assert.assertEquals(("V"+i),value); From 757115a20953d927a5448b561f7e087fc2019db1 Mon Sep 17 00:00:00 2001 From: zhangtong2 Date: Sat, 7 Oct 2023 16:41:24 +0800 Subject: [PATCH 9/9] retest2 --- .../com/alicp/jetcache/support/AbstractEncoderTest.java | 6 ++++-- .../java/com/alicp/jetcache/support/JavaEncoderTest.java | 8 ++++---- .../java/com/alicp/jetcache/support/Kryo5EncoderTest.java | 8 ++++---- .../java/com/alicp/jetcache/support/KryoEncoderTest.java | 8 ++++---- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java index ec6074b53..d79da6e70 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java @@ -144,13 +144,15 @@ protected void writeHeader(byte[] buf, int header) { public void testByThreadPool(boolean isVirtual, int core, int size, Runnable runnable) throws InterruptedException { ExecutorService executorService = null; - if(isVirtual) + if(isVirtual) { executorService = VirtualThreadUtil.createExecuteor(); - else + }else if(core > 0) { executorService = Executors.newFixedThreadPool(core); + } if(executorService == null) { return; } + if(size <= 0) size = 100; for (int i = 0; i < size; i++) { executorService.submit(runnable); } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java index eec976c33..a7fc39fd3 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/JavaEncoderTest.java @@ -69,22 +69,22 @@ public void gcTest() { @Test public void testVirtualThreadPool() throws InterruptedException { - testByThreadPool(true,-1,1000,this::test); + testByThreadPool(true,-1,100,this::test); } @Test public void testVirtualThreadGC() throws InterruptedException { - testByThreadPool(true,-1,1000,this::gcTest); + testByThreadPool(true,-1,100,this::gcTest); } @Test public void testFixThreadPool() throws InterruptedException { - testByThreadPool(false,3,1000,this::test); + testByThreadPool(false,3,100,this::test); } @Test public void testFixThreadGC() throws InterruptedException { - testByThreadPool(false,3,1000,this::gcTest); + testByThreadPool(false,3,100,this::gcTest); } } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java index 8f6ff6652..f8154b749 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/Kryo5EncoderTest.java @@ -69,21 +69,21 @@ public void gcTest() { @Test public void testVirtualThreadPool() throws InterruptedException { - testByThreadPool(true,-1,1000,this::test); + testByThreadPool(true,-1,100,this::test); } @Test public void testVirtualThreadGC() throws InterruptedException { - testByThreadPool(true,-1,1000,this::gcTest); + testByThreadPool(true,-1,100,this::gcTest); } @Test public void testFixThreadPool() throws InterruptedException { - testByThreadPool(false,3,1000,this::test); + testByThreadPool(false,3,100,this::test); } @Test public void testFixThreadGC() throws InterruptedException { - testByThreadPool(false,3,1000,this::gcTest); + testByThreadPool(false,3,100,this::gcTest); } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java index 0039ea5c1..3fc3a76d0 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java @@ -73,22 +73,22 @@ public void gcTest() { @Test public void testVirtualThreadPool() throws InterruptedException { - testByThreadPool(true,-1,1000,this::test); + testByThreadPool(true,-1,100,this::test); } @Test public void testVirtualThreadGC() throws InterruptedException { - testByThreadPool(true,-1,1000,this::gcTest); + testByThreadPool(true,-1,100,this::gcTest); } @Test public void testFixThreadPool() throws InterruptedException { - testByThreadPool(false,3,1000,this::test); + testByThreadPool(false,3,100,this::test); } @Test public void testFixThreadGC() throws InterruptedException { - testByThreadPool(false,3,1000,this::gcTest); + testByThreadPool(false,3,100,this::gcTest); } }