Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute Future operations on the same Thread by default #1530

Closed
danieldietrich opened this issue Aug 24, 2016 · 5 comments
Closed

Execute Future operations on the same Thread by default #1530

danieldietrich opened this issue Aug 24, 2016 · 5 comments

Comments

@danieldietrich
Copy link
Contributor

danieldietrich commented Aug 24, 2016

UPDATE: We can't do execute all action on one thread. That would be inefficient. In general, the actions are executed in parallel.

However, the issues mentioned below (currently @ignored in our unit tests) should be solved. Therefore I will leave this ticket open.


OBSOLETE:

It is interesting that Scala's Future does not reuse threads. Spawning new threads is a waste of resources - more context switches and parallel threads than necessary. Some are even completed and wait to be returned to the pool. This makes it intransparent for the call-site/user how resources are managed. The risk of a deadlock increases when having bounded pools.
I see only bebefits in reusing threads that already idle because the Future already completed on subsequent operations.

It is common to add an onComplete handler or an onSuccess and an onFailure handler. We rarely see adding many complete handlers. This means that only one handler is executed when the underlying Future is complete and does idle. In other words the theead can be reused.

If another behavior is needed, an ExecutorService should be explicitly specified when calling an operation or registering a handler. We will provide additional methods for that.

This change of behavior is expected to be binary backward compatible.

@danieldietrich danieldietrich added this to the 2.1.0 milestone Aug 24, 2016
@danieldietrich danieldietrich self-assigned this Aug 24, 2016
@danieldietrich
Copy link
Contributor Author

danieldietrich commented Aug 25, 2016

The Future core (FutureImpl) is small and well organized. By changing a single line we managed to execute all actions on the same Future Thread:

     private void perform(Consumer<? super Try<T>> action) {
-        Try.run(() -> executorService.execute(() -> action.accept(value.get())));
+        Try.run(() -> value.forEach(action));
     }

But this is only for testing purposes and not the solution for the problem. We still need to execute the actions by an ExecutorService. It has to be a new single thread ExecutorService (singleton) by default with the option to exchange that:

final Future<T> future = Future.of(someExecutor, () -> computation());

 // these use the same thread of future to process result
future.onComplete(Handler::processResult);
future.map(mapper);
...

// these use a new thread provided by fooExecutor to process result
future.onComplete(fooExecutor, Handler::processResult);
future.map(fooExecutor, mapper);
...

@danieldietrich
Copy link
Contributor Author

TODO: Re-enable @Ignored unit tests in FutureTest.java

@danieldietrich
Copy link
Contributor Author

danieldietrich commented Aug 26, 2016

This issue should solve the following problem (tests by @mattjtodd):

        BlockingQueue<Runnable> queue = new SynchronousQueue<>();
        ExecutorService service = new ThreadPoolExecutor(1, 1 , 0L, TimeUnit.MILLISECONDS, queue);

        Future
            .of(service, () -> { Thread.sleep(100); return true; })
            .onComplete(System.out::println);

        Thread.sleep(2000);
        service.shutdown();

Which results in the onComplete callback never being invoked and no output produced, the callback is never invoked and the DeferredResult is never completed and the request will hang until timeout. In reality with a larger pool and other work eventually competing this should not happen. Java 8's CompletableFuture has variants for expression binding which allow the definition of an Executor. We can show the same effect as above like this:

        CompletableFuture
            .supplyAsync(() -> Try
                .run(() -> TimeUnit.MILLISECONDS.sleep(100))
                .map(result -> true)
                .getOrElseThrow(() -> new IllegalStateException()), service)
            .whenCompleteAsync((result, thrown) -> {
                System.out.println(result + " : " + thrown);
            }, service);

It would be good to have the expression run on the task thread by default and have variants to supply an Executor. Maybe in a similar vein to RxJava an Future.on(Executor executor) method which would define the Executor for all subsequent expressions.

In Javaslang this would look like this:

Future.of(fooExecutor, this::computation) // executes the async computation using fooExecutor
    .on(barExecutor) // this line is optional, it will execute all supsequent operations using barExecutor
    .map(...) // this will execute the map operation on the same task the future currently runs on
    .map(bazExecutor, ...) // this will execute (only) this map operation on a task created by bazExecutor
    ...

I created a new issue for the future.on(ExecutorService) method: #1537

@danieldietrich
Copy link
Contributor Author

fromJavaFuture and fromJavaCompletableFuture should reuse the existing Thread if possible

@danieldietrich
Copy link
Contributor Author

The issues mentioned here are solved with #2093 / #2110 / #2111.

@danieldietrich danieldietrich removed this from the vavr-1.0.0 milestone Oct 23, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant