-
-
Notifications
You must be signed in to change notification settings - Fork 639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Removed Promise in favor of Future.join(). Made ForkJoinPool.commonPool() default ExecutorService. #2093
Removed Promise in favor of Future.join(). Made ForkJoinPool.commonPool() default ExecutorService. #2093
Conversation
Does the deadlock only occur on the CI server? Can you reproduce it locally? |
Only on the CI server, locally all is fine. I started a new build without junit test forking. |
It did not work. Now I print the enter/exit information during FutureTest: @Rule
public TestRule watcher = new TestWatcher() {
@Override
protected void starting(Description description) {
System.out.println("Starting test: " + description.getMethodName());
}
@Override
protected void finished(Description description) {
System.out.println("Finished test: " + description.getMethodName());
}
}; Let's see, which method causes the deadlock... |
The blocking guy seems to be |
I don't know. It would be good to see what are the differences between CI instance and local one. Maybe CI is virtualized so that it only sees 1 CPU, which might cause differences between the default JVM setup of common concurrency primitives? It would be much easier to debug if we could reproduce the issue locally. Can you find out the Travis instance VCPU count? |
After I commented out @Test
public void shouldFilterFuture() {
final Future<Integer> future = Future.successful(42);
assertThat(future.filter(i -> i == 42).get()).isEqualTo(42);
assertThat(future.filter(i -> i == 43).isEmpty()).isTrue();
} the tests proceeded much further but blocked again at @Test
public void shouldAwaitOnGet() {
final Future<Integer> future = Future.of(() -> {
Try.run(() -> Thread.sleep(250L));
return 1;
});
assertThat(future.get()).isEqualTo(1);
} I think it is the Future.get() method that causes the trouble: @Override
default T get() {
return await().getValue().get().get();
} More specifically the await() method: /**
* Blocks the current Thread until this Future completed or returns immediately if this Future is already completed.
*
* @return this {@code Future} instance
*/
Future<T> await(); Which is implemented in FutureImpl.java: @Override
public Future<T> await() { // 1)
if (!isCompleted()) {
final Object monitor = new Object();
onComplete(ignored -> { // 2)
synchronized (monitor) {
monitor.notify();
}
});
synchronized (monitor) { // 3)
if (!isCompleted()) {
Try.run(monitor::wait);
}
}
}
return this;
} **One possibility is that the I will wrap my head around it after getting some coffee :) |
An execution like the following would explain the failure. But such an execution is not possible because we synchronize wait/notify on
|
I pulled your changes from your branch and changed the following: Index: pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- pom.xml (date 1505604858000)
+++ pom.xml (revision )
@@ -302,6 +302,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.version}</version>
<configuration>
+ <argLine>-Djava.util.concurrent.ForkJoinPool.common.parallelism=1</argLine>
<parallel>none</parallel>
<!--
<parallel>all</parallel> With this change I was able to reproduce the Travis deadlock. It should be easier to debug it locally with this. |
@nfekete that's great! Maybe it is related to this: |
I think I met a similar problem with parallel streams a while ago, that's why I volunteered to help with ForkJoinPool issues. I can't recall precisely what happened there, but there was a similar pattern: tasks in an FJP waiting on other tasks in the same pool. |
I've now a better understanding of the 'error'. I'm not sure, if we are able to control the fork/join mechanism within one thread pool. There is ForkJoinPool.ManagedBlocker but we can't hard-code it into FutureImpl because we do not know which ExecutorService is used at runtime. I will sleep over the topic and look into it tomorrow. |
Maybe I find a way to implement |
Status: I reimplement await() now (without the low-level wait/notify, which does not seem to work on a ForkJoinPool): @Override
public Future<T> await() {
// lock is the guard for all future result related fields (i.e. the completion state)
synchronized (lock) {
if (!isCompleted()) {
try {
// job is of type j.u.c.Future, get() blocks - and could also receive a timeout parameter
complete(job.get());
} catch(Throwable x) {
complete(Try.failure(x));
}
}
}
return this;
} However, it is still not working - but should in my opinion :) |
[STARTING] 2017-09-17T12:08:10.944 shouldFilterFuture(io.vavr.concurrent.FutureTest)
Future.success(42)
---
[Promise] Thread[main,5,main] tryComplete(Success(42))
[Future] Thread[main,5,main] tryComplete: value = Success(42)
[Future] Thread[main,5,main] tryComplete: isCompleted() = false
[Future] Thread[main,5,main] tryComplete: complete(Success(42))
[Future] Thread[main,5,main] complete: value = Success(42)
[Future] Thread[main,5,main] complete: setting value
[Future] Thread[main,5,main] complete: executing actions
[Future] Thread[main,5,main] tryComplete: completed
asserting...
---
[Future] Thread[main,5,main] filter: io.vavr.concurrent.FutureTest$$Lambda$6/1058025095@27abe2cd
[Future] Thread[main,5,main] filterTry: io.vavr.concurrent.Future$$Lambda$7/1359044626@29444d75
[Future] Thread[main,5,main] filterTry: creating promise
[Future] Thread[main,5,main] filterTry: onComplete(...)
[Future] Thread[main,5,main] perform: spawning new onComplete thread
[Future] Thread[main,5,main] filterTry: return promise.future()
[Future] Thread[main,5,main] await()
get()
---
[Future] Thread[main,5,main] await: synchronized (lock) {
[Future] Thread[main,5,main] await: job = null
[Future] Thread[main,5,main] await: value = None
[Future] Thread[main,5,main] await: !isCompleted()
#
# Update: this is the root cause: job can't be null if the future is not completed, yet (see below invariant)
#
[Future] Thread[main,5,main] await: job.get()
filtering i == 42
[Promise] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete(Success(42))
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: value = Success(42)
#
# Here is the error. The job (j.u.c.Future(() -> Success(42).filter(i -> i == 42))) is finished before the Promise is completed!?
#
[Future] Thread[main,5,main] await: completed
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: isCompleted() = false
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: complete(Success(42))
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] complete: value = Success(42)
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] complete: setting value
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] complete: executing actions
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: completed
[FINISHED] 2017-09-17T12:08:10.988 shouldFilterFuture(io.vavr.concurrent.FutureTest) Update: The problem with this new implementation is, that Promise.make() creates a Future that has no underlying job (= null). I will find a solution. The invariant has to be future.isCompleted() <=> future.job == null && future.value != null In |
Currently I see the following solution:
static <T> Future<T> complete(CheckedConsumer<CheckedConsumer<Try<? extends T>>> resultConsumer) {
return complete(DEFAULT_EXECUTOR_SERVICE, resultConsumer);
}
static <T> Future<T> complete(ExecutorService executorService, CheckedConsumer<CheckedConsumer<Try<? extends T>>> resultConsumer) {
return null; // TODO
}
static <T> Future<T> complete(CheckedBiConsumer<CheckedConsumer<? extends T>, CheckedConsumer<? extends Throwable>> resultConsumer) {
return complete(DEFAULT_EXECUTOR_SERVICE, resultConsumer);
}
static <T> Future<T> complete(ExecutorService executorService, CheckedBiConsumer<CheckedConsumer<? extends T>, CheckedConsumer<? extends Throwable>> resultConsumer) {
return null; // TODO
}
static void examples() {
// calls Future.complete(CheckedConsumer<CheckedConsumer<Try<T>>)
Future.complete(complete -> complete.accept(Try.of(() -> null)));
// calls Future.complete(CheckedBiConsumer<CheckedConsumer<T>, CheckedConsumer<Throwable>>)
Future.complete((success, failure) -> success.accept(null));
}
@FunctionalInterface
interface CheckedBiConsumer<T, U> {
void accept(T t, U u) throws Throwable;
default CheckedBiConsumer<T, U> andThen(CheckedBiConsumer<? super T, ? super U> after) {
Objects.requireNonNull(after);
return (l, r) -> {
accept(l, r);
after.accept(l, r);
};
}
} (Please note: It is correct here to use Checked(Bi)Consumers in a covariant way rather than a contravariant way, like usual.) Given the suggested await() / await(timout, timeunit), we will solve here the following issues:
Note: We already planned to remove Promise in 1.0.0 and possibly rename Future to Promise because of a name clash with java.util.concurrent.Future. See this blog post. |
This change has a too great impact for a patch release. I will target 1.0 instead of 0.9.1. |
Pushed a test-version with some /*TODO*/ sections. |
I've collected some test data and ran the Tests run: 221, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 13.722 sec - in io.vavr.concurrent.FutureTest I observe that the 'queued submission' of the common ForkJoinPool goes up at some point if the parallelism is low (1, 2). Then the deadlock occurs. If we increase the parallelism (4, 8), the tests work as expected. |
@danieldietrich empirical evidence FTW |
I could imagine that we need to fine-tune the ForkJoinPool behavior and the thread creation. (See also scala/bug#8955). I could also imagine that the probability of deadlocks increases because we use the blocking |
We need a different blocking strategy. Currently we have // FutureImpl
@Override
public Future<T> await() throws InterruptedException {
if (!isCompleted()) {
final BlockingQueue<Try<T>> queue = new ArrayBlockingQueue<>(1);
onComplete(queue::add);
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override
public boolean block() throws InterruptedException {
if (value.isEmpty()) {
// Possibly blocks until one element is available.
// If the current Thread receives the element,
// this Future is already completed because
// actions are performed after completion.
queue.take();
}
return true;
}
@Override
public boolean isReleasable() {
return value.isDefined();
}
});
}
return this;
} It may be that the use of LockSupport makes more sense (parking/unparking threads). This would release resources. However, the CompletableFuture implementation is more than ugly. Usages of UNSAFE everywhere. However, blocking is discouraged anyway. If a async program makes extensive use of blocking, there's something wrong. Blocking in the presence of low parallelism might be also dangerous. We need to understand the domain better. The only thing we can do is measuring. I tested the ignored FutureTest unit tests with CompletableFuture (see #1530). This means that our I think the whole deadlock problem is all about blocking Futures - more specifically, the wrong blocking strategy. A thread should wake up periodically in order to check the completion state. However, we can't loop without parking threads because it would eat up 100% of one core. Parking/unparking threads and periodically checking the finished state is the strategy of CompletableFuture. Parking is not the problem. The question is: who unparks a thread. I think it is hidden somewhere in the ForkJoin framework, might be specific for ForkJoinWorkerThreads. We should play around with ThreadFactories. This leads to an own ForkJoinPool as DEFAULT_EXECUTOR_SERVICE. We can't use the ForkJoinPool.commonPool() anymore. new ForkJoinPool(
parallelism, // int
factory, // ForkJoinPool.ForkJoinWorkerThreadFactory
handler, // Thread.UncaughtExceptionHandler
asyncMode // boolean
) Especially the asyncMode controls the way, queued tasks are run. The default ( |
There are some minor changes in here. I will split this PR into smaller ones. However, for now I will go on to do some improvements here (in this PR). This PR goes into 1.0.0. But I think that I'm able to provide a fix that will go into 0.9.2, namely switching to ForkJoinPool.commonPool() and re-implementing a more effective await() + await(timeout, timeunit). But now to something completely different. Here is an idea on how to optimize the 'blocking' await() method. In Java there are several Thread states: Image: © by http://uml-diagrams.org, created by Kirill Fakhroutdinov I want to investigate if it is possible to put an existing (the current) thread in an 'inactive' state in the way that other threads are allowed to run instead (if they otherwise had to wait until more 'slots' are available in the given context). I see the opportunity to use LockSupport#park() for that purpose. Here is my implementation idea:
Technical details:
|
The park/unpark does not seem to help regarding our problem of deadlocks when executing the FutureTest test suite using ForkJoinPool parallelism=1. I've implement several await() strategies, namely
None of them solves the deadlock problem described above. I tend to use the ManagedBlocker for the final version because this is the way to go in a ForkJoin context. Update: The ManagedBlocker itself does not block the thread. We will use it and internally park/unpark because it is considered fast. CompletableFuture acts the same way. Observation: It is interesting that each test runs fine if started as a single test. Only the whole test suite seams to deadlock, even if we do not allow parallel tests (as surefire configuration). The next step I take is to use a custom ForkJoinPool that differs in two aspects from ForkJoinPool.commonPool():
|
Oh oh! 😱 Our unit tests contain Futures that block forever. This might be the reason for deadlocks when having parallelism=1 (or=2): @Test
public void shouldGetValueOfUncompletedFuture() {
final Future<?> future = Future.of(Concurrent::waitForever);
assertThat(future.getValue()).isEqualTo(Option.none());
} |
Codecov Report
@@ Coverage Diff @@
## master #2093 +/- ##
========================================
Coverage ? 97.4%
Complexity ? 5196
========================================
Files ? 92
Lines ? 11909
Branches ? 1573
========================================
Hits ? 11600
Misses ? 154
Partials ? 155
Continue to review full report at Codecov.
|
Fixes #2085