diff --git a/javaslang/src/main/java/javaslang/concurrent/FutureImpl.java b/javaslang/src/main/java/javaslang/concurrent/FutureImpl.java index 58e3c1393d..0b2458aaf8 100644 --- a/javaslang/src/main/java/javaslang/concurrent/FutureImpl.java +++ b/javaslang/src/main/java/javaslang/concurrent/FutureImpl.java @@ -17,8 +17,13 @@ /** * INTERNAL API - This class is subject to change. - * - * {@link Future} implementation, for internal use only. + *

+ * Once a {@code FutureImpl} is created, one (and only one) of the following methods is called + * to complete it with a result: + *

*

* Lifecycle of a {@code FutureImpl}: *

@@ -65,25 +70,25 @@ final class FutureImpl implements Future { /** * Once the Future is completed, the value is defined. - * - * @@GuardedBy("lock") */ + @GuardedBy("lock") private volatile Option> value = Option.none(); /** * The queue of actions is filled when calling onComplete() before the Future is completed or cancelled. * Otherwise actions = null. - * - * @@GuardedBy("lock") */ + @GuardedBy("lock") private Queue>> actions = Queue.empty(); /** * Once a computation is started via run(), job is defined and used to control the lifecycle of the computation. - * - * @@GuardedBy("lock") + *

+ * The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in + * {@code value} instead. */ - private java.util.concurrent.Future> job = null; + @GuardedBy("lock") + private java.util.concurrent.Future job = null; /** * Creates a Future, {@link #run(CheckedSupplier)} has to be called separately. @@ -159,6 +164,14 @@ public Future onComplete(Consumer> action) { return this; } + // This class is MUTABLE and therefore CANNOT CHANGE DEFAULT equals() and hashCode() behavior. + // See http://stackoverflow.com/questions/4718009/mutable-objects-and-hashcode + + @Override + public String toString() { + return stringPrefix() + "(" + value.map(String::valueOf).getOrElse("?") + ")"; + } + /** * Runs a computation using the underlying ExecutorService. *

@@ -176,15 +189,16 @@ void run(CheckedSupplier computation) { if (isCompleted()) { throw new IllegalStateException("The Future is completed."); } - // if the ExecutorService runs the computation - // - in a different thread, the lock ensures that the job is assigned before the computation completes - // - in the current thread, the job is already completed and the `job` variable remains null try { - final java.util.concurrent.Future> tmpJob = executorService.submit(() -> complete(Try.of(computation))); + // if the ExecutorService runs the computation + // - in a different thread, the lock ensures that the job is assigned before the computation completes + // - in the current thread, the job is already completed and the `job` variable remains null + final java.util.concurrent.Future tmpJob = executorService.submit(() -> complete(Try.of(computation))); if (!isCompleted()) { job = tmpJob; } } catch (Throwable t) { + // ensures that the Future completes if the `executorService.submit()` method throws if (!isCompleted()) { complete(Try.failure(t)); } @@ -192,6 +206,18 @@ void run(CheckedSupplier computation) { } } + boolean tryComplete(Try value) { + Objects.requireNonNull(value, "value is null"); + synchronized (lock) { + if (isCompleted()) { + return false; + } else { + complete(value); + return true; + } + } + } + /** * Completes this Future with a value. *

@@ -202,8 +228,7 @@ void run(CheckedSupplier computation) { * @throws IllegalStateException if the Future is already completed or cancelled. * @throws NullPointerException if the given {@code value} is null. */ - @SuppressWarnings("unchecked") - Try complete(Try value) { + private void complete(Try value) { Objects.requireNonNull(value, "value is null"); final Queue>> actions; // it is essential to make the completed state public *before* performing the actions @@ -212,35 +237,14 @@ Try complete(Try value) { throw new IllegalStateException("The Future is completed."); } actions = this.actions; - this.value = Option.some((Try) value); + this.value = Option.some(Try.narrow(value)); this.actions = null; this.job = null; } actions.forEach(this::perform); - return (Try) value; - } - - boolean tryComplete(Try value) { - Objects.requireNonNull(value, "value is null"); - synchronized (lock) { - if (isCompleted()) { - return false; - } else { - complete(value); - return true; - } - } } private void perform(Consumer> action) { Try.run(() -> executorService.execute(() -> action.accept(value.get()))); } - - // This class is MUTABLE and therefore CANNOT CHANGE DEFAULT equals() and hashCode() behavior. - // See http://stackoverflow.com/questions/4718009/mutable-objects-and-hashcode - - @Override - public String toString() { - return stringPrefix() + "(" + value.map(String::valueOf).getOrElse("?") + ")"; - } } diff --git a/javaslang/src/main/java/javaslang/concurrent/GuardedBy.java b/javaslang/src/main/java/javaslang/concurrent/GuardedBy.java new file mode 100644 index 0000000000..d477f1e4a5 --- /dev/null +++ b/javaslang/src/main/java/javaslang/concurrent/GuardedBy.java @@ -0,0 +1,38 @@ +/* / \____ _ _ ____ ______ / \ ____ __ _______ + * / / \/ \ / \/ \ / /\__\/ // \/ \ // /\__\ JΛVΛSLΛNG + * _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \ /__\ \ Copyright 2014-2016 Javaslang, http://javaslang.io + * /___/\_/ \_/\____/\_/ \_/\__\/__/\__\_/ \_// \__/\_____/ Licensed under the Apache License, Version 2.0 + */ +package javaslang.concurrent; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * INTERNAL API - Used for documentation purpose only. + *

+ * An annotated field or method must only be accessed when holding the lock specified in {@code value()}. + *

+ * This is an annotation known from Java Concurrency in Practice. + * See also JSR 305 + * + * @author Daniel Dietrich + * @since 2.1.0 + */ +@Documented +@Target(value = { FIELD, METHOD }) +@Retention(RUNTIME) +@interface GuardedBy { + + /** + * Specifies the lock that guards the annotated field or method. + * + * @return a valid lock that guards the annotated field or method + */ + String value(); +} diff --git a/javaslang/src/test/java/javaslang/concurrent/ExecutorServices.java b/javaslang/src/test/java/javaslang/concurrent/ExecutorServices.java new file mode 100644 index 0000000000..547b49aece --- /dev/null +++ b/javaslang/src/test/java/javaslang/concurrent/ExecutorServices.java @@ -0,0 +1,153 @@ +/* / \____ _ _ ____ ______ / \ ____ __ _______ + * / / \/ \ / \/ \ / /\__\/ // \/ \ // /\__\ JΛVΛSLΛNG + * _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \ /__\ \ Copyright 2014-2016 Javaslang, http://javaslang.io + * /___/\_/ \_/\____/\_/ \_/\__\/__/\__\_/ \_// \__/\_____/ Licensed under the Apache License, Version 2.0 + */ +package javaslang.concurrent; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +final class ExecutorServices { + + private static final ExecutorService TRIVIAL_EXECUTOR_SERVICE = new TrivialExecutorService(); + + private static final ExecutorService REJECTING_EXECUTOR_SERVICE = new RejectingExecutorService(); + + private ExecutorServices() { + } + + static ExecutorService trivialExecutorService() { + return TRIVIAL_EXECUTOR_SERVICE; + } + + static ExecutorService rejectingExecutorService() { + return REJECTING_EXECUTOR_SERVICE; + } + + private static final class TrivialExecutorService extends AbstractExecutorService { + + @Override + public java.util.concurrent.Future submit(Callable task) { + try { + return new ImmediatelyDoneFuture<>(task.call()); + } catch (Exception x) { + throw new IllegalStateException("Error calling task.", x); + } + } + + private static class ImmediatelyDoneFuture implements java.util.concurrent.Future { + + final T value; + + ImmediatelyDoneFuture(T value) { + this.value = value; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public T get() { + return value; + } + + @Override + public T get(long timeout, TimeUnit unit) { + return value; + } + } + } + + private static final class RejectingExecutorService extends AbstractExecutorService { + + @Override + public java.util.concurrent.Future submit(Callable task) { + throw new RejectedExecutionException(); + } + } + + private static abstract class AbstractExecutorService implements ExecutorService { + + private boolean shutdown = false; + + @Override + public abstract java.util.concurrent.Future submit(Callable task); + + @Override + public java.util.concurrent.Future submit(Runnable task) { + return submit(task, null); + } + + @Override + public java.util.concurrent.Future submit(Runnable task, T result) { + return submit(() -> { + task.run(); + return result; + }); + } + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public void shutdown() { shutdown = true; } + + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public boolean isTerminated() { + return isShutdown(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + } +} diff --git a/javaslang/src/test/java/javaslang/concurrent/FutureTest.java b/javaslang/src/test/java/javaslang/concurrent/FutureTest.java index 574b8e05d8..dbadb95f7b 100644 --- a/javaslang/src/test/java/javaslang/concurrent/FutureTest.java +++ b/javaslang/src/test/java/javaslang/concurrent/FutureTest.java @@ -15,13 +15,18 @@ import javaslang.control.Option; import javaslang.control.Try; import org.assertj.core.api.IterableAssert; +import org.junit.Ignore; import org.junit.Test; import java.util.NoSuchElementException; import java.util.concurrent.*; +import java.util.function.Function; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static javaslang.concurrent.Concurrent.waitUntil; import static javaslang.concurrent.Concurrent.zZz; +import static javaslang.concurrent.ExecutorServices.rejectingExecutorService; +import static javaslang.concurrent.ExecutorServices.trivialExecutorService; import static org.assertj.core.api.Assertions.fail; public class FutureTest extends AbstractValueTest { @@ -44,12 +49,12 @@ public IterableAssert isEqualTo(Object expected) { @Override protected Future empty() { - return Future.failed(TrivialExecutorService.instance(), new Error()); + return Future.failed(trivialExecutorService(), new Error()); } @Override protected Future of(T element) { - return Future.of(TrivialExecutorService.instance(), () -> element); + return Future.of(trivialExecutorService(), () -> element); } @SafeVarargs @@ -96,7 +101,7 @@ public void shouldCreateAndFailAFutureUsingForkJoinPool() { @Test public void shouldCreateAndFailAFutureUsingTrivialExecutorService() { - final Future future = Future.of(TrivialExecutorService.instance(), () -> { + final Future future = Future.of(trivialExecutorService(), () -> { throw new Error(); }); assertFailed(future, Error.class); @@ -117,7 +122,7 @@ public void shouldCreateFutureFromJavaFuture() { public void shouldCreateFutureFromJavaFutureUsingTrivialExecutorService() { // Create slow-resolving Java future to show that the wrapping doesn't block java.util.concurrent.Future jFuture = generateJavaFuture("Result", 3000); - final Future future = Future.fromJavaFuture(TrivialExecutorService.instance(), jFuture); + final Future future = Future.fromJavaFuture(trivialExecutorService(), jFuture); waitUntil(future::isCompleted); assertCompleted(future, "Result"); } @@ -147,7 +152,7 @@ public void shouldFailFindingFirstValueBecauseNoResultSatisfiesTheGivenPredicate } @Test - public void shouldFindOneSucceedingFutureWhenAllOthersFailUsingForkJoinPool() { + public void shouldFindOneSucceedingFutureWhenAllOthersFailUsingDefaultExecutorService() { final Seq> futures = Stream.from(1) .map(i -> Future. of(() -> { throw new Error(); @@ -209,23 +214,52 @@ public void shouldCreateSuccessFutureFromTry() { @Test public void shouldCreateAndCompleteAFutureUsingTrivialExecutorService() { - final Future future = Future.of(TrivialExecutorService.instance(), () -> 1); + final Future future = Future.of(trivialExecutorService(), () -> 1); assertCompleted(future, 1); } @Test public void shouldNotCancelCompletedFutureUsingTrivialExecutorService() { - final Future future = Future.of(TrivialExecutorService.instance(), () -> 1); + final Future future = Future.of(trivialExecutorService(), () -> 1); assertThat(future.cancel()).isFalse(); assertCompleted(future, 1); } @Test public void shouldCompleteWithFailureWhenExecutorServiceThrowsRejectedExecutionException() { - final Future future = Future.of(RejectingExecutorService.instance(), () -> 1); + final Future future = Future.of(rejectingExecutorService(), () -> 1); assertFailed(future, RejectedExecutionException.class); } + // TODO: Re-enable this test when solving #1530 + @Ignore + @Test + public void shouldCompleteOneFuturesUsingAThreadPoolExecutorLimitedToOneThread() { + final ExecutorService service = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, new SynchronousQueue<>()); + final Future future = Future.of(service, () -> expensiveOperation(1)); + future.await(); + assertCompleted(future, 1); + service.shutdown(); + } + + // TODO: Re-enable this test when solving #1530 + @Ignore + @Test + public void shouldCompleteThreeFuturesUsingAThreadPoolExecutorLimitedToOneThread() { + final ExecutorService service = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, new SynchronousQueue<>()); + final Stream> futures = Stream + .rangeClosed(1, 3) + .map(value -> Future.of(service, () -> expensiveOperation(value))); + futures.forEach(Future::await); + assertThat(futures.flatMap(Function.identity()).toList().sorted()).isEqualTo(List.of(1, 2, 3)); + service.shutdown(); + } + + private static T expensiveOperation(T value) throws InterruptedException { + Thread.sleep(500); + return value; + } + // -- static reduce() @Test(expected = NoSuchElementException.class) @@ -590,7 +624,7 @@ public void shouldRegisterCallbackBeforeFutureCompletes() { @Test public void shouldPerformActionAfterFutureCompleted() { final int[] actual = new int[] { -1 }; - final Future future = Future.of(TrivialExecutorService.instance(), () -> 1); + final Future future = Future.of(trivialExecutorService(), () -> 1); assertCompleted(future, 1); assertThat(actual[0]).isEqualTo(-1); future.onComplete(result -> actual[0] = result.get()); diff --git a/javaslang/src/test/java/javaslang/concurrent/PromiseTest.java b/javaslang/src/test/java/javaslang/concurrent/PromiseTest.java index 40e2d82b09..beb986c081 100644 --- a/javaslang/src/test/java/javaslang/concurrent/PromiseTest.java +++ b/javaslang/src/test/java/javaslang/concurrent/PromiseTest.java @@ -9,6 +9,7 @@ import org.junit.Test; import static javaslang.concurrent.Concurrent.zZz; +import static javaslang.concurrent.ExecutorServices.trivialExecutorService; import static org.assertj.core.api.Assertions.assertThat; public class PromiseTest { @@ -68,7 +69,7 @@ public void shouldConvertToString() { @Test public void shouldCompletePromiseWithItsOwnFuture() { - final Promise promise = Promise.make(TrivialExecutorService.instance()); + final Promise promise = Promise.make(trivialExecutorService()); promise.completeWith(promise.future()); assertThat(promise.isCompleted()).isFalse(); assertThat(promise.success("ok").isCompleted()).isTrue(); diff --git a/javaslang/src/test/java/javaslang/concurrent/RejectingExecutorService.java b/javaslang/src/test/java/javaslang/concurrent/RejectingExecutorService.java deleted file mode 100644 index c73a97960f..0000000000 --- a/javaslang/src/test/java/javaslang/concurrent/RejectingExecutorService.java +++ /dev/null @@ -1,94 +0,0 @@ -/* / \____ _ _ ____ ______ / \ ____ __ _______ - * / / \/ \ / \/ \ / /\__\/ // \/ \ // /\__\ JΛVΛSLΛNG - * _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \ /__\ \ Copyright 2014-2016 Javaslang, http://javaslang.io - * /___/\_/ \_/\____/\_/ \_/\__\/__/\__\_/ \_// \__/\_____/ Licensed under the Apache License, Version 2.0 - */ -package javaslang.concurrent; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.*; - -/** - * A submitted Callable is immediately rejected - */ -final class RejectingExecutorService implements ExecutorService { - - private static final RejectingExecutorService INSTANCE = new RejectingExecutorService(); - - private RejectingExecutorService() { - } - - public static RejectingExecutorService instance() { - return INSTANCE; - } - - // -- relevant methods - - @Override - public java.util.concurrent.Future submit(Callable task) { - throw new RejectedExecutionException("test"); - } - - @Override - public void execute(Runnable command) { - command.run(); - } - - // -- not needed - - @Override - public void shutdown() { - throw new UnsupportedOperationException(); - } - - @Override - public List shutdownNow() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isShutdown() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isTerminated() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public java.util.concurrent.Future submit(Runnable task, T result) { - throw new UnsupportedOperationException(); - } - - @Override - public java.util.concurrent.Future submit(Runnable task) { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } -} diff --git a/javaslang/src/test/java/javaslang/concurrent/TrivialExecutorService.java b/javaslang/src/test/java/javaslang/concurrent/TrivialExecutorService.java deleted file mode 100644 index ec2ecc3653..0000000000 --- a/javaslang/src/test/java/javaslang/concurrent/TrivialExecutorService.java +++ /dev/null @@ -1,135 +0,0 @@ -/* / \____ _ _ ____ ______ / \ ____ __ _______ - * / / \/ \ / \/ \ / /\__\/ // \/ \ // /\__\ JΛVΛSLΛNG - * _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \ /__\ \ Copyright 2014-2016 Javaslang, http://javaslang.io - * /___/\_/ \_/\____/\_/ \_/\__\/__/\__\_/ \_// \__/\_____/ Licensed under the Apache License, Version 2.0 - */ -package javaslang.concurrent; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.*; - -/** - * A submitted Callable is immediately done (without running in a different thread). - */ -final class TrivialExecutorService implements ExecutorService { - - private static final TrivialExecutorService INSTANCE = new TrivialExecutorService(); - - private TrivialExecutorService() { - } - - public static TrivialExecutorService instance() { - return INSTANCE; - } - - // -- relevant methods - - @Override - public Done submit(Callable task) { - try { - return new Done<>(task.call()); - } catch (Exception x) { - throw new IllegalStateException("Error calling task.", x); - } - } - - - @Override - public void execute(Runnable command) { - command.run(); - } - - // -- not needed - - @Override - public void shutdown() { - throw new UnsupportedOperationException(); - } - - @Override - public List shutdownNow() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isShutdown() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isTerminated() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public java.util.concurrent.Future submit(Runnable task, T result) { - throw new UnsupportedOperationException(); - } - - @Override - public java.util.concurrent.Future submit(Runnable task) { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } - - // -- immediately done future - - static class Done implements java.util.concurrent.Future { - - final T value; - - Done(T value) { - this.value = value; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - - @Override - public T get() { - return value; - } - - @Override - public T get(long timeout, TimeUnit unit) { - return value; - } - } -}