-
Notifications
You must be signed in to change notification settings - Fork 26.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extension: Eager Thread Pool #1568
Conversation
A thread pool that can provide faster processing speeds when there are more tasks (of course it consumes more resources) * When the number of tasks exceeds the core size, a new thread is first started to execute the task instead of putting it into the queue. * When the number of tasks is lower than the core size for a long time, the core size threads are maintained and redundant threads are recycled. * Compared to the fixed pool:When there are more tasks, provide more workers to handle the tasks. * Compared to the cached pool:The task queue in the cached pool is actually a SynchronousQueue and does not have the ability to cache tasks. * Whether to fail fail or put into a queue when a thread runs out:Both are feasible and need to consider which way should be applied according to the business scenario. Delayed scenarios are not allowed. Failfast is more reasonable than queues. However, if there is a certain tolerance for delays, queues are more reasonable than failfast.
@beiwei30 |
Please add a license at the beginning of your codes to fix ci failed. |
@zonghaishang thx! |
Codecov Report
@@ Coverage Diff @@
## master #1568 +/- ##
==========================================
+ Coverage 32.17% 32.25% +0.08%
==========================================
Files 682 685 +3
Lines 32972 33019 +47
Branches 6597 6603 +6
==========================================
+ Hits 10608 10650 +42
+ Misses 20468 20464 -4
- Partials 1896 1905 +9
Continue to review full report at Codecov.
|
@beiwei30 @zonghaishang |
Great ! |
throw new RejectedExecutionException("enhanced queue does not have executor !"); | ||
} | ||
int currentPoolThreadSize = executor.getPoolSize(); | ||
//have free worker. put task into queue to let the worker deal with task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leave space after //
*/ | ||
public boolean retryOffer(Runnable o) { | ||
if (executor.isShutdown()) { | ||
throw new RejectedExecutionException("Executor is shutdown !"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change "Executor is shutdown !" to "Executor is shutdown!", there's one extra space.
import java.util.concurrent.RejectedExecutionException; | ||
|
||
/** | ||
* enhanced task queue in the enhanced thread pool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
describe the behavior here
|
||
//init queue and enhanced executor | ||
EnhancedTaskQueue<Runnable> enhancedTaskQueue = new EnhancedTaskQueue<Runnable>(queues <= 0 ? 1 : queues); | ||
EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, enhancedTaskQueue, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep line margin 120 chars.
|
||
/** | ||
* enhanced thread pool. | ||
* When the core threads are all in busy , create new thread instead of putting task into blocking queue . |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove extra space from 'blocking queue .'
//task count | ||
private final AtomicInteger submittedTaskCount = new AtomicInteger(0); | ||
|
||
public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, EnhancedTaskQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep line margin 120 chars.
|
||
@Override | ||
public void execute(Runnable command) { | ||
//do not increment in method beforeExecute! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add space after //
try { | ||
super.execute(command); | ||
} catch (RejectedExecutionException rx) { | ||
//retry to offer the task into queue . |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add space after //
submittedTaskCount.incrementAndGet(); | ||
try { | ||
super.execute(command); | ||
} catch (RejectedExecutionException rx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if exception other than RejectedExecutionException
is thrown? then submittedTaskCount may have no change to decrease.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can not be a problem because the submittedTaskCount
will be decreased in the 'afterExecute' method.
But the RejectedExecutionException is thrown before the task run. So when the RejectedExecutionException is thrown , we should decrease the submittedTaskCount.
* limitations under the License. | ||
*/ | ||
|
||
package com.alibaba.dubbo.common.threadpool.support.enhanced; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest change 'enhanced' into other word, more accurate to describe the behavior, for example: eager or greedy? once decided, pls. change package name, class name, and comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about EagerThreadPoolExecutor means the executor is eager to more tasks.
modify sth with the code review format the code file
* Extension: Enhanced Thread Pool A thread pool that can provide faster processing speeds when there are more tasks (of course it consumes more resources) * When the number of tasks exceeds the core size, a new thread is first started to execute the task instead of putting it into the queue. * When the number of tasks is lower than the core size for a long time, the core size threads are maintained and redundant threads are recycled. * Compared to the fixed pool:When there are more tasks, provide more workers to handle the tasks. * Compared to the cached pool:The task queue in the cached pool is actually a SynchronousQueue and does not have the ability to cache tasks. * Whether to fail fail or put into a queue when a thread runs out:Both are feasible and need to consider which way should be applied according to the business scenario. Delayed scenarios are not allowed. Failfast is more reasonable than queues. However, if there is a certain tolerance for delays, queues are more reasonable than failfast. * remove * in import * add license to fix ci failure * rename the thread pool to EagerThreadPool modify sth with the code review format the code file * remove '*' in import statement * throw NullPointerException if the param is null. * throw NullPointerException if the param is null. * catch throwable and decrease submitted task count anyway
A thread pool that can provide faster processing speeds when there are more tasks (of course it consumes more resources)