Add virtual threads support #224
Conversation
This reverts commit b03e678478cf4600add4f941519a308200bb7e64.
@kawamuray Should I start reviewing while the PR is still in draft state?
@ocadaruma Yes please, starting from rough design discussions perhaps.
Left only nit comments.
I didn't review the test code and the benchmark code in detail, but overall it looks good!
Can't wait for this feature to be released!
One concern is CI on JDK 8/11/17.
IMO we should still run tests on these versions because we have many users who still run them.
After this PR gets merged, I plan to submit a follow-up PR to bring back the 8/11/17 test environments.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentHashSet<E> implements Set<E> {
Why not use `ConcurrentHashMap.newKeySet()`?
Oh! I didn't know that API existed. Good to know, will fix that.
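(For reference, a minimal sketch of what the suggested API gives you; the element type and names below are just for illustration.)

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class NewKeySetExample {
    // ConcurrentHashMap.newKeySet() (since Java 8) returns a thread-safe Set
    // view backed by a ConcurrentHashMap, so a hand-rolled ConcurrentHashSet
    // is unnecessary.
    static final Set<String> inFlightTasks = ConcurrentHashMap.newKeySet();

    static boolean track(String taskId) {
        return inFlightTasks.add(taskId); // false if the task is already tracked
    }
}
```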
Apart from the comments, `AveragingRateLimiter` still uses `synchronized`. We should get rid of that too.
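A minimal sketch of the kind of migration being requested, assuming a simple permit-counting limiter; the class shape below is illustrative, not AveragingRateLimiter's actual structure:

```java
import java.util.concurrent.locks.ReentrantLock;

class LimiterSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private long permits = 100;

    boolean tryAcquire() {
        // Unlike a synchronized block, blocking on a ReentrantLock does not
        // pin a virtual thread to its carrier thread.
        lock.lock();
        try {
            if (permits > 0) {
                permits--;
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```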
Thread.sleep(rand.nextInt(10));
}))
.propertySupplier(StaticPropertySupplier.of(
        Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 16)
In VThreadCoreFunctionalityTest, there's no point in specifying partition concurrency, right?
Good catch, will remove that.
.publishPercentiles(0.5, 0.9, 0.99, 0.999)
.register(registry));

public class PerPartitionMetrics extends AbstractMetrics {
+1 for merging `TaskMetrics` and `ProcessMetrics`, but `PerPartitionMetrics` sounds confusing because it suggests the class contains all `partitionScope` metrics, which it doesn't. Since all the metrics inside this class are about tasks, how about `TaskMetrics`?
protected final PerPartitionMetrics perPartitionMetrics;
protected final SchedulerMetrics schedulerMetrics;

public AbstractSubPartitions(PartitionScope scope, Processors<?> processors) {
[nits] Let's use `protected` since this is an abstract class.
try {
    return unit.asyncClose()
            .thenApply(ignored -> null) // To migrate type from Void to Object
            .completeOnTimeout(TIMEOUT_INDICATOR,
`completeOnTimeout` was introduced in Java 9, so it shouldn't be used.
Ugh, good catch. What should we do to keep this part from looking messy, then ...
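One possible Java 8-compatible substitute is a scheduled watchdog that completes the future with a fallback value; a hedged sketch (the helper name is made up, and this mirrors the scheduledExecutor-based code that appears later in this PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class Timeouts {
    // Completes cf with `value` if it hasn't completed within the timeout,
    // approximating Java 9's CompletableFuture.completeOnTimeout on Java 8.
    static <T> CompletableFuture<T> completeOnTimeoutCompat(
            CompletableFuture<T> cf, T value, long timeout, TimeUnit unit,
            ScheduledExecutorService scheduler) {
        ScheduledFuture<?> watchdog = scheduler.schedule(() -> {
            if (!cf.isDone()) {
                cf.complete(value); // no-op if cf completed in the meantime
            }
        }, timeout, unit);
        // Cancel the watchdog once the future completes on its own.
        cf.whenComplete((result, error) -> watchdog.cancel(false));
        return cf;
    }
}
```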
@@ -68,36 +65,33 @@ private void processTask(TaskRequest request) {
        return;
    }

    Timer timer = Utils.timer();
    CompletionStage<Void> processCompletion = CompletableFuture.completedFuture(null);
Might not be a problem, but this creates an unnecessary `CompletableFuture.completedFuture` instance for every task, even when a task doesn't end up with an exception. Let's supply the `completedFuture` inside the catch block?
Good point!
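A rough sketch of the suggested shape, with illustrative names rather than the actual Decaton code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

final class ProcessInvoker {
    // The pre-completed future is only needed when the processor throws,
    // so allocate it inside the catch block instead of once per task.
    static CompletionStage<Void> invoke(Supplier<CompletionStage<Void>> process) {
        try {
            return process.get(); // happy path: no extra allocation
        } catch (RuntimeException e) {
            return CompletableFuture.completedFuture(null);
        }
    }
}
```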
private Processors<?> processors;

@Test
public void testCleanupPartiallyInitializedUnits() throws Exception {
Why is this test deleted?
Because we no longer initialize processor units upfront and instead do it lazily: https://github.com/line/decaton/pull/224/files#diff-c68a7fb9de5afb7968afda54f084633b7d5a38ec068e84fcad000d582ec5eaafR77
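A hypothetical sketch of the lazy-initialization pattern being described; all names here are invented for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntFunction;

final class LazyUnits<U> {
    private final ConcurrentMap<Integer, U> units = new ConcurrentHashMap<>();
    private final IntFunction<U> factory;

    LazyUnits(IntFunction<U> factory) {
        this.factory = factory;
    }

    // The unit is created on first use instead of upfront, so a failure
    // during startup can no longer leave partially initialized units behind.
    U unitFor(int threadId) {
        return units.computeIfAbsent(threadId, factory::apply);
    }
}
```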
Thank you for the explanation. Fair enough. Sounds good to keep `synchronized` then.
Applied all feedback. PTAL
Left only a minor comment.
}
ScheduledFuture<?> cancelFut = scheduledExecutor.schedule(() -> {
    if (!cf.isDone()) {
        cf.complete(value);
Since this completion handler may include a `destroyThreadProcessor` call, which is potentially costly, shouldn't we have a pool size larger than 1? (like the available processor count?)
IIUC, the first argument is "corePoolSize", so my understanding of the behavior is: when a scheduled task is still running at the time the next scheduled execution occurs, the executor creates a new thread to process it (and the thread count is unbounded), so a scheduled task execution never blocks the others.
hm, I think your concern is correct. fixing.
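A minimal sketch of the implied fix, assuming the executor is built with `Executors.newScheduledThreadPool`; note that a ScheduledThreadPoolExecutor runs on at most corePoolSize threads and does not grow the pool on demand:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class SchedulerHolder {
    // Size the pool to the available processors so that one costly completion
    // handler (e.g. a destroyThreadProcessor call) doesn't delay the others.
    static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
}
```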
LGTM. Magnificent!
This PR aims to introduce Virtual Thread support, which we expect to have very high affinity with a typical workload implemented on Decaton (I/O heavy).
We provide `DeferredCompletion` (async processing) to support `DecatonProcessor` implementations that leverage the asynchronous paradigm, but it has been causing completion leaks and, in turn, stuck consumption; virtual threads could be a perfect solution that eliminates this whole class of problems (see the sketch after the change list below).

As this involves a lot of changes in the existing code base, here is the (likely) complete list of changes made other than the newly added virtual thread support:
- `synchronized` => `ReentrantLock` - virtual threads and `synchronized` do not work well together (pinning)
- Merged `TaskMetrics` and `ProcessMetrics` as `PerPartitionMetrics` for better organization
- Removed the `AsyncShutdownable` interface and added `AsyncClosable` for refactoring
- `PartitionProcessor` is now called `SubPartitions`, which is an interface with possibly many implementations. Not intending to let users implement this interface though.
- Added a `--latency-count` option for the benchmark. This is for simulating multiple I/Os during the processing of a task, to count context-switching costs in.
- Added the `-server` and `-Xcomp` options to minimize the impact from JIT compilation
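To make the motivation concrete, here is a minimal, self-contained sketch of the virtual-thread-per-task model (assuming a Java 21 runtime; `callExternalService` is a hypothetical stand-in for blocking I/O, not part of Decaton):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadSketch {
    public static void main(String[] args) {
        // Each task gets its own virtual thread; blocking I/O parks the
        // virtual thread instead of occupying an OS thread, so there is no
        // need for DeferredCompletion-style manual completion tracking.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 1000; i++) {
                executor.submit(VirtualThreadSketch::callExternalService);
            }
        } // close() waits for all submitted tasks to finish
    }

    // Hypothetical stand-in for an I/O-heavy processing step.
    static void callExternalService() {
        try {
            Thread.sleep(10); // simulated off-CPU latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```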
About the performance, I ran several benchmarks using the `benchmark` module. The benchmark simulates a workload with 5 I/Os (at least 5 forced context switches), with varying I/O latency (off-CPU).
First, to compare the maximum performance obtainable from the current THREAD_POOL runtime, I tried to find the optimal count to set for `decaton.partition.concurrency`.

Command:

Result:

Found out that `decaton.partition.concurrency=300` is the setting that yields peak performance. Then I ran the same workload, with increasing I/O latency, against the THREAD_POOL mode and the VIRTUAL_THREAD mode.
Result:
As the above charts shows,
THREAD_POOL
runtime decreases performance as the total I/O latency increases, but with theVIRTUAL_THREAD
runtime throughput remains fairly stable and mitigates impact from I/O latency.