From c9f9f2ccba84b4c64e2120d6493a120b2b90dc7c Mon Sep 17 00:00:00 2001 From: zt9788 Date: Sun, 15 Oct 2023 13:53:41 +0800 Subject: [PATCH] ScheduledExecutorService use virtual thread --- .../jetcache/support/JetCacheExecutor.java | 36 +++----- .../VirtualScheduledThreadPoolExecutor.java | 45 +++++++++ .../jetcache/support/VirtualThreadUtil.java | 92 +++++++++++++++++++ .../com/alicp/jetcache/VirtualThreadUtil.java | 33 ------- .../anno/support/CacheContextTest.java | 7 -- .../embedded/LinkedHashMapCacheTest.java | 2 +- .../jetcache/support/AbstractEncoderTest.java | 1 - .../jetcache/support/KryoEncoderTest.java | 1 - 8 files changed, 153 insertions(+), 64 deletions(-) create mode 100644 jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualScheduledThreadPoolExecutor.java create mode 100644 jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualThreadUtil.java delete mode 100644 jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java index 324807be..d8c76aab 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java @@ -1,10 +1,6 @@ package com.alicp.jetcache.support; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; /** @@ -17,8 +13,6 @@ public class JetCacheExecutor { protected volatile static ScheduledExecutorService heavyIOExecutor; private static final ReentrantLock reentrantLock = new ReentrantLock(); - private static AtomicInteger threadCount = new AtomicInteger(0); - static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override @@ -40,13 +34,13 @@ public static ScheduledExecutorService defaultExecutor() { reentrantLock.lock(); try{ if (defaultExecutor == null) { - ThreadFactory tf = r -> { - Thread t = new Thread(r, "JetCacheDefaultExecutor"); - t.setDaemon(true); - return t; - }; - defaultExecutor = new ScheduledThreadPoolExecutor( - 1, tf, new ThreadPoolExecutor.DiscardPolicy()); + if(VirtualThreadUtil.isVirtualThreadSupported()) { + defaultExecutor = new VirtualScheduledThreadPoolExecutor("JetCacheDefaultExecutor"); + }else { + defaultExecutor = new ScheduledThreadPoolExecutor( + 1, VirtualThreadUtil.createThreadFactory(false, "JetCacheDefaultExecutor-") + , new ThreadPoolExecutor.DiscardPolicy()); + } } }finally { reentrantLock.unlock(); @@ -61,13 +55,13 @@ public static ScheduledExecutorService heavyIOExecutor() { reentrantLock.lock(); try { if (heavyIOExecutor == null) { - ThreadFactory tf = r -> { - Thread t = new Thread(r, "JetCacheHeavyIOExecutor" + threadCount.getAndIncrement()); - t.setDaemon(true); - return t; - }; - heavyIOExecutor = new ScheduledThreadPoolExecutor( - 10, tf, new ThreadPoolExecutor.DiscardPolicy()); + if(VirtualThreadUtil.isVirtualThreadSupported()) { + heavyIOExecutor = new VirtualScheduledThreadPoolExecutor("JetCacheHeavyIOExecutor"); + }else { + heavyIOExecutor = new ScheduledThreadPoolExecutor( + 10, VirtualThreadUtil.createThreadFactory(false, "JetCacheHeavyIOExecutor-") + , new ThreadPoolExecutor.DiscardPolicy()); + } } }finally { reentrantLock.unlock(); diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualScheduledThreadPoolExecutor.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualScheduledThreadPoolExecutor.java new file mode 100644 index 00000000..415b8584 --- /dev/null +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualScheduledThreadPoolExecutor.java @@ -0,0 +1,45 @@ +package com.alicp.jetcache.support; + +import java.util.concurrent.*; + +/** + * @Description + * @author: zhangtong + * @create: 2023/10/15 12:51 PM + */ +public class VirtualScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + + final ExecutorService executorService; + public VirtualScheduledThreadPoolExecutor(String factoyName) { + super(1); + executorService = VirtualThreadUtil.createExecuteor(factoyName); + } + + @Override + public void execute(Runnable runnable){ + super.execute(() -> { + executorService.execute(runnable); + }); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, java.util.concurrent.TimeUnit unit) { + return super.scheduleAtFixedRate(() -> { + executorService.execute(command); + }, initialDelay, period, unit); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(() -> { + executorService.execute(command); + }, delay, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return super.scheduleWithFixedDelay(() -> { + executorService.execute(command); + }, initialDelay, delay, unit); + } +} diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualThreadUtil.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualThreadUtil.java new file mode 100644 index 00000000..8accba0b --- /dev/null +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/VirtualThreadUtil.java @@ -0,0 +1,92 @@ +package com.alicp.jetcache.support; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cglib.core.ReflectUtils; + +import java.lang.reflect.Method; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @Description + * @author: zhangtong + * @create: 2023/10/5 11:44 AM + */ +public class VirtualThreadUtil { + + private static Logger logger = LoggerFactory.getLogger(VirtualThreadUtil.class); + + private static final Boolean isVirtualThreadSupported; + + static { + isVirtualThreadSupported = checkThreadSupported(); + } + public static ExecutorService createExecuteor(){ + ExecutorService executorService = null; + try { + Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); + if (method != null) { + logger.info("Test Thread start with newVirtualThreadPerTaskExecutor(Virtual)"); + executorService = (ExecutorService) method.invoke(null); + } + }catch (Exception e){ + logger.warn("JDK version may < 19, this will be skip..."); + return null; + } + return executorService; + } + + public static ExecutorService createExecuteor(String factryName){ + ExecutorService executorService = null; + try { + Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newThreadPerTaskExecutor", new Class[]{ThreadFactory.class}); + if (method != null) { + logger.info("Test Thread start with newVirtualThreadPerTaskExecutor(Virtual)"); + executorService = (ExecutorService) method.invoke(null,createThreadFactory(true,factryName)); + } + }catch (Exception e){ + logger.warn("JDK version may < 19, this will be skip..."); + return null; + } + return executorService; + } + public static boolean isVirtualThreadSupported() { + return isVirtualThreadSupported; + } + + private static boolean checkThreadSupported() { + try { + Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); + return method != null; + }catch (Exception e){ + return false; + } + } + + private static AtomicInteger threadCount = new AtomicInteger(0); + public static ThreadFactory createThreadFactory(boolean isVirtual ,String factoryName){ + if(isVirtual && isVirtualThreadSupported()) { + try { + Method threadMehod = ReflectUtils.findDeclaredMethod(Thread.class, "ofVirtual", new Class[0]); + Object threadObject = threadMehod.invoke(null); + Class buiderClass = Class.forName("java.lang.Thread$Builder"); + Method setNameMehod = ReflectUtils.findDeclaredMethod(buiderClass, "name", new Class[]{String.class, Long.TYPE}); + threadObject = setNameMehod.invoke(threadObject, factoryName, 0L); + Method factoryMehod = ReflectUtils.findDeclaredMethod(buiderClass, "factory", new Class[0]); + Object factory = factoryMehod.invoke(threadObject); + return (ThreadFactory) factory; + } catch (Exception e) { + logger.warn("JDK version may < 19, this will be skip..."); + } + } + return r -> { + Thread t = new Thread(r, factoryName + threadCount.getAndIncrement()); + t.setDaemon(true); + return t; + }; + } +} diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java b/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java deleted file mode 100644 index 27380d26..00000000 --- a/jetcache-test/src/test/java/com/alicp/jetcache/VirtualThreadUtil.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.alicp.jetcache; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cglib.core.ReflectUtils; - -import java.lang.reflect.Method; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * @Description - * @author: zhangtong - * @create: 2023/10/5 11:44 AM - */ -public class VirtualThreadUtil { - - private static Logger logger = LoggerFactory.getLogger(VirtualThreadUtil.class); - public static ExecutorService createExecuteor(){ - ExecutorService executorService = null; - try { - Method method = ReflectUtils.findDeclaredMethod(java.util.concurrent.Executors.class, "newVirtualThreadPerTaskExecutor", new Class[]{}); - if (method != null) { - logger.info("Test Thread start with newVirtualThreadPerTaskExecutor(Virtual)"); - executorService = (ExecutorService) method.invoke(null); - } - }catch (Exception e){ - logger.warn("JDK version may < 19, The Test will be skip..."); - return null; - } - return executorService; - } -} diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java index 9d028260..659009a1 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/anno/support/CacheContextTest.java @@ -3,15 +3,8 @@ */ package com.alicp.jetcache.anno.support; -import com.alicp.jetcache.VirtualThreadUtil; import org.junit.Assert; import org.junit.Test; -import org.springframework.cglib.core.ReflectUtils; - -import java.lang.reflect.Method; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; /** * @author huangli diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java index a43ba63b..b6bc5bdf 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/embedded/LinkedHashMapCacheTest.java @@ -6,7 +6,7 @@ import com.alicp.jetcache.Cache; import com.alicp.jetcache.CacheConfig; import com.alicp.jetcache.CacheResultCode; -import com.alicp.jetcache.VirtualThreadUtil; +import com.alicp.jetcache.support.VirtualThreadUtil; import org.junit.Assert; import org.junit.Test; import org.springframework.cglib.core.ReflectUtils; diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java index c9d0813d..a8e4daf2 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/AbstractEncoderTest.java @@ -1,7 +1,6 @@ package com.alicp.jetcache.support; import com.alicp.jetcache.CacheValueHolder; -import com.alicp.jetcache.VirtualThreadUtil; import java.util.ArrayList; import java.util.Arrays; diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java index e9c824e4..3e17b55d 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/KryoEncoderTest.java @@ -1,6 +1,5 @@ package com.alicp.jetcache.support; -import com.alicp.jetcache.VirtualThreadUtil; import com.alicp.jetcache.anno.SerialPolicy; import org.junit.jupiter.api.Test;