Skip to content

Commit

Permalink
[FUTURES] Added Futures.whenAllComplete(..).run(..) to Futures.java
Browse files Browse the repository at this point in the history
Since runnable cannot throw checked exception, you cannot use getDone when defining your runnable which is a shame :(

RELNOTES:
  - Add a run method to FutureCombiner to allow passing a runnable to Futures.whenAllComplete and Futures.whenAllSucceed

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=177663488
  • Loading branch information
ratteler50 authored and cpovirk committed Dec 4, 2017
1 parent d453cf1 commit de28fd8
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,141 @@ public ListenableFuture<String> call() throws Exception {
gotException.await();
}

public void testWhenAllComplete_runnableResult() throws Exception {
final SettableFuture<Integer> futureInteger = SettableFuture.create();
final SettableFuture<Boolean> futureBoolean = SettableFuture.create();
final String[] result = new String[1];
Runnable combiner =
new Runnable() {
@Override
public void run() {
assertTrue(futureInteger.isDone());
assertTrue(futureBoolean.isDone());
result[0] =
createCombinedResult(
Futures.getUnchecked(futureInteger), Futures.getUnchecked(futureBoolean));
}
};

ListenableFuture<?> futureResult =
whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor());
Integer integerPartial = 1;
futureInteger.set(integerPartial);
Boolean booleanPartial = true;
futureBoolean.set(booleanPartial);
futureResult.get();
assertEquals(createCombinedResult(integerPartial, booleanPartial), result[0]);
}

public void testWhenAllComplete_runnableError() throws Exception {
final RuntimeException thrown = new RuntimeException("test");

final SettableFuture<Integer> futureInteger = SettableFuture.create();
final SettableFuture<Boolean> futureBoolean = SettableFuture.create();
Runnable combiner =
new Runnable() {
@Override
public void run() {
assertTrue(futureInteger.isDone());
assertTrue(futureBoolean.isDone());
throw thrown;
}
};

ListenableFuture<?> futureResult =
whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor());
Integer integerPartial = 1;
futureInteger.set(integerPartial);
Boolean booleanPartial = true;
futureBoolean.set(booleanPartial);

try {
getDone(futureResult);
fail();
} catch (ExecutionException expected) {
assertSame(thrown, expected.getCause());
}
}

@GwtIncompatible // threads

public void testWhenAllCompleteRunnable_resultCanceledWithoutInterrupt_doesNotInterruptRunnable()
throws Exception {
SettableFuture<String> stringFuture = SettableFuture.create();
SettableFuture<Boolean> booleanFuture = SettableFuture.create();
final CountDownLatch inFunction = new CountDownLatch(1);
final CountDownLatch shouldCompleteFunction = new CountDownLatch(1);
final CountDownLatch combinerCompletedWithoutInterrupt = new CountDownLatch(1);
Runnable combiner =
new Runnable() {
@Override
public void run() {
inFunction.countDown();
try {
shouldCompleteFunction.await();
combinerCompletedWithoutInterrupt.countDown();
} catch (InterruptedException e) {
// Ensure the thread's interrupt status is preserved.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
};

ListenableFuture<?> futureResult =
whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor());

stringFuture.set("value");
booleanFuture.set(true);
inFunction.await();
futureResult.cancel(false);
shouldCompleteFunction.countDown();
try {
futureResult.get();
fail();
} catch (CancellationException expected) {
}
combinerCompletedWithoutInterrupt.await();
}

@GwtIncompatible // threads

public void testWhenAllCompleteRunnable_resultCanceledWithInterrupt_InterruptsRunnable()
throws Exception {
SettableFuture<String> stringFuture = SettableFuture.create();
SettableFuture<Boolean> booleanFuture = SettableFuture.create();
final CountDownLatch inFunction = new CountDownLatch(1);
final CountDownLatch gotException = new CountDownLatch(1);
Runnable combiner =
new Runnable() {
@Override
public void run() {
inFunction.countDown();
try {
new CountDownLatch(1).await(); // wait for interrupt
} catch (InterruptedException expected) {
// Ensure the thread's interrupt status is preserved.
Thread.currentThread().interrupt();
gotException.countDown();
}
}
};

ListenableFuture<?> futureResult =
whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor());

stringFuture.set("value");
booleanFuture.set(true);
inFunction.await();
futureResult.cancel(true);
try {
futureResult.get();
fail();
} catch (CancellationException expected) {
}
gotException.await();
}

