Skip to content

Commit

Permalink
Extension: Eager Thread Pool (apache#1568)
Browse files Browse the repository at this point in the history
* Extension: Enhanced Thread Pool
A thread pool that can provide faster processing speeds when there are more tasks (of course it consumes more resources)
* When the number of tasks exceeds the core size, a new thread is first started to execute the task instead of putting it into the queue.
* When the number of tasks is lower than the core size for a long time, the core size threads are maintained and redundant threads are recycled.
* Compared to the fixed pool:When there are more tasks, provide more workers to handle the tasks.
* Compared to the cached pool:The task queue in the cached pool is actually a SynchronousQueue and does not have the ability to cache tasks.
* Whether to fail fail or put into a queue when a thread runs out:Both are feasible and need to consider which way should be applied according to the business scenario. Delayed scenarios are not allowed. Failfast is more reasonable than queues. However, if there is a certain tolerance for delays, queues are more reasonable than failfast.

* remove * in import

* add license to fix ci failure

* rename the thread pool to EagerThreadPool
modify sth with the code review
format the code file

* remove '*' in import statement

* throw NullPointerException if the param is null.

* throw NullPointerException if the param is null.

* catch throwable and decrease submitted task count anyway
  • Loading branch information
carryxyh authored and beiwei30 committed Apr 9, 2018
1 parent b351a0f commit d27b6fa
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.common.threadpool.support.eager;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
* EagerThreadPool
* When the core threads are all in busy,
* create new thread instead of putting task into blocking queue.
*/
public class EagerThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.common.threadpool.support.eager;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/**
* EagerThreadPoolExecutor
*/
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.common.threadpool.support.eager;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
* TaskQueue in the EagerThreadPoolExecutor
* It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize
* or the currentPoolThreadSize more than executor's maximumPoolSize.
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

private static final long serialVersionUID = -2635853580887179627L;

private EagerThreadPoolExecutor executor;

public TaskQueue(int capacity) {
super(capacity);
}

public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}

@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}

int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}

// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}

// currentPoolThreadSize >= max
return super.offer(runnable);
}

/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
eager=com.alibaba.dubbo.common.threadpool.support.eager.EagerThreadPool
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.alibaba.dubbo.common.threadpool.support.eager;


import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class EagerThreadPoolExecutorTest {

private static final URL URL = new URL("dubbo", "localhost", 8080);

/**
* It print like this:
* thread number in current pool:1, task number in task queue:0 executor size: 1
* thread number in current pool:2, task number in task queue:0 executor size: 2
* thread number in current pool:3, task number in task queue:0 executor size: 3
* thread number in current pool:4, task number in task queue:0 executor size: 4
* thread number in current pool:5, task number in task queue:0 executor size: 5
* thread number in current pool:6, task number in task queue:0 executor size: 6
* thread number in current pool:7, task number in task queue:0 executor size: 7
* thread number in current pool:8, task number in task queue:0 executor size: 8
* thread number in current pool:9, task number in task queue:0 executor size: 9
* thread number in current pool:10, task number in task queue:0 executor size: 10
* thread number in current pool:10, task number in task queue:4 executor size: 10
* thread number in current pool:10, task number in task queue:3 executor size: 10
* thread number in current pool:10, task number in task queue:2 executor size: 10
* thread number in current pool:10, task number in task queue:1 executor size: 10
* thread number in current pool:10, task number in task queue:0 executor size: 10
* <p>
* We can see , when the core threads are in busy,
* the thread pool create thread (but thread nums always less than max) instead of put task into queue.
*/
@Test
public void testEagerThreadPool() throws Exception {
String name = "eager-tf";
int queues = 5;
int cores = 5;
int threads = 10;
// alive 1 second
long alive = 1000;

//init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues);
final EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedThreadFactory(name, true),
new AbortPolicyWithReport(name, URL));
taskQueue.setExecutor(executor);

for (int i = 0; i < 15; i++) {
Thread.sleep(50);
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("thread number in current pool:"
+ executor.getPoolSize()
+ ", task number in task queue:"
+ executor.getQueue().size()
+ " executor size: "
+ executor.getPoolSize());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(5000);
// cores theads are all alive.
Assert.assertTrue("more than cores threads alive!", executor.getPoolSize() == cores);
}

@Test
public void testSPI() {
ExecutorService executorService = (ExecutorService) ExtensionLoader
.getExtensionLoader(ThreadPool.class)
.getExtension("eager")
.getExecutor(URL);
Assert.assertTrue("test spi fail!",
executorService.getClass().getSimpleName().equals("EagerThreadPoolExecutor"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void test_loadProperties_oneFile_notRootPath() throws Exception {
expected.put("fixed", "com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool");
expected.put("cached", "com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool");
expected.put("limited", "com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool");
expected.put("eager", "com.alibaba.dubbo.common.threadpool.support.eager.EagerThreadPool");

Assert.assertEquals(expected, p);
}
Expand Down

0 comments on commit d27b6fa

Please sign in to comment.