diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java index b053bd9337a..90419c4d900 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java @@ -22,6 +22,7 @@ import org.apache.dubbo.common.threadpool.ThreadPool; import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; @@ -53,10 +54,18 @@ public Executor getExecutor(URL url) { int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); - return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, - queues == 0 ? new SynchronousQueue() : - (queues < 0 ? new MemorySafeLinkedBlockingQueue() - : new LinkedBlockingQueue(queues)), + + BlockingQueue blockingQueue; + + if (queues == 0) { + blockingQueue = new SynchronousQueue<>(); + } else if (queues < 0) { + blockingQueue = new MemorySafeLinkedBlockingQueue<>(); + } else { + blockingQueue = new LinkedBlockingQueue<>(queues); + } + + return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, blockingQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java index f0203e6676b..de70f3c5121 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java @@ -23,6 +23,7 @@ import org.apache.dubbo.common.threadpool.ThreadPool; import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; @@ -50,10 +51,18 @@ public Executor getExecutor(URL url) { int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); - return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, - queues == 0 ? new SynchronousQueue() : - (queues < 0 ? new MemorySafeLinkedBlockingQueue() - : new LinkedBlockingQueue(queues)), + + BlockingQueue blockingQueue; + + if (queues == 0) { + blockingQueue = new SynchronousQueue<>(); + } else if (queues < 0) { + blockingQueue = new MemorySafeLinkedBlockingQueue<>(); + } else { + blockingQueue = new LinkedBlockingQueue<>(queues); + } + + return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, blockingQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index a2cba73bc45..71392adf822 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -40,14 +40,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER; +import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME; /** * AbstractClient */ public abstract class AbstractClient extends AbstractEndpoint implements Client { - protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler"; - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class); private final Lock connectLock = new ReentrantLock(); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java index e585bff61b2..d60807f836e 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java @@ -20,7 +20,6 @@ import org.apache.dubbo.common.Version; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; @@ -29,6 +28,7 @@ import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient; import org.apache.dubbo.remoting.transport.netty4.ssl.SslClientTlsHandler; import org.apache.dubbo.remoting.transport.netty4.ssl.SslContexts; +import org.apache.dubbo.remoting.utils.UrlUtils; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; @@ -45,15 +45,12 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; -import org.apache.dubbo.remoting.utils.UrlUtils; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL; -import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT; @@ -82,10 +79,7 @@ public NettyConnectionClient(URL url, ChannelHandler handler) throws RemotingExc @Override protected void initConnectionClient() { - URL url = ExecutorUtil.setThreadName(getUrl(), "DubboClientHandler"); - url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); - setUrl(url); - this.protocol = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol()); + this.protocol = getUrl().getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getExtension(getUrl().getProtocol()); this.remote = getConnectAddress(); this.connectingPromise = new AtomicReference<>(); this.connectionListener = new ConnectionListener(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java index 4ec09e4a565..8f9925da797 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java @@ -44,6 +44,10 @@ import java.util.Set; import java.util.concurrent.ExecutorService; +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL; +import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY; +import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME; import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME; import static org.apache.dubbo.rpc.Constants.H2_IGNORE_1_0_0_KEY; import static org.apache.dubbo.rpc.Constants.H2_RESOLVE_FALLBACK_TO_DEFAULT_KEY; @@ -154,8 +158,7 @@ public void afterUnExport() { @Override public Invoker refer(Class type, URL url) throws RpcException { optimizeSerialization(url); - ExecutorService streamExecutor = getOrCreateStreamExecutor( - url.getOrDefaultApplicationModel(), url); + ExecutorService streamExecutor = getOrCreateStreamExecutor(url.getOrDefaultApplicationModel(), url); AbstractConnectionClient connectionClient = PortUnificationExchanger.connect(url, new DefaultPuHandler()); TripleInvoker invoker = new TripleInvoker<>(type, url, acceptEncodings, connectionClient, invokers, streamExecutor); @@ -164,9 +167,10 @@ public Invoker refer(Class type, URL url) throws RpcException { } private ExecutorService getOrCreateStreamExecutor(ApplicationModel applicationModel, URL url) { - ExecutorService executor = ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)); - Objects.requireNonNull(executor, - String.format("No available executor found in %s", url)); + url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME) + .addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); + ExecutorService executor = ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(url); + Objects.requireNonNull(executor, String.format("No available executor found in %s", url)); return executor; }