Future::await hangs indefinitely if the CheckedSupplier throws an InterruptedException #1963

Closed
Ramblurr opened this issue Apr 25, 2017 · 4 comments

Comments

@Ramblurr

Future::await hangs indefinitely if the CheckedSupplier throws an InterruptedException.

The following test hangs indefinitely:

    @Test
    public void testFutureInterruptedException()
    {

        Future<String> future = Future.of(() ->
        {
            if (true)
                throw new InterruptedException("interrupted");
            return "hello";
        });

        // this never returns, the test hangs here
        future.await();

        assertThat(future.isFailure(), is(true));
    }

The same test with a different exception type works as expected:

    @Test
    public void testFutureOtherException()
    {

        Future<String> future = Future.of(() ->
        {
            if (true)
                throw new IOException("interrupted");
            return "hello";
        });

        // this does return, works as expected
        future.await();

        assertThat(future.isFailure(), is(true));
    }

A CheckedSupplier that calls Thread.sleep can throw an InterruptedException, which causes the awaiting thread to hang. The same happens if anything else in the supplier throws that exception.

@danieldietrich
Contributor

Scala acts similarly, i.e. it does not complete the Future when a fatal exception occurs:

scala> import scala.concurrent._
import scala.concurrent._

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> Future(throw new InterruptedException())
java.lang.InterruptedException
	at $line7.$read$$iw$$iw$$iw$$iw$.$anonfun$res2$1(<console>:16)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:653)
	at scala.util.Success.$anonfun$map$1(Try.scala:251)
	at scala.util.Success.map(Try.scala:209)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:287)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
res2: scala.concurrent.Future[Nothing] = Future(<not completed>)

Scala's solution:

References:

@danieldietrich
Contributor

danieldietrich commented Apr 25, 2017

@Ramblurr this is not a bug; it is correct behavior. Scala acts the same (see links above).

This is the critical code section:

    @Override
    public Future<T> await() {
        final Object monitor = new Object();
        // will not be executed if the Future does not complete (e.g. if a fatal exception occurs)
        onComplete(ignored -> {
            synchronized (monitor) {
                monitor.notify();
            }
        });
        synchronized (monitor) {
            if (!isCompleted()) {
                Try.run(monitor::wait);
            }
        }
        return this;
    }

When the Future completes, the blocking monitor is notified and the waiting Thread wakes up.

The result of a Future computation is a Try, namely a Success or a Failure. A Failure only encapsulates causes that are so-called non-fatal. An InterruptedException is fatal and is re-thrown by the Try.
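
The fatal/non-fatal split can be sketched in plain Java. This is only an illustration: the class `FatalSketch`, its methods, and the exact set of throwables treated as fatal are assumptions modelled on the description above, not the library's actual code.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical mini model of a Try-like wrapper; not the library's implementation.
final class FatalSketch {

    // Assumed set of fatal throwables (the real set may differ).
    static boolean isFatal(Throwable t) {
        return t instanceof InterruptedException
                || t instanceof LinkageError
                || t instanceof VirtualMachineError;
    }

    // Non-fatal outcomes are captured and described as a String;
    // fatal throwables are re-thrown unchanged to the caller.
    static <T> String attempt(Callable<T> computation) {
        try {
            return "Success(" + computation.call() + ")";
        } catch (Throwable t) {
            if (isFatal(t)) {
                FatalSketch.<RuntimeException>sneakyThrow(t);
            }
            return "Failure(" + t.getClass().getSimpleName() + ")";
        }
    }

    // Re-throws any throwable without declaring it (generic "sneaky throw" idiom).
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    public static void main(String[] args) {
        System.out.println(attempt(() -> "hello"));
        System.out.println(attempt(() -> { throw new IOException("io"); }));
        try {
            attempt(() -> { throw new InterruptedException("interrupted"); });
        } catch (Throwable t) {
            System.out.println("re-thrown: " + t.getClass().getSimpleName());
        }
    }
}
```

In this model the IOException ends up as a Failure value, while the InterruptedException escapes `attempt` entirely, so no result is ever produced for it.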

The fatal exception takes place silently in the ExecutorService Thread. Such a Thread can register an UncaughtExceptionHandler in order to react to uncaught exceptions. However, we currently do not support uncaught exception handlers.
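
For illustration, this is roughly how an uncaught exception handler can be attached to executor threads using only the JDK. It is a sketch, not something the library offers today: `UncaughtHandlerSketch` and `runAndCapture` are hypothetical names, and a LinkageError stands in for the fatal exception because a Runnable cannot throw the checked InterruptedException directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class UncaughtHandlerSketch {

    // Runs a task that dies with a LinkageError and returns what the handler observed.
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Every worker thread created by this factory reports uncaught throwables.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "future-worker");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((t, error) -> {
                seen.set(error);
                handled.countDown();
            });
            return thread;
        };

        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            // execute(), not submit(): submit() would store the throwable in the
            // returned java.util.concurrent.Future instead of letting it escape.
            executor.execute(() -> { throw new LinkageError("simulated fatal error"); });
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + runAndCapture());
    }
}
```

The handler makes the failure visible, but nothing in it completes a Future, so a caller blocked in await() would still be waiting.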

Nevertheless, a handler would not solve the problem of the blocking await() call. The real problem is that await() waits indefinitely for the Future to complete. In the presence of a fatal exception the Future will never complete. As a result, await() will never terminate.

Scala acts the same:

// = Future(<not completed>)
Future(Await.ready(Future { throw new InterruptedException() }, Duration.Inf))

That is, Await.ready() never completes when we throw an InterruptedException in Scala (which is also fatal there). If we throw an Exception instead, we get a Failure, as expected.

Please note that we can't just abort await() in the case of a fatal exception. The state of the Future would still be "not completed", but the semantics of await() is to wait until the Future is completed.

Also, a Failure can't wrap a fatal exception. Fatal means that the exception can't be handled, so it is re-thrown.


This issue is 'expected' behavior, it can't be 'solved'.

I see these follow-up tasks regarding this issue:

  1. We should provide a mechanism to handle uncaught exceptions. This will give insight into the execution of our program and the ability to handle these exceptions and perform cleanup operations.
  2. We should add a Future.await(timeout) (related to #1871, "Consider adding javaslang.concurrent.Future#get(timeout)").
  3. We should remove/deprecate Future.await() (i.e. without a timeout param) because it might lead to deadlocks.
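
A bounded wait along the lines of task 2 could look roughly like the following. This is a minimal sketch on a hypothetical `TimedAwaitSketch` class that uses the same monitor idea as the await() code quoted earlier; it is not the planned API, and it only covers the waiting side, not cancellation of the running computation.

```java
import java.time.Duration;

// Hypothetical stand-in for a future; only models "completed or not".
final class TimedAwaitSketch {
    private final Object monitor = new Object();
    private boolean completed; // guarded by monitor

    // Marks the computation as completed and wakes up all waiting threads.
    void complete() {
        synchronized (monitor) {
            completed = true;
            monitor.notifyAll();
        }
    }

    // Waits until completed or until the timeout elapses.
    // Returns true if completed, false on timeout or if the caller is interrupted.
    boolean await(Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (monitor) {
            while (!completed) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false;
                }
                try {
                    // wait(0) would block forever, so wait at least one millisecond.
                    monitor.wait(Math.max(1L, remainingNanos / 1_000_000L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the caller's interrupt status
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        TimedAwaitSketch neverCompleted = new TimedAwaitSketch();
        System.out.println("completed in time: " + neverCompleted.await(Duration.ofMillis(100)));
    }
}
```

The loop around wait() guards against spurious wakeups, and the deadline keeps the total wait bounded even if the thread is woken several times.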

Please note that await(timeout) needs to cancel a running thread (by interrupting it). Such an interruption should not leak out of the Future, i.e. it should not be re-thrown as a fatal exception.
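
The JDK's own java.util.concurrent.Future shows one way this can behave: on timeout the caller cancels the task, the worker is interrupted, and the resulting InterruptedException stays inside the task. The sketch below uses only JDK classes; `CancelOnTimeoutSketch` and `awaitOrCancel` are hypothetical names, and this is not how the library's Future is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelOnTimeoutSketch {

    // Waits up to timeoutMillis for a slow task; on timeout, cancels it by interruption.
    static String awaitOrCancel(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);
        CountDownLatch workerDone = new CountDownLatch(1);

        // Note: this is java.util.concurrent.Future, not the library's Future.
        Future<String> task = executor.submit(() -> {
            try {
                Thread.sleep(60_000L); // simulates a long-running computation
                return "hello";
            } catch (InterruptedException e) {
                workerInterrupted.set(true);
                throw e; // captured by the task, not propagated to the caller
            } finally {
                workerDone.countDown();
            }
        });

        try {
            return "completed: " + task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            task.cancel(true); // interrupts the worker thread if the task is running
            try {
                workerDone.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return "timed out, worker interrupted: " + workerInterrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "caller interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitOrCancel(200));
    }
}
```

The caller only ever sees the timeout; the interruption is an internal detail of how the computation is stopped.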

danieldietrich added this to the vavr-1.0.0 milestone Apr 25, 2017
@danieldietrich
Contributor

The test will look like this:

@Test
public void shouldHandleInterruptedExceptionCorrectlyInAwait() {
    final Future<String> future = Future.of(() -> { throw new InterruptedException(); });
    future.await(Duration.ofMillis(100));
    assertThat(future).isEqualTo(Future.failed(new java.util.TimeoutException("Future timed out after 100 milliseconds")));
}

@Ramblurr
Author

A mandatory timeout on await() sounds good to me. Thanks for looking into this. Looking forward to the fix.
