Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor changes to/housekeeping of FutureImpl. Added internal GuardedBy… #1533

Merged
merged 3 commits into from
Aug 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 41 additions & 37 deletions javaslang/src/main/java/javaslang/concurrent/FutureImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@

/**
* <strong>INTERNAL API - This class is subject to change.</strong>
*
* {@link Future} implementation, for internal use only.
* <p>
* Once a {@code FutureImpl} is created, one (and only one) of the following methods is called
* to complete it with a result:
* <ul>
* <li>{@link #run(CheckedSupplier)} - typically called within a {@code Future} factory method</li>
* <li>{@link #tryComplete(Try)} - explicit write operation, typically called by {@code Promise}</li>
* </ul>
* <p>
* <strong>Lifecycle of a {@code FutureImpl}:</strong>
* <p>
Expand Down Expand Up @@ -65,25 +70,25 @@ final class FutureImpl<T> implements Future<T> {

/**
* Once the Future is completed, the value is defined.
*
* @@GuardedBy("lock")
*/
@GuardedBy("lock")
private volatile Option<Try<T>> value = Option.none();

/**
* The queue of actions is filled when calling onComplete() before the Future is completed or cancelled.
* Otherwise actions = null.
*
* @@GuardedBy("lock")
*/
@GuardedBy("lock")
private Queue<Consumer<? super Try<T>>> actions = Queue.empty();

/**
* Once a computation is started via run(), job is defined and used to control the lifecycle of the computation.
*
* @@GuardedBy("lock")
* <p>
* The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in
* {@code value} instead.
*/
private java.util.concurrent.Future<Try<T>> job = null;
@GuardedBy("lock")
private java.util.concurrent.Future<?> job = null;

/**
* Creates a Future, {@link #run(CheckedSupplier)} has to be called separately.
Expand Down Expand Up @@ -159,6 +164,14 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
return this;
}

// This class is MUTABLE and therefore CANNOT CHANGE DEFAULT equals() and hashCode() behavior.
// See http://stackoverflow.com/questions/4718009/mutable-objects-and-hashcode

@Override
public String toString() {
return stringPrefix() + "(" + value.map(String::valueOf).getOrElse("?") + ")";
}

/**
* Runs a computation using the underlying ExecutorService.
* <p>
Expand All @@ -176,22 +189,35 @@ void run(CheckedSupplier<? extends T> computation) {
if (isCompleted()) {
throw new IllegalStateException("The Future is completed.");
}
// if the ExecutorService runs the computation
// - in a different thread, the lock ensures that the job is assigned before the computation completes
// - in the current thread, the job is already completed and the `job` variable remains null
try {
final java.util.concurrent.Future<Try<T>> tmpJob = executorService.submit(() -> complete(Try.of(computation)));
// if the ExecutorService runs the computation
// - in a different thread, the lock ensures that the job is assigned before the computation completes
// - in the current thread, the job is already completed and the `job` variable remains null
final java.util.concurrent.Future<?> tmpJob = executorService.submit(() -> complete(Try.of(computation)));
if (!isCompleted()) {
job = tmpJob;
}
} catch (Throwable t) {
// ensures that the Future completes if the `executorService.submit()` method throws
if (!isCompleted()) {
complete(Try.failure(t));
}
}
}
}

boolean tryComplete(Try<? extends T> value) {
Objects.requireNonNull(value, "value is null");
synchronized (lock) {
if (isCompleted()) {
return false;
} else {
complete(value);
return true;
}
}
}

/**
* Completes this Future with a value.
* <p>
Expand All @@ -202,8 +228,7 @@ void run(CheckedSupplier<? extends T> computation) {
* @throws IllegalStateException if the Future is already completed or cancelled.
* @throws NullPointerException if the given {@code value} is null.
*/
@SuppressWarnings("unchecked")
Try<T> complete(Try<? extends T> value) {
private void complete(Try<? extends T> value) {
Objects.requireNonNull(value, "value is null");
final Queue<Consumer<? super Try<T>>> actions;
// it is essential to make the completed state public *before* performing the actions
Expand All @@ -212,35 +237,14 @@ Try<T> complete(Try<? extends T> value) {
throw new IllegalStateException("The Future is completed.");
}
actions = this.actions;
this.value = Option.some((Try<T>) value);
this.value = Option.some(Try.narrow(value));
this.actions = null;
this.job = null;
}
actions.forEach(this::perform);
return (Try<T>) value;
}