public void testWhenAllSucceed() throws Exception {
class PartialResultException extends Exception {

Expand Down
27 changes: 21 additions & 6 deletions android/guava/src/com/google/common/util/concurrent/Futures.java
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner) {
*
* <p>Canceling this future will attempt to cancel all the component futures.
*/
@CanIgnoreReturnValue
@CanIgnoreReturnValue // TODO(cpovirk): Remove this
public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) {
return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner);
}
Expand All @@ -1003,17 +1003,32 @@ public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) {
* ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is
* scheduled to be removed in April 2018.
*/
@CanIgnoreReturnValue
@CanIgnoreReturnValue // TODO(cpovirk): Remove this
@Deprecated
public <C> ListenableFuture<C> call(Callable<C> combiner) {
return call(combiner, directExecutor());
}

/*
* TODO(cpovirk): Evaluate demand for a run(Runnable) version. Would it allow us to remove
* @CanIgnoreReturnValue from the call() methods above?
* https://github.com/google/guava/issues/2371
/**
* Creates the {@link ListenableFuture} which will return the result of running {@code combiner}
* when all Futures complete. {@code combiner} will run using {@code executor}.
*
* <p>If the combiner throws a {@code CancellationException}, the returned future will be
* cancelled.
*
* <p>Canceling this Future will attempt to cancel all the component futures.
*/
public ListenableFuture<?> run(final Runnable combiner, Executor executor) {
return call(
new Callable<Void>() {
@Override
public Void call() throws Exception {
combiner.run();
return null;
}
},
executor);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3096,6 +3096,60 @@ public void testWhenAllComplete_asyncResult() throws Exception {
}
}

public void testWhenAllComplete_runnableError() throws Exception {
com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest();
testCase.setUp();
Throwable failure = null;
try {
testCase.testWhenAllComplete_runnableError();
} catch (Throwable t) {
failure = t;
}
try {
testCase.tearDown();
} catch (Throwable t) {
if (failure == null) {
failure = t;
}
}
if (failure instanceof Exception) {
throw (Exception) failure;
}
if (failure instanceof Error) {
throw (Error) failure;
}
if (failure != null) {
throw new RuntimeException(failure);
}
}

public void testWhenAllComplete_runnableResult() throws Exception {
com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest();
testCase.setUp();
Throwable failure = null;
try {
testCase.testWhenAllComplete_runnableResult();
} catch (Throwable t) {
failure = t;
}
try {
testCase.tearDown();
} catch (Throwable t) {
if (failure == null) {
failure = t;
}
}
if (failure instanceof Exception) {
throw (Exception) failure;
}
if (failure instanceof Error) {
throw (Error) failure;
}
if (failure != null) {
throw new RuntimeException(failure);
}
}

public void testWhenAllComplete_wildcard() throws Exception {
com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest();
testCase.setUp();
Expand Down
135 changes: 135 additions & 0 deletions guava-tests/test/com/google/common/util/concurrent/FuturesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,141 @@ public ListenableFuture<String> call() throws Exception {
gotException.await();
}

public void testWhenAllComplete_runnableResult() throws Exception {
final SettableFuture<Integer> futureInteger = SettableFuture.create();
final SettableFuture<Boolean> futureBoolean = SettableFuture.create();
final String[] result = new String[1];
Runnable combiner =
new Runnable() {
@Override
public void run() {
assertTrue(futureInteger.isDone());
assertTrue(futureBoolean.isDone());
result[0] =
createCombinedResult(
Futures.getUnchecked(futureInteger), Futures.getUnchecked(futureBoolean));
}
};

ListenableFuture<?> futureResult =
whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor());
Integer integerPartial = 1;
futureInteger.set(integerPartial);
Boolean booleanPartial = true;
futureBoolean.set(booleanPartial);
futureResult.get();
assertEquals(createCombinedResult(integerPartial, booleanPartial), result[0]);
}

public void testWhenAllComplete_runnableError() throws Exception {
final RuntimeException thrown = new RuntimeException("test");

final SettableFuture<Integer> futureInteger = SettableFuture.create();
final SettableFuture<Boolean> futureBoolean = SettableFuture.create();
Runnable combiner =
new Runnable() {
@Override
public void run() {
assertTrue(futureInteger.isDone());
assertTrue(futureBoolean.isDone());
throw thrown;
}
};

ListenableFuture<?> futureResult =
whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor());
Integer integerPartial = 1;
futureInteger.set(integerPartial);
Boolean booleanPartial = true;
futureBoolean.set(booleanPartial);

try {
getDone(futureResult);
fail();
} catch (ExecutionException expected) {
assertSame(thrown, expected.getCause());
}
}

@GwtIncompatible // threads

public void testWhenAllCompleteRunnable_resultCanceledWithoutInterrupt_doesNotInterruptRunnable()
throws Exception {
SettableFuture<String> stringFuture = SettableFuture.create();
SettableFuture<Boolean> booleanFuture = SettableFuture.create();
final CountDownLatch inFunction = new CountDownLatch(1);
final CountDownLatch shouldCompleteFunction = new CountDownLatch(1);
final CountDownLatch combinerCompletedWithoutInterrupt = new CountDownLatch(1);
Runnable combiner =
new Runnable() {
@Override
public void run() {
inFunction.countDown();
try {
shouldCompleteFunction.await();
combinerCompletedWithoutInterrupt.countDown();
} catch (InterruptedException e) {
// Ensure the thread's interrupt status is preserved.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
};

ListenableFuture<?> futureResult =
whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor());

stringFuture.set("value");
booleanFuture.set(true);
inFunction.await();
futureResult.cancel(false);
shouldCompleteFunction.countDown();
try {
futureResult.get();
fail();
} catch (CancellationException expected) {
}
combinerCompletedWithoutInterrupt.await();
}

@GwtIncompatible // threads

public void testWhenAllCompleteRunnable_resultCanceledWithInterrupt_InterruptsRunnable()
throws Exception {
SettableFuture<String> stringFuture = SettableFuture.create();
SettableFuture<Boolean> booleanFuture = SettableFuture.create();
final CountDownLatch inFunction = new CountDownLatch(1);
final CountDownLatch gotException = new CountDownLatch(1);
Runnable combiner =
new Runnable() {
@Override
public void run() {
inFunction.countDown();
try {
new CountDownLatch(1).await(); // wait for interrupt
} catch (InterruptedException expected) {
// Ensure the thread's interrupt status is preserved.
Thread.currentThread().interrupt();
gotException.countDown();
}
}
};

ListenableFuture<?> futureResult =
whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor());

stringFuture.set("value");
booleanFuture.set(true);
inFunction.await();
futureResult.cancel(true);
try {
futureResult.get();
fail();
} catch (CancellationException expected) {
}
gotException.await();
}

public void testWhenAllSucceed() throws Exception {
class PartialResultException extends Exception {

Expand Down
Loading

0 comments on commit de28fd8

Please sign in to comment.