Skip to content

Commit

Permalink
Use InternalThreadLocal in consumer side (#1825)
Browse files Browse the repository at this point in the history
* SerializerFactory 获取Serializer时,锁住整个hashmap,导致整个过程被block

* 单元测试。保证一个class只有一个serializer和deserializer。单线程和多线程测试

* 增加线程数 50 模拟多个线程来获取serializer和deserializer

* 当cores线程数全都使用的情况下,默认线程池会把任务放入到队列中。队列满则再创建线程(总数不会超过Max线程数)
增强线程池:在请求量阶段性出现高峰时使用
特性:cores线程全部使用的情况下,优先创建线程(总数不会超过max),当max个线程全都在忙的情况下,才将任务放入队列。请求量下降时,线程池会自动维持cores个线程,多余的线程退出。

* 当cores线程数全都使用的情况下,默认线程池会把任务放入到队列中。队列满则再创建线程(总数不会超过Max线程数)
增强线程池:在请求量阶段性出现高峰时使用
特性:cores线程全部使用的情况下,优先创建线程(总数不会超过max),当max个线程全都在忙的情况下,才将任务放入队列。请求量下降时,线程池会自动维持cores个线程,多余的线程退出。

* 补全单元测试,测试扩展是否生效

* 错误命名

* 增加@OverRide注解
long 初始化赋值时,小写l改为大写L防止误读

* 修复单元测试

* remove enhanced

* remove enhanced

* Change ThreadFactory for consumer side which is to use InternalThreadLocal in RpcContext.
  • Loading branch information
carryxyh authored and beiwei30 committed May 25, 2018
1 parent 47adfd0 commit 0bf6910
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.cluster.Directory;
import com.alibaba.dubbo.rpc.cluster.LoadBalance;

Expand All @@ -50,7 +51,13 @@ public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final long RETRY_FAILED_PERIOD = 5 * 1000;

private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
/**
* Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
* which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
*/
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
new NamedInternalThreadFactory("failback-cluster-timer", true));

private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.alibaba.dubbo.rpc.cluster.support;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
Expand All @@ -43,7 +43,12 @@
*/
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));
/**
* Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
* which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
*/
private final ExecutorService executor = Executors.newCachedThreadPool(
new NamedInternalThreadFactory("forking-cluster-timer", true));

public ForkingClusterInvoker(Directory<T> directory) {
super(directory);
Expand Down

0 comments on commit 0bf6910

Please sign in to comment.