From 6535de1770fa94e4aef77d9d156136b46f0c5277 Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 00:21:40 +0200 Subject: [PATCH 1/7] Added output to FutureTest test methods --- .../java/io/vavr/concurrent/FutureTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java index 716f1554ed..0e3fbe28aa 100644 --- a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java +++ b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java @@ -27,9 +27,14 @@ import io.vavr.control.Option; import io.vavr.control.Try; import org.assertj.core.api.IterableAssert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; import java.io.IOException; +import java.time.LocalDateTime; import java.util.NoSuchElementException; import java.util.Spliterator; import java.util.concurrent.*; @@ -44,6 +49,21 @@ public class FutureTest extends AbstractValueTest { + @Rule + public TestRule watcher = new TestWatcher() { + @Override + protected void starting(Description desc) { + printInfo("[STARTING]", desc); + } + @Override + protected void finished(Description desc) { + printInfo("[FINISHED]", desc); + } + private void printInfo(String prefix, Description desc) { + System.out.println(String.format("%s %s %s", prefix, LocalDateTime.now(), desc.getDisplayName())); + } + }; + @Override protected IterableAssert assertThat(Iterable actual) { return new IterableAssert(actual) { From c95ca571a6189d747e7002ad1cdf957313f06140 Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 22:45:58 +0200 Subject: [PATCH 2/7] Changed Future.DEFAULT_EXECUTOR_SERVICE to ForkJoinPool.commonPool() Removed Promise. --- pom.xml | 11 +- vavr/src/main/java/io/vavr/Value.java | 3 - .../main/java/io/vavr/concurrent/Future.java | 321 +++++++++------- .../java/io/vavr/concurrent/FutureImpl.java | 205 +++++----- .../main/java/io/vavr/concurrent/Promise.java | 345 ----------------- .../java/io/vavr/concurrent/Concurrent.java | 30 +- .../java/io/vavr/concurrent/FutureTest.java | 357 ++++++++++-------- .../java/io/vavr/concurrent/PromiseTest.java | 132 ------- 8 files changed, 487 insertions(+), 917 deletions(-) delete mode 100644 vavr/src/main/java/io/vavr/concurrent/Promise.java delete mode 100644 vavr/src/test/java/io/vavr/concurrent/PromiseTest.java diff --git a/pom.xml b/pom.xml index 54b63560c8..be55a38c40 100644 --- a/pom.xml +++ b/pom.xml @@ -302,10 +302,13 @@ We use these goals frequently to keep the dependencies and plugins up-to-date: maven-surefire-plugin ${maven.surefire.version} - - -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 - - none + + + all + 4 + true diff --git a/vavr/src/main/java/io/vavr/Value.java b/vavr/src/main/java/io/vavr/Value.java index 2a6cec17b0..11e15322e2 100644 --- a/vavr/src/main/java/io/vavr/Value.java +++ b/vavr/src/main/java/io/vavr/Value.java @@ -38,7 +38,6 @@ import io.vavr.collection.TreeMap; import io.vavr.collection.TreeSet; import io.vavr.collection.Vector; -import io.vavr.concurrent.Future; import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; @@ -1353,8 +1352,6 @@ default Stream toStream() { default Try toTry() { if (this instanceof Try) { return (Try) this; - } else if (this instanceof Future) { - return ((Future) this).await().getValue().get(); } else { return Try.of(this::get); } diff --git a/vavr/src/main/java/io/vavr/concurrent/Future.java b/vavr/src/main/java/io/vavr/concurrent/Future.java index 9c648cf747..df9b4e748d 100644 --- a/vavr/src/main/java/io/vavr/concurrent/Future.java +++ b/vavr/src/main/java/io/vavr/concurrent/Future.java @@ -30,6 +30,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.*; @@ -55,10 +56,30 @@ public interface Future extends Value { /** - * The default executor service is {@link Executors#newCachedThreadPool()}. - * Please note that it may prevent the VM from shutdown. + * The default executor service is {@link ForkJoinPool#commonPool()}. + *

+ * Facts about ForkJoinPool: + * + *

    + *
  • It is work-stealing, i.e. all threads in the pool attempt to find work submitted to the pool. + * Especially this is efficient under heavy load (many small tasks), e.g. when tasks create subtasks + * (recursive threads).
  • + *
  • The ForkJoinPool is dynamic, it has a maximum of 32767 running threads. Compared to fixed-size pools, + * this reduces the risk of dead-locks.
  • + *
  • The commonPool() is shared across the entire VM. Keep this in mind when also using + * {@link java.util.stream.Stream#parallel()} and {@link java.util.concurrent.CompletableFuture}}
  • + *
+ * + * The ForkJoinPool creates daemon threads but its run state is unaffected by attempts to shutdown() or shutdownNow(). + * However, all running tasks are immediately terminated upon program System.exit(int). + *

+ * IMPORTANT: Invoke {@code ForkJoinPool.commonPool().awaitQuiescence(long, TimeUnit)} before exit in order to + * ensure that all running async tasks complete before program termination. + * + * @see ForkJoinPool#awaitQuiescence(long, TimeUnit) */ - ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(); + // See https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks + ExecutorService DEFAULT_EXECUTOR_SERVICE = ForkJoinPool.commonPool(); /** * Creates a failed {@code Future} with the given {@code exception}, backed by the {@link #DEFAULT_EXECUTOR_SERVICE}. @@ -85,7 +106,7 @@ static Future failed(Throwable exception) { static Future failed(ExecutorService executorService, Throwable exception) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(exception, "exception is null"); - return Promise. failed(executorService, exception).future(); + return new FutureImpl<>(executorService, Try.failure(exception)); } /** @@ -121,31 +142,32 @@ static Future> find(ExecutorService executorService, Iterable> promise = Promise.make(executorService); final List> list = List.ofAll(futures); if (list.isEmpty()) { - promise.success(Option.none()); + return successful(executorService, Option.none()); } else { - final AtomicInteger count = new AtomicInteger(list.length()); - list.forEach(future -> future.onComplete(result -> { - synchronized (count) { - // if the promise is already completed we already found our result and there is nothing more to do. - if (!promise.isCompleted()) { - // when there are no more results we return a None - final boolean wasLast = count.decrementAndGet() == 0; - // when result is a Failure or predicate is false then we check in onFailure for finish - result.filter(predicate) - .onSuccess(value -> promise.trySuccess(Option.some(value))) - .onFailure(ignored -> { - if (wasLast) { - promise.trySuccess(Option.none()); - } - }); + return promise(executorService, tryComplete -> { + final AtomicBoolean completed = new AtomicBoolean(false); + final AtomicInteger count = new AtomicInteger(list.length()); + list.forEach(future -> future.onComplete(result -> { + synchronized (count) { + // if the promise is already completed we already found our result and there is nothing more to do. + if (!completed.get()) { + // when there are no more results we return a None + final boolean wasLast = count.decrementAndGet() == 0; + // when result is a Failure or predicate is false then we check in onFailure for finish + result.filter(predicate) + .onSuccess(value -> completed.set(tryComplete.test(Try.success(Option.some(value))))) + .onFailure(ignored -> { + if (wasLast) { + completed.set(tryComplete.test(Try.success(Option.none()))); + } + }); + } } - } - })); + })); + }); } - return promise.future(); } /** @@ -174,10 +196,7 @@ static Future firstCompletedOf(Iterable> fu static Future firstCompletedOf(ExecutorService executorService, Iterable> futures) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(futures, "futures is null"); - final Promise promise = Promise.make(executorService); - final Consumer> completeFirst = promise::tryComplete; - futures.forEach(future -> future.onComplete(completeFirst)); - return promise.future(); + return promise(executorService, tryComplete -> futures.forEach(future -> future.onComplete(tryComplete::test))); } /** @@ -234,7 +253,7 @@ static Future fold(ExecutorService executorService, Iterable Future fromJavaFuture(java.util.concurrent.Future future) { Objects.requireNonNull(future, "future is null"); - return Future.of(DEFAULT_EXECUTOR_SERVICE, future::get); + return of(DEFAULT_EXECUTOR_SERVICE, future::get); } /** @@ -249,7 +268,7 @@ static Future fromJavaFuture(java.util.concurrent.Future future) { static Future fromJavaFuture(ExecutorService executorService, java.util.concurrent.Future future) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(future, "future is null"); - return Future.of(executorService, future::get); + return of(executorService, future::get); } /** @@ -278,9 +297,13 @@ static Future fromCompletableFuture(CompletableFuture future) { static Future fromCompletableFuture(ExecutorService executorService, CompletableFuture future) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(future, "future is null"); - final Promise promise = Promise.make(); - future.handle((t, err) -> err == null ? promise.success(t) : promise.failure(err)); - return promise.future(); + if (future.isDone() || future.isCompletedExceptionally() || future.isCancelled()) { + return fromTry(Try.of(future::get).mapFailure(Throwable::getCause)); + } else { + return promise(executorService, tryComplete -> + future.handle((t, err) -> tryComplete.test((err == null) ? Try.success(t) : Try.failure(err))) + ); + } } /** @@ -307,7 +330,7 @@ static Future fromTry(Try result) { static Future fromTry(ExecutorService executorService, Try result) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(result, "result is null"); - return Promise. fromTry(executorService, result).future(); + return new FutureImpl<>(executorService, result); } /** @@ -333,7 +356,7 @@ static Future narrow(Future future) { * @throws NullPointerException if computation is null. */ static Future of(CheckedFunction0 computation) { - return Future.of(DEFAULT_EXECUTOR_SERVICE, computation); + return of(DEFAULT_EXECUTOR_SERVICE, computation); } /** @@ -348,9 +371,46 @@ static Future of(CheckedFunction0 computation) { static Future of(ExecutorService executorService, CheckedFunction0 computation) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(computation, "computation is null"); - final FutureImpl future = new FutureImpl<>(executorService); - future.run(computation); - return future; + return promise(executorService, tryComplete -> tryComplete.test(Try.of(computation))); + } + + /** + * Creates a Future based on a promise in the way, that the given computation requires + * to complete the Future. + *

+ * The computation receives a {@link Predicate}, named {@code tryComplete} by convention, + * that takes a result of {@code Try} and return a boolean that states whether the + * Future was completed. + *

+ * Future completion is an idempotent operation in the way that the first call of {@code tryComplete} + * will return true, successive calls will return false. + * + * @param computation A computational task + * @param Type of the result + * @return a new {@code Future} instance + */ + static Future promise(CheckedConsumer>> computation) { + return promise(DEFAULT_EXECUTOR_SERVICE, computation); + } + + /** + * Creates a Future based on a promise in the way, that the given computation requires + * to complete the Future. + *

+ * The computation receives a {@link Predicate}, named {@code tryComplete} by convention, + * that takes a result of {@code Try} and return a boolean that states whether the + * Future was completed. + *

+ * Future completion is an idempotent operation in the way that the first call of {@code tryComplete} + * will return true, successive calls will return false. + * + * @param executorService an executor service that runs the given {@code computation} + * @param computation A computational task + * @param Type of the result + * @return a new {@code Future} instance + */ + static Future promise(ExecutorService executorService, CheckedConsumer>> computation) { + return new FutureImpl<>(executorService, computation); } /** @@ -415,7 +475,7 @@ static Future run(CheckedRunnable unit) { static Future run(ExecutorService executorService, CheckedRunnable unit) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(unit, "unit is null"); - return Future.of(executorService, () -> { + return of(executorService, () -> { unit.run(); return null; }); @@ -502,7 +562,7 @@ static Future successful(T result) { */ static Future successful(ExecutorService executorService, T result) { Objects.requireNonNull(executorService, "executorService is null"); - return Promise.successful(executorService, result).future(); + return new FutureImpl<>(executorService, Try.success(result)); } @Override @@ -574,12 +634,12 @@ static Future> traverse(ExecutorService executorService, Iterable< */ default Future andThen(Consumer> action) { Objects.requireNonNull(action, "action is null"); - final Promise promise = Promise.make(executorService()); - onComplete(t -> { - Try.run(() -> action.accept(t)); - promise.complete(t); - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(t -> { + Try.run(() -> action.accept(t)); + tryComplete.test(t); + }) + ); } /** @@ -652,9 +712,9 @@ default Future cancel() { */ default Future collect(PartialFunction partialFunction) { Objects.requireNonNull(partialFunction, "partialFunction is null"); - final Promise promise = Promise.make(executorService()); - onComplete(result -> promise.complete(result.collect(partialFunction))); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(result -> tryComplete.test(result.collect(partialFunction))) + ); } /** @@ -674,15 +734,15 @@ default Future collect(PartialFunction partialFun * @return A new Future which contains an exception at a point of time. */ default Future failed() { - final Promise promise = Promise.make(executorService()); - onComplete(result -> { - if (result.isFailure()) { - promise.success(result.getCause()); - } else { - promise.failure(new NoSuchElementException("Future.failed completed without a throwable")); - } - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(result -> { + if (result.isFailure()) { + tryComplete.test(Try.success(result.getCause())); + } else { + tryComplete.test(Try.failure(new NoSuchElementException("Future.failed completed without a throwable"))); + } + }) + ); } /** @@ -706,21 +766,15 @@ default Future failed() { */ default Future fallbackTo(Future that) { Objects.requireNonNull(that, "that is null"); - final Promise promise = Promise.make(executorService()); - onComplete(t -> { - if (t.isSuccess()) { - promise.complete(t); - } else { - that.onComplete(alt -> { - if (alt.isSuccess()) { - promise.complete(alt); - } else { - promise.complete(t); - } - }); - } - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(t -> { + if (t.isSuccess()) { + tryComplete.test(t); + } else { + that.onComplete(alt -> tryComplete.test(alt.isSuccess() ? alt : t)); + } + }) + ); } /** @@ -744,9 +798,7 @@ default Future filter(Predicate predicate) { */ default Future filterTry(CheckedPredicate predicate) { Objects.requireNonNull(predicate, "predicate is null"); - final Promise promise = Promise.make(executorService()); - onComplete(result -> promise.complete(result.filterTry(predicate))); - return promise.future(); + return promise(executorService(), tryComplete -> onComplete(result -> tryComplete.test(result.filterTry(predicate)))); } /** @@ -781,21 +833,21 @@ default Option getCause() { boolean isCompleted(); /** - * Checks if this Future completed with a success. + * Checks if this Future completed with a failure. * - * @return true, if this Future completed and is a Success, false otherwise. + * @return true, if this Future completed and is a Failure, false otherwise. */ - default boolean isSuccess() { - return getValue().map(Try::isSuccess).getOrElse(false); + default boolean isFailure() { + return isCompleted() && getValue().get().isFailure(); } /** - * Checks if this Future completed with a failure. + * Checks if this Future completed with a success. * - * @return true, if this Future completed and is a Failure, false otherwise. + * @return true, if this Future completed and is a Success, false otherwise. */ - default boolean isFailure() { - return getValue().map(Try::isFailure).getOrElse(false); + default boolean isSuccess() { + return isCompleted() && getValue().get().isSuccess(); } /** @@ -865,15 +917,16 @@ default Future recover(Function f) { */ default Future recoverWith(Function> f) { Objects.requireNonNull(f, "f is null"); - final Promise promise = Promise.make(executorService()); - onComplete(t -> { - if (t.isFailure()) { - Try.run(() -> f.apply(t.getCause()).onComplete(promise::complete)).onFailure(promise::failure); - } else { - promise.complete(t); - } - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(t -> { + if (t.isFailure()) { + Try.run(() -> f.apply(t.getCause()).onComplete(tryComplete::test)) + .onFailure(x -> tryComplete.test(Try.failure(x))); + } else { + tryComplete.test(t); + } + }) + ); } /** @@ -899,9 +952,11 @@ default U transform(Function, ? extends U> f) { */ default Future transformValue(Function, ? extends Try> f) { Objects.requireNonNull(f, "f is null"); - final Promise promise = Promise.make(executorService()); - onComplete(t -> Try.run(() -> promise.complete(f.apply(t))).onFailure(promise::failure)); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(t -> Try.run(() -> tryComplete.test(f.apply(t))) + .onFailure(x -> tryComplete.test(Try.failure(x))) + ) + ); } /** @@ -937,18 +992,18 @@ default Future> zip(Future that) { default Future zipWith(Future that, BiFunction combinator) { Objects.requireNonNull(that, "that is null"); Objects.requireNonNull(combinator, "combinator is null"); - final Promise promise = Promise.make(executorService()); - onComplete(res1 -> { - if (res1.isFailure()) { - promise.complete((Try.Failure) res1); - } else { - that.onComplete(res2 -> { - final Try result = res1.flatMap(t -> res2.map(u -> combinator.apply(t, u))); - promise.complete(result); - }); - } - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(res1 -> { + if (res1.isFailure()) { + tryComplete.test((Try.Failure) res1); + } else { + that.onComplete(res2 -> { + final Try result = res1.flatMap(t -> res2.map(u -> combinator.apply(t, u))); + tryComplete.test(result); + }); + } + }) + ); } // -- Value & Monad implementation @@ -960,12 +1015,12 @@ default Future flatMap(Function> default Future flatMapTry(CheckedFunction1> mapper) { Objects.requireNonNull(mapper, "mapper is null"); - final Promise promise = Promise.make(executorService()); - onComplete((Try result) -> result.mapTry(mapper) - .onSuccess(promise::completeWith) - .onFailure(promise::failure) + return promise(executorService(), tryComplete -> + onComplete(result -> result.mapTry(mapper) + .onSuccess(future -> future.onComplete(tryComplete::test)) + .onFailure(x -> tryComplete.test(Try.failure(x))) + ) ); - return promise.future(); } /** @@ -987,6 +1042,7 @@ default void forEach(Consumer action) { * IMPORTANT! If the computation result is a {@link Try.Failure}, the underlying {@code cause} of type {@link Throwable} is thrown. * * @return The value of this {@code Future}. + * @throws InterruptedException if the current thread was interrupted while waiting for the value */ @Override default T get() { @@ -1007,6 +1063,7 @@ default boolean isAsync() { * Checks, if this future has a value. * * @return true, if this future succeeded with a value, false otherwise. + * @throws InterruptedException if the current thread was interrupted while waiting for the value */ @Override default boolean isEmpty() { @@ -1048,31 +1105,31 @@ default Future mapTry(CheckedFunction1 mapper) { Objects.requireNonNull(mapper, "mapper is null"); return transformValue(t -> t.mapTry(mapper::apply)); } - + default Future orElse(Future other) { Objects.requireNonNull(other, "other is null"); - final Promise promise = Promise.make(executorService()); - onComplete(result -> { - if (result.isSuccess()) { - promise.complete(result); - } else { - other.onComplete(promise::complete); - } - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(result -> { + if (result.isSuccess()) { + tryComplete.test(result); + } else { + other.onComplete(tryComplete::test); + } + }) + ); } default Future orElse(Supplier> supplier) { Objects.requireNonNull(supplier, "supplier is null"); - final Promise promise = Promise.make(executorService()); - onComplete(result -> { - if (result.isSuccess()) { - promise.complete(result); - } else { - supplier.get().onComplete(promise::complete); - } - }); - return promise.future(); + return promise(executorService(), tryComplete -> + onComplete(result -> { + if (result.isSuccess()) { + tryComplete.test(result); + } else { + supplier.get().onComplete(tryComplete::test); + } + }) + ); } @Override diff --git a/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java b/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java index 8a598d4a7f..90995387b9 100644 --- a/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java +++ b/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java @@ -19,52 +19,19 @@ */ package io.vavr.concurrent; -import io.vavr.CheckedFunction0; +import io.vavr.CheckedConsumer; import io.vavr.collection.Queue; -import io.vavr.control.Try; import io.vavr.control.Option; +import io.vavr.control.Try; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.function.Predicate; /** * INTERNAL API - This class is subject to change. - *

- * Once a {@code FutureImpl} is created, one (and only one) of the following methods is called - * to complete it with a result: - *

    - *
  • {@link #run(CheckedFunction0)} - typically called within a {@code Future} factory method
  • - *
  • {@link #tryComplete(Try)} - explicit write operation, typically called by {@code Promise}
  • - *
- *

- * Lifecycle of a {@code FutureImpl}: - *

- * 1) Creation - *

    - *
  • {@code value = None}
  • - *
  • {@code actions = Queue.empty()}
  • - *
  • {@code job = null}
  • - *
- * 2) Run - *
    - *
  • {@code value = None}
  • - *
  • {@code actions = Queue(...)}
  • - *
  • {@code job = java.util.concurrent.Future}
  • - *
- * 3) Complete - *
    - *
  • {@code value = Some(Try)}
  • - *
  • {@code actions = null}
  • - *
  • {@code job = null}
  • - *
- * 4) Cancel - *
    - *
  • {@code value = Some(Failure(CancellationException))}
  • - *
  • {@code actions = null}
  • - *
  • {@code job = null}
  • - *
* * @param Result of the computation. * @author Daniel Dietrich @@ -85,39 +52,76 @@ final class FutureImpl implements Future { * Once the Future is completed, the value is defined. */ @GuardedBy("lock") - private volatile Option> value = Option.none(); + private volatile Option> value; /** * The queue of actions is filled when calling onComplete() before the Future is completed or cancelled. * Otherwise actions = null. */ @GuardedBy("lock") - private Queue>> actions = Queue.empty(); + private Queue>> actions; /** * The queue of waiters is filled when calling await() before the Future is completed or cancelled. * Otherwise waiters = null. */ @GuardedBy("lock") - private Queue waiters = Queue.empty(); + private Queue waiters; /** * Once a computation is started via run(), job is defined and used to control the lifecycle of the computation. + * The job variable must not be set to null after it was defined. *

* The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in * {@code value} instead. */ @GuardedBy("lock") - private java.util.concurrent.Future job = null; + private java.util.concurrent.Future job; /** - * Creates a Future, {@link #run(CheckedFunction0)} has to be called separately. + * Creates a Future that is immediately completed with the given value. No task will be started. * * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions. + * @param value the result of this Future */ - FutureImpl(ExecutorService executorService) { + @SuppressWarnings("unchecked") + FutureImpl(ExecutorService executorService, Try value) { Objects.requireNonNull(executorService, "executorService is null"); this.executorService = executorService; + this.value = Option.some((Try) value); + this.actions = null; + this.waiters = null; + this.job = null; + } + + /** + * Creates a Future and starts a task. + * + * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions. + * @param computation A computation that receives a complete function + */ + FutureImpl(ExecutorService executorService, CheckedConsumer>> computation) { + Objects.requireNonNull(executorService, "executorService is null"); + this.executorService = executorService; + // the lock ensures that the task does not complete this Future before the constructor is finished + synchronized (lock) { + this.value = Option.none(); + this.actions = Queue.empty(); + this.waiters = Queue.empty(); + try { + // In a single-threaded context this Future may already have been completed during initialization. + // The worker thread completes this Future before it is done. + this.job = executorService.submit(() -> { + try { + computation.accept(this::tryComplete); + } catch (Throwable x) { + tryComplete(Try.failure(x)); + } + }); + } catch (Throwable x) { + tryComplete(Try.failure(x)); + } + } } @Override @@ -147,7 +151,7 @@ public Future await(long timeout, TimeUnit unit) { * If timeout = 0 then {@code LockSupport.park()} is called (start, timeout and unit are not used), * otherwise {@code LockSupport.park(timeout, unit}} is called. *

- * If a timeout > 0 is specified and the deadline is not met, this Future fails with a {@link TimeoutException}. + * If a timeout > -1 is specified and the deadline is not met, this Future fails with a {@link TimeoutException}. *

* If this Thread was interrupted, this Future fails with a {@link InterruptedException}. * @@ -213,14 +217,12 @@ public boolean isReleasable() { @Override public Future cancel(boolean mayInterruptIfRunning) { - synchronized (lock) { - if (!isCompleted()) { - Try.of(() -> job == null || job.cancel(mayInterruptIfRunning)).onSuccess(cancelled -> { - if (cancelled) { - complete(Try.failure(new CancellationException())); - } - }); - } + if (!isCompleted()) { + Try.of(() -> job.cancel(mayInterruptIfRunning)).onSuccess(cancelled -> { + if (cancelled) { + tryComplete(Try.failure(new CancellationException())); + } + }); } return this; } @@ -267,88 +269,53 @@ public Future onComplete(Consumer> action) { @Override public String toString() { - return stringPrefix() + "(" + value.map(String::valueOf).getOrElse("?") + ")"; + final String value = (this.value == null || this.value.isEmpty()) ? "?" : this.value.get().toString(); + return stringPrefix() + "(" + value + ")"; } /** - * Runs a computation using the underlying ExecutorService. + * INTERNAL METHOD, SHOULD BE USED BY THE CONSTRUCTOR, ONLY. *

- * DEV-NOTE: Internally this method is called by the static {@code Future} factory methods. - * - * @throws IllegalStateException if the Future is pending, completed or cancelled - * @throws NullPointerException if {@code computation} is null. - */ - void run(CheckedFunction0 computation) { - Objects.requireNonNull(computation, "computation is null"); - synchronized (lock) { - if (job != null) { - throw new IllegalStateException("The Future is already running."); - } - if (isCompleted()) { - throw new IllegalStateException("The Future is completed."); - } - try { - // if the ExecutorService runs the computation - // - in a different thread, the lock ensures that the job is assigned before the computation completes - // - in the current thread, the job is already completed and the `job` variable remains null - final java.util.concurrent.Future tmpJob = executorService.submit(() -> complete(Try.of(computation))); - if (!isCompleted()) { - job = tmpJob; - } - } catch (Throwable t) { - // ensures that the Future completes if the `executorService.submit()` method throws - if (!isCompleted()) { - complete(Try.failure(t)); - } - } - } - } - - boolean tryComplete(Try value) { - Objects.requireNonNull(value, "value is null"); - synchronized (lock) { - if (isCompleted()) { - return false; - } else { - complete(value); - return true; - } - } - } - - /** - * Completes this Future with a value. + * Completes this Future with a value and performs all actions. *

- * DEV-NOTE: Internally this method is called by the {@code Future.run()} method and by {@code Promise}. + * This method is idempotent. I.e. it does nothing, if this Future is already completed. * * @param value A Success containing a result or a Failure containing an Exception. * @throws IllegalStateException if the Future is already completed or cancelled. * @throws NullPointerException if the given {@code value} is null. + * @see FutureImpl#FutureImpl(ExecutorService, CheckedConsumer) */ - private void complete(Try value) { + private boolean tryComplete(Try value) { Objects.requireNonNull(value, "value is null"); - final Queue>> actions; - final Queue waiters; - // it is essential to make the completed state public *before* performing the actions - synchronized (lock) { - if (isCompleted()) { - actions = null; - waiters = null; + if (isCompleted()) { + return false; + } else { + final Queue>> actions; + final Queue waiters; + // it is essential to make the completed state public *before* performing the actions + synchronized (lock) { + if (isCompleted()) { + actions = null; + waiters = null; + } else { + // the job isn't set to null, see isCancelled() + actions = this.actions; + waiters = this.waiters; + this.value = Option.some(Try.narrow(value)); + this.actions = null; + this.waiters = null; + } + } + if (waiters != null) { + waiters.forEach(LockSupport::unpark); + } + if (actions != null) { + actions.forEach(this::perform); + return true; } else { - // the job isn't set to null, see isCancelled() - actions = this.actions; - waiters = this.waiters; - this.value = Option.some(Try.narrow(value)); - this.actions = null; - this.waiters = null; + return false; } } - if (waiters != null) { - waiters.forEach(LockSupport::unpark); - } - if (actions != null) { - actions.forEach(this::perform); - } } private void perform(Consumer> action) { diff --git a/vavr/src/main/java/io/vavr/concurrent/Promise.java b/vavr/src/main/java/io/vavr/concurrent/Promise.java deleted file mode 100644 index 7e9324cff8..0000000000 --- a/vavr/src/main/java/io/vavr/concurrent/Promise.java +++ /dev/null @@ -1,345 +0,0 @@ -/* __ __ __ __ __ ___ - * \ \ / / \ \ / / __/ - * \ \/ / /\ \ \/ / / - * \____/__/ \__\____/__/ - * - * Copyright 2014-2017 Vavr, http://vavr.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.vavr.concurrent; - -import io.vavr.control.Try; - -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; - -import static io.vavr.concurrent.Future.DEFAULT_EXECUTOR_SERVICE; - -/** - * A Promise is a write-once wrapper around a read-only Future which can complete the underlying Future with a value - * or an exception. - *

- * The underlying {@code ExecutorService} is used to execute asynchronous handlers, e.g. via - * {@code promise.future().onComplete(...)}. - * - *

Creation

- *

- * Promise offers static factory methods to create new promises which hasn't been fulfilled yet: - *

    - *
  • create new promises: {@link Promise#make()}
  • - *
- * And we may create new promises that are already finished: - *
    - *
  • {@link #failed(Throwable)}
  • - *
  • {@link #fromTry(Try)}
  • - *
  • {@link #successful(Object)}
  • - *
- * All the static factory methods mentioned above have additional versions which take an {@link ExecutorService} as - * argument. This gives us more control over thread creation and thread pool sizes. - * - *

One-shot API

- *

- * The main purpose of a {@code Promise} is to complete its underlying {@code Future}. When only a single {@code Thread} - * will eventually complete the {@code Promise}, we use one of these methods. Calls will throw if the {@code Promise} is already - * completed. - *

    - *
  • {@link #complete(Try)}
  • - *
  • {@link #completeWith(Future)}
  • - *
  • {@link #failure(Throwable)}
  • - *
  • {@link #success(Object)}
  • - *
- * - *

API for competing threads

- *

- * When multiple {@code Thread}s may complete our {@code Promise}, we typically use one of these methods. Calls will - * gracefully return {@code false} if the {@code Promise} is already completed. - *

    - *
  • {@link #tryComplete(Try)}
  • - *
  • {@link #tryCompleteWith(Future)}
  • - *
  • {@link #tryFailure(Throwable)}
  • - *
  • {@link #trySuccess(Object)}
  • - *
- * - * @param The result type of the underlying {@code Future}. - * @author Daniel Dietrich - */ -public interface Promise { - - /** - * Creates a failed {@code Promise}, backed by the {@link Future#DEFAULT_EXECUTOR_SERVICE}. - * - * @param exception The reason why it failed. - * @param The value type of a successful result. - * @return A failed {@code Promise}. - * @throws NullPointerException if exception is null - */ - static Promise failed(Throwable exception) { - Objects.requireNonNull(exception, "exception is null"); - return failed(DEFAULT_EXECUTOR_SERVICE, exception); - } - - /** - * Creates a failed {@code Promise}, backed by the given {@link ExecutorService}. - * - * @param executorService An {@code ExecutorService} passed to the underlying {@link Future}. - * @param exception The reason why it failed. - * @param The value type of a successful result. - * @return A failed {@code Promise}. - * @throws NullPointerException if executorService or exception is null - */ - static Promise failed(ExecutorService executorService, Throwable exception) { - Objects.requireNonNull(executorService, "executorService is null"); - Objects.requireNonNull(exception, "exception is null"); - return Promise. make(executorService).failure(exception); - } - - /** - * Creates a {@code Promise} from a {@link Try}, backed by the {@link Future#DEFAULT_EXECUTOR_SERVICE}. - * - * @param result The result. - * @param The value type of a successful result. - * @return A completed {@code Promise} which contains either a {@code Success} or a {@code Failure}. - * @throws NullPointerException if result is null - */ - static Promise fromTry(Try result) { - return fromTry(DEFAULT_EXECUTOR_SERVICE, result); - } - - /** - * Creates a {@code Promise} from a {@link Try}, backed by the given {@link ExecutorService}. - * - * @param executorService An {@code ExecutorService} passed to the underlying {@link Future}. - * @param result The result. - * @param The value type of a successful result. - * @return A completed {@code Promise} which contains either a {@code Success} or a {@code Failure}. - * @throws NullPointerException if executorService or result is null - */ - static Promise fromTry(ExecutorService executorService, Try result) { - Objects.requireNonNull(executorService, "executorService is null"); - Objects.requireNonNull(result, "result is null"); - return Promise. make(executorService).complete(result); - } - - /** - * Makes a {@code Promise} that isn't fulfilled yet, backed by the {@link Future#DEFAULT_EXECUTOR_SERVICE}. - * {@link ForkJoinPool#commonPool()}. - * - * @param Result type of the {@code Promise}. - * @return A new {@code Promise}. - */ - static Promise make() { - return make(DEFAULT_EXECUTOR_SERVICE); - } - - /** - * Makes a {@code Promise} that isn't fulfilled yet, backed by the given {@link ExecutorService}. - * - * @param executorService An {@code ExecutorService} passed to the underlying {@link Future}. - * @param Result type of the {@code Promise}. - * @return A new {@code Promise}. - * @throws NullPointerException if executorService is null - */ - static Promise make(ExecutorService executorService) { - Objects.requireNonNull(executorService, "executorService is null"); - return new PromiseImpl<>(new FutureImpl<>(executorService)); - } - - /** - * Narrows a widened {@code Promise} to {@code Promise} - * by performing a type-safe cast. This is eligible because immutable/read-only - * collections are covariant. - * - * @param promise A {@code Promise}. - * @param Component type of the {@code Promise}. - * @return the given {@code promise} instance as narrowed type {@code Promise}. - */ - @SuppressWarnings("unchecked") - static Promise narrow(Promise promise) { - return (Promise) promise; - } - - /** - * Creates a succeeded {@code Promise}, backed by the {@link Future#DEFAULT_EXECUTOR_SERVICE}. - * - * @param result The result. - * @param The value type of a successful result. - * @return A succeeded {@code Promise}. - */ - static Promise successful(T result) { - return successful(DEFAULT_EXECUTOR_SERVICE, result); - } - - /** - * Creates a succeeded {@code Promise}, backed by the given {@link ExecutorService}. - * - * @param executorService An {@code ExecutorService} passed to the underlying {@link Future}. - * @param result The result. - * @param The value type of a successful result. - * @return A succeeded {@code Promise}. - * @throws NullPointerException if executorService is null - */ - static Promise successful(ExecutorService executorService, T result) { - Objects.requireNonNull(executorService, "executorService is null"); - return Promise. make(executorService).success(result); - } - - /** - * Returns the {@link ExecutorService} used by this {@code Future}. - * - * @return The underlying {@code ExecutorService}. - */ - ExecutorService executorService(); - - /** - * Returns the underlying {@link Future} of this {@code Promise}. - * - * @return The {@code Future}. - */ - Future future(); - - /** - * Checks if this {@code Promise} is completed, i.e. has a value. - * - * @return true, if the computation successfully finished or failed, false otherwise. - */ - default boolean isCompleted() { - return future().isCompleted(); - } - - /** - * Completes this {@code Promise} with the given {@code value}. - * - * @param value Either a {@link Try.Success} containing the result or a {@link Try.Failure} containing an exception. - * @return This {@code Promise}. - * @throws IllegalStateException if this {@code Promise} has already been completed. - */ - default Promise complete(Try value) { - if (tryComplete(value)) { - return this; - } else { - throw new IllegalStateException("Promise already completed."); - } - } - - /** - * Attempts to completes this {@code Promise} with the given {@code value}. - * - * @param value Either a {@link Try.Success} containing the result or a {@link Try.Failure} containing an exception. - * @return {@code false} if this {@code Promise} has already been completed, {@code true} otherwise. - * @throws IllegalStateException if this {@code Promise} has already been completed. - */ - boolean tryComplete(Try value); - - /** - * Completes this {@code Promise} with the given {@code Future}, once that {@code Future} is completed. - * - * @param other Another {@code Future} to react on. - * @return This {@code Promise}. - */ - default Promise completeWith(Future other) { - return tryCompleteWith(other); - } - - /** - * Attempts to complete this {@code Promise} with the specified {@code Future}, once that {@code Future} is completed. - * - * @param other Another {@code Future} to react on. - * @return This {@code Promise}. - */ - default Promise tryCompleteWith(Future other) { - other.onComplete(this::tryComplete); - return this; - } - - /** - * Completes this {@code Promise} with the given {@code value}. - * - * @param value A value. - * @return This {@code Promise}. - * @throws IllegalStateException if this {@code Promise} has already been completed. - */ - default Promise success(T value) { - return complete(Try.success(value)); - } - - /** - * Completes this {@code Promise} with the given {@code value}. - * - * @param value A value. - * @return {@code false} if this {@code Promise} has already been completed, {@code true} otherwise. - */ - default boolean trySuccess(T value) { - return tryComplete(Try.success(value)); - } - - /** - * Completes this {@code Promise} with the given {@code exception}. - * - * @param exception An exception. - * @return This {@code Promise}. - * @throws IllegalStateException if this {@code Promise} has already been completed. - */ - default Promise failure(Throwable exception) { - return complete(Try.failure(exception)); - } - - /** - * Completes this {@code Promise} with the given {@code exception}. - * - * @param exception An exception. - * @return {@code false} if this {@code Promise} has already been completed, {@code true} otherwise. - */ - default boolean tryFailure(Throwable exception) { - return tryComplete(Try.failure(exception)); - } -} - -/** - * Internal {@code Promise} implementation. - * - * @param result type - * @author Daniel Dietrich - */ -final class PromiseImpl implements Promise { - - private final FutureImpl future; - - PromiseImpl(FutureImpl future) { - this.future = future; - } - - @Override - public ExecutorService executorService() { - return future.executorService(); - } - - @Override - public Future future() { - return future; - } - - @Override - public boolean tryComplete(Try value) { - return future.tryComplete(value); - } - - // The underlying FutureImpl is MUTABLE and therefore we CANNOT CHANGE DEFAULT equals() and hashCode() behavior. - // See http://stackoverflow.com/questions/4718009/mutable-objects-and-hashcode - - @Override - public String toString() { - return "Promise(" + future.getValue().map(String::valueOf).getOrElse("?") + ")"; - } -} diff --git a/vavr/src/test/java/io/vavr/concurrent/Concurrent.java b/vavr/src/test/java/io/vavr/concurrent/Concurrent.java index 0e0c5e6752..0dd47b1f9b 100644 --- a/vavr/src/test/java/io/vavr/concurrent/Concurrent.java +++ b/vavr/src/test/java/io/vavr/concurrent/Concurrent.java @@ -31,29 +31,25 @@ final class Concurrent { private static final Random RND = new Random(); - // Max wait time for results = WAIT_MILLIS * WAIT_COUNT (however, most probably it will take only WAIT_MILLIS * 1) - private static final long WAIT_MILLIS = 50; - private static final int WAIT_COUNT = 100; - // Max sleep time to delay computation private static final int SLEEP_MAX_MILLIS = 150; private Concurrent() { } - /** - * Frequently checking if something happened by testing a condition. - * If after {@link #WAIT_COUNT} * {@link #WAIT_MILLIS} ms nothing happened, an {@code AssertionError} is thrown. - * - * @param condition A condition. - */ static void waitUntil(Supplier condition) { - int count = 0; - while (!condition.get()) { - if (++count > WAIT_COUNT) { + long nanos = 1L; + boolean interrupted = false; + while (!interrupted && !condition.get()) { + if (nanos > 1_000_000) { fail("Condition not met."); } else { - Try.run(() -> Thread.sleep(WAIT_MILLIS)); + try { + Thread.sleep(nanos); + nanos = nanos << 1; + } catch(InterruptedException x) { + interrupted = true; + } } } } @@ -78,10 +74,4 @@ static CheckedFunction0 zZz(X exception) { throw exception; }; } - - static Void waitForever() { - while (true) { - Try.run(() -> Thread.sleep(WAIT_MILLIS)); - } - } } diff --git a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java index 0e3fbe28aa..2dd98c9f06 100644 --- a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java +++ b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java @@ -38,10 +38,12 @@ import java.util.NoSuchElementException; import java.util.Spliterator; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static io.vavr.concurrent.Concurrent.waitUntil; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static io.vavr.concurrent.Concurrent.zZz; import static io.vavr.concurrent.ExecutorServices.rejectingExecutorService; import static io.vavr.concurrent.ExecutorServices.trivialExecutorService; @@ -54,6 +56,7 @@ public class FutureTest extends AbstractValueTest { @Override protected void starting(Description desc) { printInfo("[STARTING]", desc); + printForkJoinPoolInfo(); } @Override protected void finished(Description desc) { @@ -62,6 +65,22 @@ protected void finished(Description desc) { private void printInfo(String prefix, Description desc) { System.out.println(String.format("%s %s %s", prefix, LocalDateTime.now(), desc.getDisplayName())); } + private void printForkJoinPoolInfo() { + final ForkJoinPool pool = ForkJoinPool.commonPool(); + final String info = String.format("- [ForkJoinPool.commonPool()] parallelism: %s, poolSize: %s, isAsyncMode: %s, runningThreadCount: %s, activeThreadCount: %s, isQuiescent: %s, stealCount: %s, queuedTaskCount: %s, queuedSubmissionCount: %s, hasQueuedSubmissions: %s\n", + pool.getParallelism(), + pool.getPoolSize(), + pool.getAsyncMode(), + pool.getRunningThreadCount(), + pool.getActiveThreadCount(), + pool.isQuiescent(), + pool.getStealCount(), + pool.getQueuedTaskCount(), + pool.getQueuedSubmissionCount(), + pool.hasQueuedSubmissions() + ); + System.out.printf(info); + } }; @Override @@ -110,8 +129,7 @@ protected int getPeekNonNilPerformingAnAction() { @Test public void shouldCreateFailureThatFailsWithRuntimeException() { - final Future failed = Future.failed(new RuntimeException("ooops")); - waitUntil(failed::isCompleted); + final Future failed = Future.failed(new RuntimeException("ooops")).await(); assertThat(failed.isFailure()).isTrue(); final Throwable t = failed.getValue().get().getCause(); assertThat(t.getClass()).isEqualTo(RuntimeException.class); @@ -120,8 +138,7 @@ public void shouldCreateFailureThatFailsWithRuntimeException() { @Test public void shouldCreateFailureThatFailsWithError() { - final Future failed = Future.failed(new Error("ooops")); - waitUntil(failed::isCompleted); + final Future failed = Future.failed(new Error("ooops")).await(); assertThat(failed.isFailure()).isTrue(); final Throwable t = failed.getValue().get().getCause(); assertThat(t.getClass()).isEqualTo(Error.class); @@ -133,7 +150,7 @@ public void shouldCreateAndFailAFutureUsingForkJoinPool() { final Future future = Future.of(() -> { throw new Error(); }); - waitUntil(future::isCompleted); + future.await(); assertFailed(future, Error.class); } @@ -149,19 +166,15 @@ public void shouldCreateAndFailAFutureUsingTrivialExecutorService() { @Test public void shouldCreateFutureFromJavaFuture() { - // Create slow-resolving Java future to show that the wrapping doesn't block - final java.util.concurrent.Future jFuture = generateJavaFuture(1, 100); - final Future future = Future.fromJavaFuture(jFuture); - waitUntil(future::isCompleted); + final java.util.concurrent.Future jFuture = CompletableFuture.supplyAsync(() -> 1); + final Future future = Future.fromJavaFuture(jFuture).await(); assertCompleted(future, 1); } @Test public void shouldCreateFutureFromJavaFutureUsingTrivialExecutorService() { - // Create slow-resolving Java future to show that the wrapping doesn't block - final java.util.concurrent.Future jFuture = generateJavaFuture("Result", 100); - final Future future = Future.fromJavaFuture(trivialExecutorService(), jFuture); - waitUntil(future::isCompleted); + final java.util.concurrent.Future jFuture = CompletableFuture.supplyAsync(() -> "Result"); + final Future future = Future.fromJavaFuture(trivialExecutorService(), jFuture).await(); assertCompleted(future, "Result"); } @@ -184,30 +197,27 @@ public void shouldCreateFutureFromFailedJavaCompletableFuture() { @Test public void shouldCreateFutureFromJavaCompletableFuture() { - // Create slow-resolving Java future to show that the wrapping doesn't block - final CompletableFuture jFuture = generateJavaCompletableFuture(1, 100); - final Future future = Future.fromCompletableFuture(jFuture); - waitUntil(future::isCompleted); + final CompletableFuture jFuture = CompletableFuture.supplyAsync(() -> 1); + final Future future = Future.fromCompletableFuture(jFuture).await(); assertCompleted(future, 1); } @Test public void shouldCreateFutureFromLateFailingJavaCompletableFuture() { final CompletableFuture jFuture = Future. of(zZz(new RuntimeException())).toCompletableFuture(); - final Future future = Future.fromCompletableFuture(jFuture); - waitUntil(future::isCompleted); + final Future future = Future.fromCompletableFuture(jFuture).await(); assertFailed(future, RuntimeException.class); } @Test public void shouldCreateFutureFromJavaCompletableFutureUsingTrivialExecutorService() { - // Create slow-resolving Java future to show that the wrapping doesn't block - final java.util.concurrent.Future jFuture = generateJavaCompletableFuture("Result", 100); - final Future future = Future.fromJavaFuture(trivialExecutorService(), jFuture); - waitUntil(future::isCompleted); + final java.util.concurrent.Future jFuture = CompletableFuture.supplyAsync(() -> "Result"); + final Future future = Future.fromJavaFuture(trivialExecutorService(), jFuture).await(); assertCompleted(future, "Result"); } + // TODO: test the cases isDone(), isCompletedExceptionally(), isCancelled() + // -- static find() @Test @@ -220,7 +230,7 @@ public void shouldFindNoneWhenEmptySeq() { public void shouldFindFirstValueThatSatisfiesAPredicateUsingForkJoinPool() { final Seq> futures = Stream.from(1).map(i -> Future.of(() -> i)).take(20); final Future> testee = Future.find(futures, i -> i == 13); - waitUntil(testee::isCompleted); + testee.await(); assertCompleted(testee, Option.some(13)); } @@ -228,7 +238,7 @@ public void shouldFindFirstValueThatSatisfiesAPredicateUsingForkJoinPool() { public void shouldFailFindingFirstValueBecauseNoResultSatisfiesTheGivenPredicateUsingForkJoinPool() { final Seq> futures = Stream.from(1).map(i -> Future.of(() -> i)).take(20); final Future> testee = Future.find(futures, i -> false); - waitUntil(testee::isCompleted); + testee.await(); assertCompleted(testee, Option.none()); } @@ -241,7 +251,7 @@ public void shouldFindOneSucceedingFutureWhenAllOthersFailUsingDefaultExecutorSe .take(12) .append(Future.of(() -> 13)); final Future> testee = Future.find(futures, i -> i == 13); - waitUntil(testee::isCompleted); + testee.await(); assertCompleted(testee, Option.some(13)); } @@ -253,7 +263,7 @@ public void shouldFindNoneWhenAllFuturesFailUsingForkJoinPool() { })) .take(20); final Future> testee = Future.find(futures, i -> i == 13); - waitUntil(testee::isCompleted); + testee.await(); assertCompleted(testee, Option.none()); } @@ -263,7 +273,7 @@ public void shouldFindNoneWhenAllFuturesFailUsingForkJoinPool() { public void shouldGetFirstCompletedOfFailuresUsingForkJoinPool() { final Seq> futures = Stream.from(1).map(i -> Future.of(zZz(new Error()))).take(3); final Future testee = Future.firstCompletedOf(futures); - waitUntil(testee::isCompleted); + testee.await(); assertThat(testee.getValue().get().isFailure()).isTrue(); } @@ -279,14 +289,14 @@ public void shouldGetFirstCompletedOfSucceedingFuturesUsingForkJoinPool() { @Test public void shouldCreateFailFutureFromTry() { final Future future = Future.fromTry(Try.of(() -> { throw new Error(); })); - waitUntil(future::isCompleted); + future.await(); assertThat(future.isFailure()).isTrue(); } @Test public void shouldCreateSuccessFutureFromTry() { final Future future = Future.fromTry(Try.of(() -> 42)); - waitUntil(future::isCompleted); + future.await(); assertThat(future.get()).isEqualTo(42); } @@ -312,7 +322,7 @@ public void shouldCompleteWithFailureWhenExecutorServiceThrowsRejectedExecutionE } @Test - public void shouldCompleteOneFuturesUsingAThreadPoolExecutorLimitedToOneThread() { + public void shouldCompleteOneFutureUsingAThreadPoolExecutorLimitedToOneThread() { final ExecutorService service = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, new SynchronousQueue<>()); final Future future = Future.of(service, () -> expensiveOperation(1)).await(); assertCompleted(future, 1); @@ -325,7 +335,8 @@ public void shouldCompleteThreeFuturesUsingAThreadPoolExecutorLimitedToTwoThread final Stream> futures = Stream .rangeClosed(1, 3) .map(value -> Future.of(service, () -> expensiveOperation(value))); - futures.forEach(Future::await); + futures.forEach(future -> Try.run(future::await)); + futures.forEach(System.out::println); assertThat(futures.flatMap(Function.identity()).toList().sorted()).isEqualTo(List.of(1, 2, 3)); service.shutdown(); } @@ -347,8 +358,7 @@ public void shouldReduceSequenceOfFutures() { final Future future = Future.reduce( List.of(Future.of(zZz("Va")), Future.of(zZz("vr"))), (i1, i2) -> i1 + i2 - ); - waitUntil(future::isCompleted); + ).await(); assertThat(future.get()).isEqualTo("Vavr"); } @@ -357,8 +367,7 @@ public void shouldReduceWithErrorIfSequenceOfFuturesContainsOneError() { final Future future = Future.reduce( List.of(Future.of(zZz(13)), Future.of(zZz(new Error()))), (i1, i2) -> i1 + i2 - ); - waitUntil(future::isCompleted); + ).await(); assertFailed(future, Error.class); } @@ -367,8 +376,7 @@ public void shouldReduceWithErrorIfSequenceOfFuturesContainsOneError() { @Test public void shouldCompleteRunnable() { final int[] sideEffect = new int[] { 0 }; - final Future future = Future.run(() -> sideEffect[0] = 42); - waitUntil(future::isCompleted); + Future.run(() -> sideEffect[0] = 42).await(); assertThat(sideEffect[0]).isEqualTo(42); } @@ -378,8 +386,7 @@ public void shouldCompleteRunnable() { public void shouldCompleteWithSeqOfValueIfSequenceOfFuturesContainsNoError() { final Future> sequence = Future.sequence( List.of(Future.of(zZz(1)), Future.of(zZz(2))) - ); - waitUntil(sequence::isCompleted); + ).await(); assertThat(sequence.getValue().get()).isEqualTo(Try.success(Stream.of(1, 2))); } @@ -387,8 +394,7 @@ public void shouldCompleteWithSeqOfValueIfSequenceOfFuturesContainsNoError() { public void shouldCompleteWithErrorIfSequenceOfFuturesContainsOneError() { final Future> sequence = Future.sequence( List.of(Future.of(zZz(13)), Future.of(zZz(new Error()))) - ); - waitUntil(sequence::isCompleted); + ).await(); assertFailed(sequence, Error.class); } @@ -406,8 +412,7 @@ public void shouldCreateSuccessful() { @Test public void shouldCompleteTraverse() { - final Future> future = Future.traverse(List.of(1, 2, 3), i -> Future.of(zZz(i))); - waitUntil(future::isCompleted); + final Future> future = Future.traverse(List.of(1, 2, 3), i -> Future.of(zZz(i))).await(); assertThat(future.get()).isEqualTo(Stream.of(1, 2, 3)); } @@ -416,31 +421,26 @@ public void shouldCompleteTraverse() { @Test public void shouldCompleteWithErrorIfFailAndThenFail() { final Future future = Future. of(zZz(new Error("fail!"))) - .andThen(t -> zZz(new Error("and then fail!"))); - waitUntil(future::isCompleted); + .andThen(t -> zZz(new Error("and then fail!"))).await(); assertFailed(future, Error.class); } @Test public void shouldCompleteWithSuccessIfSuccessAndThenFail() { final Future future = Future.of(zZz(42)) - .andThen(t -> zZz(new Error("and then fail!"))); - waitUntil(future::isCompleted); + .andThen(t -> zZz(new Error("and then fail!"))).await(); assertThat(future.getValue().get()).isEqualTo(Try.success(42)); } @Test public void shouldCompleteWithSpecificOrderIfSuccessAndThenSuccess() { - final boolean[] lock = new boolean[] { true }; final int[] sideEffect = new int[] { 0 }; - final Future future = Future. of(() -> { - waitUntil(() -> !lock[0]); - return null; + final Future future = Future.run(() -> { + Thread.sleep(250); }).andThen(t -> sideEffect[0] = 42); assertThat(future.isCompleted()).isFalse(); assertThat(sideEffect[0]).isEqualTo(0); - lock[0] = false; - waitUntil(future::isCompleted); + future.await(); assertThat(sideEffect[0]).isEqualTo(42); } @@ -449,32 +449,28 @@ public void shouldCompleteWithSpecificOrderIfSuccessAndThenSuccess() { @Test public void shouldReturnSelfResultOnOrElseIfSuccess() { final Future f1 = Future.of(() -> "f1"); - final Future f2 = f1.orElse(Future.of(() -> "f2")); - waitUntil(f2::isCompleted); + final Future f2 = f1.orElse(Future.of(() -> "f2")).await(); assertThat(f2.get()).isEqualTo("f1"); } @Test public void shouldReturnSelfResultOnOrElseSupplierIfSuccess() { final Future f1 = Future.of(() -> "f1"); - final Future f2 = f1.orElse(() -> Future.of(() -> "f2")); - waitUntil(f2::isCompleted); + final Future f2 = f1.orElse(() -> Future.of(() -> "f2")).await(); assertThat(f2.get()).isEqualTo("f1"); } @Test public void shouldReturnOtherResultOnOrElseIfFailure() { final Future f1 = Future.failed(new RuntimeException()); - final Future f2 = f1.orElse(Future.of(() -> "f2")); - waitUntil(f2::isCompleted); + final Future f2 = f1.orElse(Future.of(() -> "f2")).await(); assertThat(f2.get()).isEqualTo("f2"); } @Test public void shouldReturnOtherResultOnOrElseSupplierIfFailure() { final Future f1 = Future.failed(new RuntimeException()); - final Future f2 = f1.orElse(() -> Future.of(() -> "f2")); - waitUntil(f2::isCompleted); + final Future f2 = f1.orElse(() -> Future.of(() -> "f2")).await(); assertThat(f2.get()).isEqualTo("f2"); } @@ -490,7 +486,7 @@ public void shouldAwaitOnGet() { } // -- await(timeout, timeunit) - + @Test public void shouldAwaitAndTimeout() { final long timeout = 100; @@ -510,7 +506,7 @@ public void shouldAwaitAndTimeout() { assertThat(future.getCause().get()).isInstanceOf(TimeoutException.class); assertThat(future.getCause().get().getMessage()).isEqualTo("timeout after 100 MILLISECONDS"); } - + @Test public void shouldHandleInterruptedExceptionCorrectlyInAwait() { final Future future = Future.run(() -> { throw new InterruptedException(); }); @@ -524,16 +520,14 @@ public void shouldHandleInterruptedExceptionCorrectlyInAwait() { @Test public void shouldConvertToFailedFromFail() { - final Future future = Future.of(zZz(new Error())).failed(); - waitUntil(future::isCompleted); + final Future future = Future.of(zZz(new Error())).failed().await(); assertThat(future.isSuccess()).isTrue(); assertThat(future.get().getClass()).isEqualTo(Error.class); } @Test public void shouldConvertToFailedFromSuccess() { - final Future future = Future.of(zZz(42)).failed(); - waitUntil(future::isCompleted); + final Future future = Future.of(zZz(42)).failed().await(); assertThat(future.isFailure()).isTrue(); assertThat(future.getValue().get().getCause().getClass()).isEqualTo(NoSuchElementException.class); } @@ -546,8 +540,7 @@ public void shouldFallbackToThisResult() { final Future that = Future.of(() -> { throw new Error(); }); - final Future testee = future.fallbackTo(that); - waitUntil(testee::isCompleted); + final Future testee = future.fallbackTo(that).await(); assertThat(testee.getValue().get()).isEqualTo(Try.success(1)); } @@ -557,8 +550,7 @@ public void shouldFallbackToThatResult() { throw new Error(); }); final Future that = Future.of(() -> 1); - final Future testee = future.fallbackTo(that); - waitUntil(testee::isCompleted); + final Future testee = future.fallbackTo(that).await(); assertThat(testee.getValue().get()).isEqualTo(Try.success(1)); } @@ -570,8 +562,7 @@ public void shouldFallbackToThisFailure() { final Future that = Future.of(() -> { throw new Error(); }); - final Future testee = future.fallbackTo(that); - waitUntil(testee::isCompleted); + final Future testee = future.fallbackTo(that).await(); final Try result = testee.getValue().get(); assertThat(result.isFailure()).isTrue(); assertThat(result.getCause().getMessage()).isEqualTo("ok"); @@ -582,46 +573,58 @@ public void shouldFallbackToThisFailure() { @Test public void shouldFoldEmptyIterable() { final Seq> futures = Stream.empty(); - final Future testee = Future.fold(futures, 0, (a, b) -> a + b); - waitUntil(testee::isCompleted); + final Future testee = Future.fold(futures, 0, (a, b) -> a + b).await(); assertThat(testee.getValue().get()).isEqualTo(Try.success(0)); } @Test public void shouldFoldNonEmptyIterableOfSucceedingFutures() { final Seq> futures = Stream.from(1).map(i -> Future.of(zZz(i))).take(5); - final Future testee = Future.fold(futures, 0, (a, b) -> a + b); - waitUntil(testee::isCompleted); + final Future testee = Future.fold(futures, 0, (a, b) -> a + b).await(); assertThat(testee.getValue().get()).isEqualTo(Try.success(15)); } @Test public void shouldFoldNonEmptyIterableOfFailingFutures() { final Seq> futures = Stream.from(1).map(i -> Future. of(zZz(new Error()))).take(5); - final Future testee = Future.fold(futures, 0, (a, b) -> a + b); - waitUntil(testee::isCompleted); + final Future testee = Future.fold(futures, 0, (a, b) -> a + b).await(); assertFailed(testee, Error.class); } // -- cancel() - @SuppressWarnings("InfiniteLoopStatement") @Test public void shouldInterruptLockedFuture() { - final Future future = Future.of(() -> { - while (true) { - Try.run(() -> Thread.sleep(100)); + final Object monitor = new Object(); + final AtomicBoolean running = new AtomicBoolean(false); + final Future future = blocking(() -> { + synchronized (monitor) { + running.set(true); + monitor.wait(); // wait forever } }); - future.cancel(); - waitUntil(future::isCompleted); - assertCancelled(future); + waitUntil(running::get); + synchronized (monitor) { + future.cancel(); + } + assertThat(future.isCancelled()).isTrue(); } @Test(expected = CancellationException.class) public void shouldThrowOnGetAfterCancellation() { - final Future future = Future.of(Concurrent::waitForever); - assertThat(future.cancel().isCancelled()).isTrue(); + final Object monitor = new Object(); + final AtomicBoolean running = new AtomicBoolean(false); + final Future future = blocking(() -> { + synchronized (monitor) { + running.set(true); + monitor.wait(); // wait forever + } + }); + waitUntil(running::get); + synchronized (monitor) { + future.cancel(); + } + assertThat(future.isCancelled()).isTrue(); future.get(); fail("Future was expected to throw on get() after cancellation!"); } @@ -631,24 +634,21 @@ public void shouldThrowOnGetAfterCancellation() { @Test public void shouldCollectDefinedValueUsingPartialFunction() { final PartialFunction pf = Function1. of(String::valueOf).partial(i -> i % 2 == 1); - final Future future = Future.of(zZz(3)).collect(pf); - waitUntil(future::isCompleted); + final Future future = Future.of(zZz(3)).collect(pf).await(); assertThat(future.getValue().get()).isEqualTo(Try.success("3")); } @Test public void shouldFilterNotDefinedValueUsingPartialFunction() { final PartialFunction pf = Function1. of(String::valueOf).partial(i -> i % 2 == 1); - final Future future = Future.of(zZz(2)).collect(pf); - waitUntil(future::isCompleted); + final Future future = Future.of(zZz(2)).collect(pf).await(); assertThat(future.getValue().get().isFailure()).isTrue(); } @Test public void shouldCollectEmptyFutureUsingPartialFunction() { final PartialFunction pf = Function1. of(String::valueOf).partial(i -> i % 2 == 1); - final Future future = Future. of(zZz(new Error())).collect(pf); - waitUntil(future::isCompleted); + final Future future = Future. of(zZz(new Error())).collect(pf).await(); assertThat(future.getValue().get().isFailure()).isTrue(); } @@ -673,8 +673,18 @@ public void shouldReturnExecutorService() { @Test public void shouldGetCauseOfUncompletedFuture() { - final Future future = Future.of(Concurrent::waitForever); + final AtomicBoolean running = new AtomicBoolean(false); + final Future future = Future.run(() -> { + synchronized (running) { + running.set(true); + running.wait(); + } + }); assertThat(future.getCause()).isEqualTo(Option.none()); + waitUntil(running::get); + synchronized (running) { + running.notify(); + } } @Test @@ -692,8 +702,18 @@ public void shouldThrowWhenGettingCauseOfSucceededFuture() { @Test public void shouldGetValueOfUncompletedFuture() { - final Future future = Future.of(Concurrent::waitForever); + final AtomicBoolean running = new AtomicBoolean(false); + final Future future = Future.run(() -> { + synchronized (running) { + running.set(true); + running.wait(); + } + }); assertThat(future.getValue()).isEqualTo(Option.none()); + waitUntil(running::get); + synchronized (running) { + running.notify(); + } } @Test @@ -730,8 +750,7 @@ public void shouldBeCompletedWhenFailed() { @Test public void shouldBeCompletedWhenResultIsPresent() { - final Future future = Future.of(() -> null); - waitUntil(future::isCompleted); + final Future future = Future.of(() -> null).await(); assertThat(future.isCompleted()).isTrue(); } @@ -748,54 +767,48 @@ public void shouldBeSuccessful() { public void shouldBeFailed() { assertThat(Future.failed(new Exception()).isFailure()).isTrue(); } - - // -- onComplete() - @Test - public void shouldExecuteOnCompleteOnCompletedFuture() { - final int[] actual = new int[] { 0 }; - Future.successful(1).onComplete(t -> { - actual[0] = t.get(); - }); - waitUntil(() -> actual[0] == 1); - } + // -- onComplete() @Test - public void shouldRegisterCallbackBeforeFutureCompletes() { + public void shouldRegisterCallbackBeforeFutureCompletes() throws InterruptedException { // instead of delaying we wait/notify - final Object lock = new Object(); + final Object lock1 = new Object(); + final Object lock2 = new Object(); final int[] actual = new int[] { -1 }; - final boolean[] futureWaiting = new boolean[] { false }; final int expected = 1; // create a future and put it to sleep final Future future = Future.of(() -> { - synchronized (lock) { - futureWaiting[0] = true; - lock.wait(); + synchronized(lock1) { + lock1.wait(); } return expected; }); - // give the future thread some time to sleep - waitUntil(() -> futureWaiting[0]); - // the future now is on hold and we have time to register a callback - future.onComplete(result -> actual[0] = result.get()); - assertThat(future.isCompleted()).isFalse(); - assertThat(actual[0]).isEqualTo(-1); + future.onComplete(result -> { + actual[0] = result.get(); + synchronized(lock2) { + lock2.notify(); + } + }); - // now awake the future - synchronized (lock) { - lock.notify(); + // this hinders the onComplete action to notify lock2 before we wait on lock2 + synchronized(lock2) { + synchronized (lock1) { + // now wake the future up + lock1.notify(); + } + lock2.wait(); } // give the future thread some time to complete - waitUntil(future::isCompleted); + future.await(); // the callback is also executed on its own thread - we have to wait for it to complete. - waitUntil(() -> actual[0] == expected); + assertThat(actual[0]).isEqualTo(expected); } @Test @@ -917,7 +930,7 @@ public void shouldTransform() { @Test public void shouldTransformResultFromSuccessToSuccess() { final Future future = Future.of(zZz(42)).transformValue(t -> Try.of(() -> "forty two")); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isSuccess); assertThat(future.get()).isEqualTo("forty two"); } @@ -925,7 +938,7 @@ public void shouldTransformResultFromSuccessToSuccess() { @Test public void shouldTransformResultFromSuccessToFailure() { final Future future = Future.of(zZz(42)).transformValue(t -> Try.failure(new Error())); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(Error.class); } @@ -935,7 +948,7 @@ public void shouldTransformResultFromSuccessToFailureThroughError() { final Future future = Future.of(zZz(42)).transformValue(t -> Try.of(() -> { throw new ArithmeticException(); })); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(ArithmeticException.class); } @@ -943,7 +956,7 @@ public void shouldTransformResultFromSuccessToFailureThroughError() { @Test public void shouldTransformResultFromFailureToSuccess() { final Future future = Future.of(zZz(new Error())).transformValue(t -> Try.of(() -> "forty two")); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isSuccess); assertThat(future.get()).isEqualTo("forty two"); } @@ -953,7 +966,7 @@ public void shouldTransformResultFromFailureToFailure() { final Future future = Future.of(() -> { throw new ArithmeticException(); }).transformValue(t -> Try.failure(new Error())); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(Error.class); } @@ -963,7 +976,7 @@ public void shouldTransformResultFromFailureToFailureThroughError() { final Future future = Future.of(zZz(new Error())).transformValue(t -> Try.of(() -> { throw new ArithmeticException(); })); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(ArithmeticException.class); } @@ -973,7 +986,7 @@ public void shouldTransformResultFromFailureToFailureThroughError() { @Test public void shouldZipSuccess() { final Future> future = Future.of(zZz(1)).zip(Future.of(zZz(2))); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isSuccess); assertThat(future.get()).isEqualTo(Tuple.of(1, 2)); } @@ -981,7 +994,7 @@ public void shouldZipSuccess() { @Test public void shouldZipFailure() { final Future> future = Future. of(zZz(new Error())).zip(Future.of(zZz(2))); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(Error.class); } @@ -991,7 +1004,7 @@ public void shouldZipFailure() { @Test public void shouldZipWithSuccess() { final Future> future = Future.of(zZz(1)).zipWith(Future.of(zZz(2)), List::of); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isSuccess); assertThat(future.get()).isEqualTo(List.of(1, 2)); } @@ -999,7 +1012,7 @@ public void shouldZipWithSuccess() { @Test public void shouldZipWithFailure() { final Future> future = Future. of(zZz(new Error())).zipWith(Future.of(zZz(2)), List::of); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(Error.class); } @@ -1009,7 +1022,7 @@ public void shouldZipWithCombinatorFailure() { final Future> future = Future.of(zZz(3)).zipWith(Future.of(zZz(2)), (t, u) -> { throw new RuntimeException(); }); - waitUntil(future::isCompleted); + future.await(); waitUntil(future::isFailure); assertThat(future.getCause().get().getClass()).isEqualTo(RuntimeException.class); } @@ -1017,23 +1030,28 @@ public void shouldZipWithCombinatorFailure() { // -- Value implementation @Test - public void shouldFilterFuture() { + public void shouldFilterFutureWhenFilterIsNotEmpty() { final Future future = Future.successful(42); assertThat(future.filter(i -> i == 42).get()).isEqualTo(42); + } + + @Test + public void shouldFilterFutureWhenFilterIsEmpty() { + final Future future = Future.successful(42); assertThat(future.filter(i -> i == 43).isEmpty()).isTrue(); } @Test public void shouldFlatMapFuture() { final Future future = Future.of(zZz(42)).flatMap(i -> Future.of(zZz(i * 2))); - waitUntil(future::isCompleted); + future.await(); assertThat(future.get()).isEqualTo(84); } @Test public void shouldMapFuture() { final Future future = Future.of(zZz(42)).map(i -> i * 2); - waitUntil(future::isCompleted); + future.await(); assertThat(future.get()).isEqualTo(84); } @@ -1062,14 +1080,14 @@ public void shouldReturnIterator() { @Test public void shouldMapTheHappyPath() { final Future testee = Future.of(zZz(1)).map(Object::toString); - waitUntil(testee::isCompleted); + testee.await(); assertCompleted(testee, "1"); } @Test public void shouldMapWhenCrashingDuringFutureComputation() { final Future testee = Future. of(zZz(new Error())).map(Object::toString); - waitUntil(testee::isCompleted); + testee.await(); assertFailed(testee, Error.class); } @@ -1078,7 +1096,7 @@ public void shouldMapWhenCrashingDuringMapping() { final Future testee = Future.of(zZz(1)).map(i -> { throw new IllegalStateException(); }); - waitUntil(testee::isCompleted); + testee.await(); assertFailed(testee, IllegalStateException.class); } @@ -1087,14 +1105,14 @@ public void shouldMapWhenCrashingDuringMapping() { @Test public void shouldMapTryTheHappyPath() { final Future testee = Future.of(zZz(1)).mapTry(Object::toString); - waitUntil(testee::isCompleted); + testee.await(); assertCompleted(testee, "1"); } @Test public void shouldMapTryWhenCrashingDuringFutureComputation() { final Future testee = Future. of(zZz(new Error())).mapTry(Object::toString); - waitUntil(testee::isCompleted); + testee.await(); assertFailed(testee, Error.class); } @@ -1103,7 +1121,7 @@ public void shouldMapTryWhenCrashingDuringMapping() { final Future testee = Future.of(zZz(1)).mapTry(i -> { throw new IOException(); }); - waitUntil(testee::isCompleted); + testee.await(); assertFailed(testee, IOException.class); } @@ -1123,11 +1141,6 @@ public void shouldRecognizeUnequalObjects() { // -- (helpers) - // checks the invariant for cancelled state - private void assertCancelled(Future future) { - assertFailed(future, CancellationException.class); - } - private void assertFailed(Future future, Class exception) { assertThat(future.isCompleted()).isTrue(); assertThat(future.getValue().get().failed().get()).isExactlyInstanceOf(exception); @@ -1139,19 +1152,39 @@ private void assertCompleted(Future future, T value) { assertThat(future.getValue()).isEqualTo(Option.some(Try.success(value))); } - private java.util.concurrent.Future generateJavaFuture(T value, int waitPeriod) { - return generateJavaCompletableFuture(value, waitPeriod); + private static Future blocking(CheckedRunnable computation) { + return blocking(() -> { + computation.run(); + return null; + }); } - private java.util.concurrent.CompletableFuture generateJavaCompletableFuture(T value, int waitPeriod) { - return CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(waitPeriod); - } catch (InterruptedException e) { - e.printStackTrace(); + private static Future blocking(CheckedFunction0 computation) { + return Future.of(() -> { + final AtomicReference result = new AtomicReference<>(null); + final AtomicReference errorRef = new AtomicReference<>(null); + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() { + boolean releasable = false; + @Override + public boolean block() throws InterruptedException { + try { + result.set(computation.apply()); + } catch(Throwable x) { + errorRef.set(x); + } + return releasable = true; + } + @Override + public boolean isReleasable() { + return releasable; + } + }); + final Throwable error = errorRef.get(); + if (error != null) { + throw error; + } else { + return result.get(); } - - return value; }); } diff --git a/vavr/src/test/java/io/vavr/concurrent/PromiseTest.java b/vavr/src/test/java/io/vavr/concurrent/PromiseTest.java deleted file mode 100644 index 80f9910883..0000000000 --- a/vavr/src/test/java/io/vavr/concurrent/PromiseTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* __ __ __ __ __ ___ - * \ \ / / \ \ / / __/ - * \ \/ / /\ \ \/ / / - * \____/__/ \__\____/__/ - * - * Copyright 2014-2017 Vavr, http://vavr.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.vavr.concurrent; - -import io.vavr.control.Try; -import org.junit.Test; - -import static io.vavr.concurrent.Concurrent.zZz; -import static org.assertj.core.api.Assertions.assertThat; - -public class PromiseTest { - - @Test - public void shouldReturnExecutorService() { - final Promise promise = Promise.successful(42); - assertThat(promise.executorService()).isNotNull(); - } - - @Test - public void shouldReturnSuccessfulPromise() { - final Promise promise = Promise.successful(42); - assertThat(promise.isCompleted()).isTrue(); - assertThat(promise.future().isSuccess()).isTrue(); - } - - @Test - public void shouldReturnFailedPromise() { - final Promise promise = Promise.failed(new RuntimeException()); - assertThat(promise.isCompleted()).isTrue(); - assertThat(promise.future().isFailure()).isTrue(); - } - - @Test - public void shouldReturnPromiseFromTry() { - final Promise promise = Promise.fromTry(Try.of(() -> 42)); - assertThat(promise.isCompleted()).isTrue(); - assertThat(promise.future().isSuccess()).isTrue(); - } - - @Test(expected = IllegalStateException.class) - public void shouldFailWhenCompleteAgain() { - Promise.successful(42).complete(Try.success(0)); - } - - @Test - public void shouldTrySuccess() { - final Promise promise = Promise.make(); - assertThat(promise.trySuccess(42)).isTrue(); - assertThat(promise.trySuccess(42)).isFalse(); - assertThat(promise.future().get()).isEqualTo(42); - } - - @Test - public void shouldTryFailure() { - final Promise promise = Promise.make(); - assertThat(promise.tryFailure(new RuntimeException())).isTrue(); - assertThat(promise.tryFailure(new RuntimeException())).isFalse(); - assertThat(promise.future().isFailure()).isTrue(); - } - - @Test - public void shouldConvertToString() { - assertThat(Promise.successful("vavr").toString().contains("vavr")).isTrue(); - } - - @Test - public void shouldCompletePromiseWithItsOwnFuture() { - final Promise promise = Promise.make(ExecutorServices.trivialExecutorService()); - promise.completeWith(promise.future()); - assertThat(promise.isCompleted()).isFalse(); - assertThat(promise.success("ok").isCompleted()).isTrue(); - } - - @Test - public void shouldMediateProducerConsumerViaPromise() { - - final String product = "Coffee"; - - class Context { - - String produceSomething() { - Concurrent.zZz(); - System.out.println("Making " + product); - return product; - } - - void continueDoingSomethingUnrelated() { - System.out.println("Unreleated stuff"); - } - - void startDoingSomething() { - System.out.println("Something else"); - } - } - - final Context ctx = new Context(); - final Promise producerResult = Promise.make(); - final Promise consumerResult = Promise.make(); - - // producer - Future.run(() -> { - producerResult.success(ctx.produceSomething()); - ctx.continueDoingSomethingUnrelated(); - }); - - // consumer - Future.run(() -> { - ctx.startDoingSomething(); - consumerResult.completeWith(producerResult.future()); - }); - - final String actual = consumerResult.future().get(); - assertThat(actual).isEqualTo(product); - } -} From db5ff1b117bbce23aac4243aa5a49d72530a9898 Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 00:45:46 +0200 Subject: [PATCH 3/7] Changed unit test parallelism --- pom.xml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index be55a38c40..3738b4e840 100644 --- a/pom.xml +++ b/pom.xml @@ -305,10 +305,13 @@ We use these goals frequently to keep the dependencies and plugins up-to-date: - + -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 + none + From fae5531b3fa9c4a3d943044c53f8965e7af7291b Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 00:59:40 +0200 Subject: [PATCH 4/7] putting blocking code in a managed context --- vavr/src/test/java/io/vavr/concurrent/FutureTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java index 2dd98c9f06..500b1f1806 100644 --- a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java +++ b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java @@ -674,7 +674,7 @@ public void shouldReturnExecutorService() { @Test public void shouldGetCauseOfUncompletedFuture() { final AtomicBoolean running = new AtomicBoolean(false); - final Future future = Future.run(() -> { + final Future future = blocking(() -> { synchronized (running) { running.set(true); running.wait(); @@ -703,7 +703,7 @@ public void shouldThrowWhenGettingCauseOfSucceededFuture() { @Test public void shouldGetValueOfUncompletedFuture() { final AtomicBoolean running = new AtomicBoolean(false); - final Future future = Future.run(() -> { + final Future future = blocking(() -> { synchronized (running) { running.set(true); running.wait(); @@ -780,7 +780,7 @@ public void shouldRegisterCallbackBeforeFutureCompletes() throws InterruptedExce final int expected = 1; // create a future and put it to sleep - final Future future = Future.of(() -> { + final Future future = blocking(() -> { synchronized(lock1) { lock1.wait(); } From f82777ecc505d5802a7b43c3e1644e48747a45fa Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 10:28:55 +0200 Subject: [PATCH 5/7] Optimized promise creation by introducing synchronous join() operation --- pom.xml | 6 +- .../main/java/io/vavr/concurrent/Future.java | 96 +++++++++++------ .../java/io/vavr/concurrent/FutureImpl.java | 101 +++++++++++------- 3 files changed, 132 insertions(+), 71 deletions(-) diff --git a/pom.xml b/pom.xml index 3738b4e840..141c688771 100644 --- a/pom.xml +++ b/pom.xml @@ -305,13 +305,13 @@ We use these goals frequently to keep the dependencies and plugins up-to-date: - -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 - none all 4 true - --> diff --git a/vavr/src/main/java/io/vavr/concurrent/Future.java b/vavr/src/main/java/io/vavr/concurrent/Future.java index df9b4e748d..78244232e7 100644 --- a/vavr/src/main/java/io/vavr/concurrent/Future.java +++ b/vavr/src/main/java/io/vavr/concurrent/Future.java @@ -106,7 +106,7 @@ static Future failed(Throwable exception) { static Future failed(ExecutorService executorService, Throwable exception) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(exception, "exception is null"); - return new FutureImpl<>(executorService, Try.failure(exception)); + return FutureImpl.of(executorService, Try.failure(exception)); } /** @@ -146,7 +146,7 @@ static Future> find(ExecutorService executorService, Iterable { + return join(executorService, tryComplete -> { final AtomicBoolean completed = new AtomicBoolean(false); final AtomicInteger count = new AtomicInteger(list.length()); list.forEach(future -> future.onComplete(result -> { @@ -196,7 +196,7 @@ static Future firstCompletedOf(Iterable> fu static Future firstCompletedOf(ExecutorService executorService, Iterable> futures) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(futures, "futures is null"); - return promise(executorService, tryComplete -> futures.forEach(future -> future.onComplete(tryComplete::test))); + return join(executorService, tryComplete -> futures.forEach(future -> future.onComplete(tryComplete::test))); } /** @@ -300,7 +300,7 @@ static Future fromCompletableFuture(ExecutorService executorService, Comp if (future.isDone() || future.isCompletedExceptionally() || future.isCancelled()) { return fromTry(Try.of(future::get).mapFailure(Throwable::getCause)); } else { - return promise(executorService, tryComplete -> + return join(executorService, tryComplete -> future.handle((t, err) -> tryComplete.test((err == null) ? Try.success(t) : Try.failure(err))) ); } @@ -330,7 +330,7 @@ static Future fromTry(Try result) { static Future fromTry(ExecutorService executorService, Try result) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(result, "result is null"); - return new FutureImpl<>(executorService, result); + return FutureImpl.of(executorService, result); } /** @@ -371,15 +371,32 @@ static Future of(CheckedFunction0 computation) { static Future of(ExecutorService executorService, CheckedFunction0 computation) { Objects.requireNonNull(executorService, "executorService is null"); Objects.requireNonNull(computation, "computation is null"); - return promise(executorService, tryComplete -> tryComplete.test(Try.of(computation))); + return FutureImpl.async(executorService, tryComplete -> tryComplete.test(Try.of(computation))); } /** - * Creates a Future based on a promise in the way, that the given computation requires - * to complete the Future. - *

+ * Creates a (possibly blocking) Future that joins the results of the given {@code computation} + * using a completion handler: + * + *

{@code
+     * CheckedConsumer>> computation = tryComplete -> {
+     *     // computation
+     * };
+     * }
+ * + * The {@code computation} is executed synchronously. It requires to complete the returned Future. + * A common use-case is to hand over the {@code tryComplete} predicate to another {@code Future} + * in order to prevent blocking: + * + *
{@code
+     * Future greeting(Future nameFuture) {
+     *     return Future.join(tryComplete -> {
+     *         nameFuture.onComplete(name -> tryComplete.test("Hi " + name));
+     *     });
+     * }}
+ * * The computation receives a {@link Predicate}, named {@code tryComplete} by convention, - * that takes a result of {@code Try} and return a boolean that states whether the + * that takes a result of type {@code Try} and returns a boolean that states whether the * Future was completed. *

* Future completion is an idempotent operation in the way that the first call of {@code tryComplete} @@ -389,16 +406,33 @@ static Future of(ExecutorService executorService, CheckedFunction0 Type of the result * @return a new {@code Future} instance */ - static Future promise(CheckedConsumer>> computation) { - return promise(DEFAULT_EXECUTOR_SERVICE, computation); + static Future join(CheckedConsumer>> computation) { + return join(DEFAULT_EXECUTOR_SERVICE, computation); } /** - * Creates a Future based on a promise in the way, that the given computation requires - * to complete the Future. - *

+ * Creates a (possibly blocking) Future that joins the results of the given {@code computation} + * using a completion handler: + * + *

{@code
+     * CheckedConsumer>> computation = tryComplete -> {
+     *     // computation
+     * };
+     * }
+ * + * The {@code computation} is executed synchronously. It requires to complete the returned Future. + * A common use-case is to hand over the {@code tryComplete} predicate to another {@code Future} + * in order to prevent blocking: + * + *
{@code
+     * Future greeting(Future nameFuture) {
+     *     return Future.join(tryComplete -> {
+     *         nameFuture.onComplete(name -> tryComplete.test("Hi " + name));
+     *     });
+     * }}
+ * * The computation receives a {@link Predicate}, named {@code tryComplete} by convention, - * that takes a result of {@code Try} and return a boolean that states whether the + * that takes a result of type {@code Try} and returns a boolean that states whether the * Future was completed. *

* Future completion is an idempotent operation in the way that the first call of {@code tryComplete} @@ -409,8 +443,8 @@ static Future promise(CheckedConsumer>> comput * @param Type of the result * @return a new {@code Future} instance */ - static Future promise(ExecutorService executorService, CheckedConsumer>> computation) { - return new FutureImpl<>(executorService, computation); + static Future join(ExecutorService executorService, CheckedConsumer>> computation) { + return FutureImpl.sync(executorService, computation); } /** @@ -562,7 +596,7 @@ static Future successful(T result) { */ static Future successful(ExecutorService executorService, T result) { Objects.requireNonNull(executorService, "executorService is null"); - return new FutureImpl<>(executorService, Try.success(result)); + return FutureImpl.of(executorService, Try.success(result)); } @Override @@ -634,7 +668,7 @@ static Future> traverse(ExecutorService executorService, Iterable< */ default Future andThen(Consumer> action) { Objects.requireNonNull(action, "action is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(t -> { Try.run(() -> action.accept(t)); tryComplete.test(t); @@ -712,7 +746,7 @@ default Future cancel() { */ default Future collect(PartialFunction partialFunction) { Objects.requireNonNull(partialFunction, "partialFunction is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(result -> tryComplete.test(result.collect(partialFunction))) ); } @@ -734,7 +768,7 @@ default Future collect(PartialFunction partialFun * @return A new Future which contains an exception at a point of time. */ default Future failed() { - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(result -> { if (result.isFailure()) { tryComplete.test(Try.success(result.getCause())); @@ -766,7 +800,7 @@ default Future failed() { */ default Future fallbackTo(Future that) { Objects.requireNonNull(that, "that is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(t -> { if (t.isSuccess()) { tryComplete.test(t); @@ -798,7 +832,7 @@ default Future filter(Predicate predicate) { */ default Future filterTry(CheckedPredicate predicate) { Objects.requireNonNull(predicate, "predicate is null"); - return promise(executorService(), tryComplete -> onComplete(result -> tryComplete.test(result.filterTry(predicate)))); + return join(executorService(), tryComplete -> onComplete(result -> tryComplete.test(result.filterTry(predicate)))); } /** @@ -917,7 +951,7 @@ default Future recover(Function f) { */ default Future recoverWith(Function> f) { Objects.requireNonNull(f, "f is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(t -> { if (t.isFailure()) { Try.run(() -> f.apply(t.getCause()).onComplete(tryComplete::test)) @@ -952,7 +986,7 @@ default U transform(Function, ? extends U> f) { */ default Future transformValue(Function, ? extends Try> f) { Objects.requireNonNull(f, "f is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(t -> Try.run(() -> tryComplete.test(f.apply(t))) .onFailure(x -> tryComplete.test(Try.failure(x))) ) @@ -992,7 +1026,7 @@ default Future> zip(Future that) { default Future zipWith(Future that, BiFunction combinator) { Objects.requireNonNull(that, "that is null"); Objects.requireNonNull(combinator, "combinator is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(res1 -> { if (res1.isFailure()) { tryComplete.test((Try.Failure) res1); @@ -1015,7 +1049,7 @@ default Future flatMap(Function> default Future flatMapTry(CheckedFunction1> mapper) { Objects.requireNonNull(mapper, "mapper is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(result -> result.mapTry(mapper) .onSuccess(future -> future.onComplete(tryComplete::test)) .onFailure(x -> tryComplete.test(Try.failure(x))) @@ -1042,7 +1076,6 @@ default void forEach(Consumer action) { * IMPORTANT! If the computation result is a {@link Try.Failure}, the underlying {@code cause} of type {@link Throwable} is thrown. * * @return The value of this {@code Future}. - * @throws InterruptedException if the current thread was interrupted while waiting for the value */ @Override default T get() { @@ -1063,7 +1096,6 @@ default boolean isAsync() { * Checks, if this future has a value. * * @return true, if this future succeeded with a value, false otherwise. - * @throws InterruptedException if the current thread was interrupted while waiting for the value */ @Override default boolean isEmpty() { @@ -1108,7 +1140,7 @@ default Future mapTry(CheckedFunction1 mapper) { default Future orElse(Future other) { Objects.requireNonNull(other, "other is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(result -> { if (result.isSuccess()) { tryComplete.test(result); @@ -1121,7 +1153,7 @@ default Future orElse(Future other) { default Future orElse(Supplier> supplier) { Objects.requireNonNull(supplier, "supplier is null"); - return promise(executorService(), tryComplete -> + return join(executorService(), tryComplete -> onComplete(result -> { if (result.isSuccess()) { tryComplete.test(result); diff --git a/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java b/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java index 90995387b9..5661889980 100644 --- a/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java +++ b/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java @@ -20,6 +20,7 @@ package io.vavr.concurrent; import io.vavr.CheckedConsumer; +import io.vavr.CheckedFunction1; import io.vavr.collection.Queue; import io.vavr.control.Option; import io.vavr.control.Try; @@ -59,7 +60,7 @@ final class FutureImpl implements Future { * Otherwise actions = null. */ @GuardedBy("lock") - private Queue>> actions; + private Queue>> actions; /** * The queue of waiters is filled when calling await() before the Future is completed or cancelled. @@ -78,50 +79,66 @@ final class FutureImpl implements Future { @GuardedBy("lock") private java.util.concurrent.Future job; + private FutureImpl(ExecutorService executorService, Option> value, Queue>> actions, Queue waiters, CheckedFunction1, java.util.concurrent.Future> jobFactory) { + this.executorService = executorService; + synchronized (lock) { + this.value = value; + this.actions = actions; + this.waiters = waiters; + try { + this.job = jobFactory.apply(this); + } catch(Throwable x) { + tryComplete(Try.failure(x)); + } + } + } + /** - * Creates a Future that is immediately completed with the given value. No task will be started. + * Creates a {@code FutureImpl} that is immediately completed with the given value. No task will be started. * * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions. * @param value the result of this Future */ @SuppressWarnings("unchecked") - FutureImpl(ExecutorService executorService, Try value) { - Objects.requireNonNull(executorService, "executorService is null"); - this.executorService = executorService; - this.value = Option.some((Try) value); - this.actions = null; - this.waiters = null; - this.job = null; + static FutureImpl of(ExecutorService executorService, Try value) { + return new FutureImpl<>(executorService, Option.some(Try.narrow(value)), null, null, ignored -> null); } /** - * Creates a Future and starts a task. + * Creates a {@code FutureImpl} that is eventually completed. + * The given {@code computation} is synchronously executed, no thread is started. * - * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions. - * @param computation A computation that receives a complete function + * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions. + * @param computation A non-blocking computation + * @param value type of the Future + * @return a new {@code FutureImpl} instance */ - FutureImpl(ExecutorService executorService, CheckedConsumer>> computation) { - Objects.requireNonNull(executorService, "executorService is null"); - this.executorService = executorService; - // the lock ensures that the task does not complete this Future before the constructor is finished - synchronized (lock) { - this.value = Option.none(); - this.actions = Queue.empty(); - this.waiters = Queue.empty(); + static FutureImpl sync(ExecutorService executorService, CheckedConsumer>> computation) { + // TODO: currently can't be cancelled because job is null + return new FutureImpl<>(executorService, Option.none(), Queue.empty(), Queue.empty(), future -> { + computation.accept(future::tryComplete); + return null; + }); + } + + /** + * Creates a {@code FutureImpl} that is eventually completed. + * The given {@code computation} is asynchronously executed, a new thread is started. + * + * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions. + * @param computation A (possibly blocking) computation + * @param value type of the Future + * @return a new {@code FutureImpl} instance + */ + static FutureImpl async(ExecutorService executorService, CheckedConsumer>> computation) { + // In a single-threaded context this Future may already have been completed during initialization. + return new FutureImpl<>(executorService, Option.none(), Queue.empty(), Queue.empty(), future -> executorService.submit(() -> { try { - // In a single-threaded context this Future may already have been completed during initialization. - // The worker thread completes this Future before it is done. - this.job = executorService.submit(() -> { - try { - computation.accept(this::tryComplete); - } catch (Throwable x) { - tryComplete(Try.failure(x)); - } - }); + computation.accept(future::tryComplete); } catch (Throwable x) { - tryComplete(Try.failure(x)); + future.tryComplete(Try.failure(x)); } - } + })); } @Override @@ -247,6 +264,7 @@ public boolean isCompleted() { return value.isDefined(); } + @SuppressWarnings("unchecked") @Override public Future onComplete(Consumer> action) { Objects.requireNonNull(action, "action is null"); @@ -257,7 +275,7 @@ public Future onComplete(Consumer> action) { if (isCompleted()) { perform(action); } else { - actions = actions.enqueue(action); + actions = actions.enqueue((Consumer>) action); } } } @@ -283,14 +301,13 @@ public String toString() { * @param value A Success containing a result or a Failure containing an Exception. * @throws IllegalStateException if the Future is already completed or cancelled. * @throws NullPointerException if the given {@code value} is null. - * @see FutureImpl#FutureImpl(ExecutorService, CheckedConsumer) */ private boolean tryComplete(Try value) { Objects.requireNonNull(value, "value is null"); if (isCompleted()) { return false; } else { - final Queue>> actions; + final Queue>> actions; final Queue waiters; // it is essential to make the completed state public *before* performing the actions synchronized (lock) { @@ -307,7 +324,7 @@ private boolean tryComplete(Try value) { } } if (waiters != null) { - waiters.forEach(LockSupport::unpark); + waiters.forEach(this::unlock); } if (actions != null) { actions.forEach(this::perform); @@ -319,6 +336,18 @@ private boolean tryComplete(Try value) { } private void perform(Consumer> action) { - Try.run(() -> executorService.execute(() -> action.accept(value.get()))); + try { + executorService.execute(() -> action.accept(value.get())); + } catch(Throwable x) { + // ignored + } + } + + private void unlock(Thread waiter) { + try { + LockSupport.unpark(waiter); + } catch(Throwable x) { + // ignored + } } } From 3d2e86e60b7409748f5ede89ef20973501b72dc2 Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 13:38:03 +0200 Subject: [PATCH 6/7] hardened cancellation --- pom.xml | 6 ++- .../java/io/vavr/concurrent/FutureImpl.java | 37 +++++++++++++------ .../java/io/vavr/concurrent/FutureTest.java | 14 ++++++- 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index 141c688771..04c9b3e481 100644 --- a/pom.xml +++ b/pom.xml @@ -306,12 +306,14 @@ We use these goals frequently to keep the dependencies and plugins up-to-date: Typically this is the number of cores by default. In the travis-ci build env this is currently set to 1. --> + none + diff --git a/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java b/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java index 5661889980..ef78cf6fb7 100644 --- a/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java +++ b/vavr/src/main/java/io/vavr/concurrent/FutureImpl.java @@ -37,6 +37,7 @@ * @param Result of the computation. * @author Daniel Dietrich */ +// TODO: where to use UncaughtExceptionHandler? final class FutureImpl implements Future { /** @@ -49,6 +50,12 @@ final class FutureImpl implements Future { */ private final Object lock = new Object(); + /** + * Indicates if this Future is cancelled + */ + @GuardedBy("lock") + private volatile boolean cancelled; + /** * Once the Future is completed, the value is defined. */ @@ -71,7 +78,6 @@ final class FutureImpl implements Future { /** * Once a computation is started via run(), job is defined and used to control the lifecycle of the computation. - * The job variable must not be set to null after it was defined. *

* The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in * {@code value} instead. @@ -79,9 +85,11 @@ final class FutureImpl implements Future { @GuardedBy("lock") private java.util.concurrent.Future job; + // single constructor private FutureImpl(ExecutorService executorService, Option> value, Queue>> actions, Queue waiters, CheckedFunction1, java.util.concurrent.Future> jobFactory) { this.executorService = executorService; synchronized (lock) { + this.cancelled = false; this.value = value; this.actions = actions; this.waiters = waiters; @@ -114,7 +122,6 @@ static FutureImpl of(ExecutorService executorService, Try va * @return a new {@code FutureImpl} instance */ static FutureImpl sync(ExecutorService executorService, CheckedConsumer>> computation) { - // TODO: currently can't be cancelled because job is null return new FutureImpl<>(executorService, Option.none(), Queue.empty(), Queue.empty(), future -> { computation.accept(future::tryComplete); return null; @@ -235,11 +242,15 @@ public boolean isReleasable() { @Override public Future cancel(boolean mayInterruptIfRunning) { if (!isCompleted()) { - Try.of(() -> job.cancel(mayInterruptIfRunning)).onSuccess(cancelled -> { - if (cancelled) { - tryComplete(Try.failure(new CancellationException())); - } - }); + synchronized (lock) { + Try.of(() -> job == null || job.cancel(mayInterruptIfRunning)) + .recover(ignored -> job != null && job.isCancelled()) + .onSuccess(cancelled -> { + if (cancelled) { + this.cancelled = tryComplete(Try.failure(new CancellationException())); + } + }); + } } return this; } @@ -256,7 +267,7 @@ public Option> getValue() { @Override public boolean isCancelled() { - return job != null && job.isCancelled(); + return cancelled; } @Override @@ -287,8 +298,9 @@ public Future onComplete(Consumer> action) { @Override public String toString() { - final String value = (this.value == null || this.value.isEmpty()) ? "?" : this.value.get().toString(); - return stringPrefix() + "(" + value + ")"; + final Option> value = this.value; + final String s = (value == null || value.isEmpty()) ? "?" : value.get().toString(); + return stringPrefix() + "(" + s + ")"; } /** @@ -321,6 +333,7 @@ private boolean tryComplete(Try value) { this.value = Option.some(Try.narrow(value)); this.actions = null; this.waiters = null; + this.job = null; } } if (waiters != null) { @@ -339,7 +352,7 @@ private void perform(Consumer> action) { try { executorService.execute(() -> action.accept(value.get())); } catch(Throwable x) { - // ignored + // ignored // TODO: tell UncaughtExceptionHandler? } } @@ -347,7 +360,7 @@ private void unlock(Thread waiter) { try { LockSupport.unpark(waiter); } catch(Throwable x) { - // ignored + // ignored // TODO: tell UncaughtExceptionHandler? } } } diff --git a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java index 500b1f1806..72cbe4ab9d 100644 --- a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java +++ b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java @@ -336,7 +336,6 @@ public void shouldCompleteThreeFuturesUsingAThreadPoolExecutorLimitedToTwoThread .rangeClosed(1, 3) .map(value -> Future.of(service, () -> expensiveOperation(value))); futures.forEach(future -> Try.run(future::await)); - futures.forEach(System.out::println); assertThat(futures.flatMap(Function.identity()).toList().sorted()).isEqualTo(List.of(1, 2, 3)); service.shutdown(); } @@ -629,6 +628,19 @@ public void shouldThrowOnGetAfterCancellation() { fail("Future was expected to throw on get() after cancellation!"); } + @Test + public void shouldCancelJoinedFutureThatNeverCompletes() { + final Future future = Future.join(tryComplete -> { + // we break our promise, the Future is never completed + }); + + assertThat(future.isCompleted()).isFalse(); + assertThat(future.isCancelled()).isFalse(); + + assertThat(future.cancel().isCancelled()).isTrue(); + assertThat(future.isCompleted()).isTrue(); + } + // -- collect() @Test From 5a1ca80350182c9bfd57eb8f6df5b5670b860432 Mon Sep 17 00:00:00 2001 From: Daniel Dietrich Date: Mon, 25 Sep 2017 22:27:46 +0200 Subject: [PATCH 7/7] simplified tests --- pom.xml | 7 +-- .../java/io/vavr/concurrent/Concurrent.java | 8 ++-- .../java/io/vavr/concurrent/FutureTest.java | 47 ++++++------------- 3 files changed, 19 insertions(+), 43 deletions(-) diff --git a/pom.xml b/pom.xml index 04c9b3e481..b55e7ca74d 100644 --- a/pom.xml +++ b/pom.xml @@ -305,15 +305,10 @@ We use these goals frequently to keep the dependencies and plugins up-to-date: - - none - all 4 true - --> diff --git a/vavr/src/test/java/io/vavr/concurrent/Concurrent.java b/vavr/src/test/java/io/vavr/concurrent/Concurrent.java index 0dd47b1f9b..124b76484c 100644 --- a/vavr/src/test/java/io/vavr/concurrent/Concurrent.java +++ b/vavr/src/test/java/io/vavr/concurrent/Concurrent.java @@ -38,15 +38,15 @@ private Concurrent() { } static void waitUntil(Supplier condition) { - long nanos = 1L; + long millis = 1; boolean interrupted = false; while (!interrupted && !condition.get()) { - if (nanos > 1_000_000) { + if (millis > 4096) { fail("Condition not met."); } else { try { - Thread.sleep(nanos); - nanos = nanos << 1; + Thread.sleep(millis); + millis = millis << 1; } catch(InterruptedException x) { interrupted = true; } diff --git a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java index 72cbe4ab9d..740ebd39f5 100644 --- a/vavr/src/test/java/io/vavr/concurrent/FutureTest.java +++ b/vavr/src/test/java/io/vavr/concurrent/FutureTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Predicate; import static io.vavr.concurrent.Concurrent.waitUntil; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -497,10 +498,11 @@ public void shouldAwaitAndTimeout() { } }); final long start = System.currentTimeMillis(); - future.await(timeout, unit); + final Future returnedFuture = future.await(timeout, unit); final long stop = System.currentTimeMillis(); final long millis = unit.toMillis(timeout); assertThat(stop - start).isBetween(millis, millis + millis / 10); + assertThat(returnedFuture).isSameAs(future); assertThat(future.isFailure()).isTrue(); assertThat(future.getCause().get()).isInstanceOf(TimeoutException.class); assertThat(future.getCause().get().getMessage()).isEqualTo("timeout after 100 MILLISECONDS"); @@ -785,42 +787,21 @@ public void shouldBeFailed() { @Test public void shouldRegisterCallbackBeforeFutureCompletes() throws InterruptedException { - // instead of delaying we wait/notify - final Object lock1 = new Object(); - final Object lock2 = new Object(); - final int[] actual = new int[] { -1 }; - final int expected = 1; + final AtomicBoolean ok = new AtomicBoolean(false); + final AtomicReference>> computation = new AtomicReference<>(null); - // create a future and put it to sleep - final Future future = blocking(() -> { - synchronized(lock1) { - lock1.wait(); - } - return expected; - }); - - // the future now is on hold and we have time to register a callback - future.onComplete(result -> { - actual[0] = result.get(); - synchronized(lock2) { - lock2.notify(); - } - }); + // this computation never ends + final Future future = Future.join(computation::set); - // this hinders the onComplete action to notify lock2 before we wait on lock2 - synchronized(lock2) { - synchronized (lock1) { - // now wake the future up - lock1.notify(); - } - lock2.wait(); - } + // now we have time to register an onComplete handler + future.onComplete(result -> result.forEach(ok::set)); - // give the future thread some time to complete - future.await(); + // now we complete the future... + assertThat(future.isCompleted()).isFalse(); + assertThat(computation.get().test(Try.success(true))).isTrue(); - // the callback is also executed on its own thread - we have to wait for it to complete. - assertThat(actual[0]).isEqualTo(expected); + // ...and wait for the result + waitUntil(ok::get); } @Test