boolean tryComplete(Try<? extends T> value) {
Objects.requireNonNull(value, "value is null");
synchronized (lock) {
if (isCompleted()) {
return false;
} else {
complete(value);
return true;
}
}
}

private void perform(Consumer<? super Try<T>> action) {
Try.run(() -> executorService.execute(() -> action.accept(value.get())));
}

// This class is MUTABLE and therefore CANNOT CHANGE DEFAULT equals() and hashCode() behavior.
// See http://stackoverflow.com/questions/4718009/mutable-objects-and-hashcode

@Override
public String toString() {
return stringPrefix() + "(" + value.map(String::valueOf).getOrElse("?") + ")";
}
}
38 changes: 38 additions & 0 deletions javaslang/src/main/java/javaslang/concurrent/GuardedBy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* / \____ _ _ ____ ______ / \ ____ __ _______
* / / \/ \ / \/ \ / /\__\/ // \/ \ // /\__\ JΛVΛSLΛNG
* _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \ /__\ \ Copyright 2014-2016 Javaslang, http://javaslang.io
* /___/\_/ \_/\____/\_/ \_/\__\/__/\__\_/ \_// \__/\_____/ Licensed under the Apache License, Version 2.0
*/
package javaslang.concurrent;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* <strong>INTERNAL API - Used for documentation purpose only.</strong>
* <p>
* An annotated field or method must only be accessed when holding the lock specified in {@code value()}.
* <p>
* This is an annotation known from <a href="http://jcip.net">Java Concurrency in Practice</a>.
* See also <a href="https://jcp.org/en/jsr/detail?id=305">JSR 305</a>
*
* @author Daniel Dietrich
* @since 2.1.0
*/
@Documented
@Target(value = { FIELD, METHOD })
@Retention(RUNTIME)
@interface GuardedBy {

/**
* Specifies the lock that guards the annotated field or method.
*
* @return a valid lock that guards the annotated field or method
*/
String value();
}
153 changes: 153 additions & 0 deletions javaslang/src/test/java/javaslang/concurrent/ExecutorServices.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/* / \____ _ _ ____ ______ / \ ____ __ _______
* / / \/ \ / \/ \ / /\__\/ // \/ \ // /\__\ JΛVΛSLΛNG
* _/ / /\ \ \/ / /\ \\__\\ \ // /\ \ /\\/ \ /__\ \ Copyright 2014-2016 Javaslang, http://javaslang.io
* /___/\_/ \_/\____/\_/ \_/\__\/__/\__\_/ \_// \__/\_____/ Licensed under the Apache License, Version 2.0
*/
package javaslang.concurrent;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

final class ExecutorServices {

private static final ExecutorService TRIVIAL_EXECUTOR_SERVICE = new TrivialExecutorService();

private static final ExecutorService REJECTING_EXECUTOR_SERVICE = new RejectingExecutorService();

private ExecutorServices() {
}

static ExecutorService trivialExecutorService() {
return TRIVIAL_EXECUTOR_SERVICE;
}

static ExecutorService rejectingExecutorService() {
return REJECTING_EXECUTOR_SERVICE;
}

private static final class TrivialExecutorService extends AbstractExecutorService {

@Override
public <T> java.util.concurrent.Future<T> submit(Callable<T> task) {
try {
return new ImmediatelyDoneFuture<>(task.call());
} catch (Exception x) {
throw new IllegalStateException("Error calling task.", x);
}
}

private static class ImmediatelyDoneFuture<T> implements java.util.concurrent.Future<T> {

final T value;

ImmediatelyDoneFuture(T value) {
this.value = value;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

@Override
public T get() {
return value;
}

@Override
public T get(long timeout, TimeUnit unit) {
return value;
}
}
}

private static final class RejectingExecutorService extends AbstractExecutorService {

@Override
public <T> java.util.concurrent.Future<T> submit(Callable<T> task) {
throw new RejectedExecutionException();
}
}

private static abstract class AbstractExecutorService implements ExecutorService {

private boolean shutdown = false;

@Override
public abstract <T> java.util.concurrent.Future<T> submit(Callable<T> task);

@Override
public java.util.concurrent.Future<?> submit(Runnable task) {
return submit(task, null);
}

@Override
public <T> java.util.concurrent.Future<T> submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public void shutdown() { shutdown = true; }

@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}

@Override
public boolean isShutdown() {
return shutdown;
}

@Override
public boolean isTerminated() {
return isShutdown();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
}
}
Loading