Skip to content

Commit

Permalink
Fail fast implementation for joinList
Browse files Browse the repository at this point in the history
  • Loading branch information
JoseAlavez committed May 3, 2020
1 parent 25bbd6e commit 5b82580
Show file tree
Hide file tree
Showing 10 changed files with 526 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
target/
pom.xml.releaseBackup
release.properties
*.iml
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ collection.stream()
.thenApply(this::consumeList)
```

To fail fast as soon as one of the asynchronous operations completed exceptionally, a `FailFast` behavior can be specified as parameter:
```java
collection.stream()
.map(this::someAsyncFunction)
.collect(CompletableFutures.joinList(new FailFastWithThrowable(TimeoutException.class)))
.thenApply(this::consumeList)
```

#### combine

If you want to combine more than two futures of different types, use the `combine` method:
Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@
</roles>
<timezone>+1</timezone>
</developer>
<developer>
<id>JoseAlavez</id>
<name>Jose Alavez</name>
<email>414v32@gmail.com</email>
<url>https://github.com/JoseAlavez</url>
<roles>
<role>developer</role>
</roles>
<timezone>+2</timezone>
</developer>
</developers>

<dependencies>
Expand Down
90 changes: 88 additions & 2 deletions src/main/java/com/spotify/futures/CompletableFutures.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import javax.annotation.Nullable;

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -65,6 +66,30 @@ private CompletableFutures() {
*/
public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {
return allAsList(stages, null);
}

/**
* Returns a new {@link CompletableFuture} which completes to a list of all values of its input
* stages, if all succeed. The list of results is in the same order as the input stages.
*
* <p> If any of the given stages complete exceptionally, then the returned future also does so,
* with a {@link CompletionException} holding this exception as its cause. Fail fast behavior can
* be adjusted by an implementation of {@link FailFast} as input parameter, so the combined
* completable future returns exceptionally, immediately after a stage completed exceptionally.
*
* <p> If no stages are provided, returns a future holding an empty list.
*
* @param stages the stages to combine
* @param failFast fail fast behavior to apply when combining stages
* @param <T> the common super-type of all of the input stages, that determines the monomorphic
* type of the output future
* @return a future that completes to a list of the results of the supplied stages
* @throws NullPointerException if the stages list or any of its elements are {@code null}
* @since 0.3.3
*/
public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages, @Nullable FailFast failFast) {
// We use traditional for-loops instead of streams here for performance reasons,
// see AllAsListBenchmark

Expand All @@ -73,8 +98,13 @@ public static <T> CompletableFuture<List<T>> allAsList(
for (int i = 0; i < stages.size(); i++) {
all[i] = stages.get(i).toCompletableFuture();
}
return CompletableFuture.allOf(all)
.thenApply(ignored -> {

CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
if (failFast != null) {
setupFailFast(failFast, all, allOf);
}
return allOf.thenApply(
ignored -> {
final List<T> result = new ArrayList<>(all.length);
for (int i = 0; i < all.length; i++) {
result.add(all[i].join());
Expand All @@ -83,6 +113,27 @@ public static <T> CompletableFuture<List<T>> allAsList(
});
}

private static <T> void setupFailFast(
FailFast failFast, CompletableFuture<? extends T>[] all, CompletableFuture<Void> allOf) {
ExecutionMetadata executionMeta = new ExecutionMetadata(all);
for (int i = 0; i < all.length; i++) {
all[i].whenComplete(
(result, throwable) -> {
if (!allOf.isDone()
&& throwable != null
&& failFast.failFast(throwable, executionMeta)) {
allOf.completeExceptionally(failFast.withThrowable(throwable));

if (failFast.cancelAll()) {
for (int j = 0; j < all.length; j++) {
all[j].cancel(true);
}
}
}
});
}
}

/**
* Returns a new {@link CompletableFuture} which completes to a list of values of those input
* stages that succeeded. The list of results is in the same order as the input stages. For failed
Expand Down Expand Up @@ -125,6 +176,41 @@ public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable th
return future;
}

/**
* Collect a stream of {@link CompletionStage}s into a single future holding a list of the joined
* entities.
*
* <p>Usage:
*
* <pre>{@code
* collection.stream()
* .map(this::someAsyncFunc)
* .collect(joinList(new FailFastWithThrowable(TimeoutException.class)))
* .thenApply(this::consumeList)
* }</pre>
*
* <p>The generated {@link CompletableFuture} will complete to a list of all entities, in the
* order they were encountered in the original stream. Similar to {@link
* CompletableFuture#allOf(CompletableFuture[])}, if any of the input futures complete
* exceptionally, then the returned CompletableFuture also does so, with a {@link
* CompletionException} holding this exception as its cause. Fail fast behavior can be adjusted by
* an implementation of {@link FailFast} as input parameter, so the combined completable future
* returns exceptionally, immediately after a stage completed exceptionally.
*
* @param failFast fail fast behavior to apply when combining stages
* @param <T> the common super-type of all of the input stages, that determines the
* monomorphic type of the output future
* @param <S> the implementation of {@link CompletionStage} that the stream contains
* @return a new {@link CompletableFuture} according to the rules outlined in the method
* description
* @throws NullPointerException if any future in the stream is {@code null}
* @since 0.3.3
*/
public static <T, S extends CompletionStage<? extends T>>
Collector<S, ?, CompletableFuture<List<T>>> joinList(@Nullable FailFast failFast) {
return collectingAndThen(toList(), stages -> allAsList(stages, failFast));
}

/**
* Collect a stream of {@link CompletionStage}s into a single future holding a list of the
* joined entities.
Expand Down
67 changes: 67 additions & 0 deletions src/main/java/com/spotify/futures/ExecutionMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.spotify.futures;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* Provides execution metadata in relation to a group of completable futures.
*
* @author Jose Alavez
* @since 0.3.3
*/
public class ExecutionMetadata {

private final CompletableFuture<?>[] all;

ExecutionMetadata(CompletableFuture<?>... all) {
this.all = all;
}

private int accumulate(Function<CompletableFuture<?>, Boolean> fn) {
int accumulator = 0;
for (int i = 0; i < all.length; i++) {
if (fn.apply(all[i])) {
accumulator++;
}
}
return accumulator;
}

/**
* Returns the total of completable futures being combined.
*/
public int getTotal() {
return all.length;
}

/**
* Returns the total of completable futures that have been completed normally, exceptionally or
* via cancellation.
*
* @see CompletableFuture#isDone()
*/
public int getDone() {
return accumulate(CompletableFuture::isDone);
}

/**
* Returns the total of completable futures that have been completed normally.
*
* @see CompletableFuture#isDone()
* @see CompletableFuture#isCompletedExceptionally()
*/
public int getCompletedNormally() {
return accumulate(
completableFuture ->
completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
}

/**
* Returns the total of completable futures that have been completed exceptionally, in any way.
*
* @see CompletableFuture#isCompletedExceptionally()
*/
public int getCompletedExceptionally() {
return accumulate(CompletableFuture::isCompletedExceptionally);
}
}
40 changes: 40 additions & 0 deletions src/main/java/com/spotify/futures/FailFast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.spotify.futures;

/**
* Interface that defines fail fast behavior when combining several completion stages in a
* completable future.
*
* @author Jose Alavez
* @since 0.3.3
*/
public interface FailFast {

/**
* Evaluates if the {@code throwable} should lead to a fast failure when combining completion
* stages together. A fast failure will not wait for all the combined stages to complete normally
* or exceptionally.
*
* @param throwable A {@link Throwable} returned by a exceptionally completed stage.
* @param executionMetadata A {@link ExecutionMetadata} that provides data related to current
* execution.
* @return {@code true} if the combined completable future should fail fast, {@code false} if
* otherwise.
*/
boolean failFast(Throwable throwable, ExecutionMetadata executionMetadata);

/**
* Specifies a {@link Throwable} instance to return when a combined completable future fail fast.
*
* @param origin A {@link Throwable} that originated the fail fast.
* @return A {@link Throwable} to used to complete exceptionally a combined completable future.
*/
Throwable withThrowable(Throwable origin);

/**
* Defines if all the incomplete stages should be cancelled when failing fast.
*
* @return {@code true} if all the incomplete stages should be cancelled when failing fast, {@code
* false} if otherwise.
*/
boolean cancelAll();
}
59 changes: 59 additions & 0 deletions src/main/java/com/spotify/futures/FailFastWithThrowable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.spotify.futures;

import static java.util.Arrays.asList;

import java.util.Collection;

/**
* Fail fast implementation that completes exceptionally when one of the stages completes
* exceptionally matching a specific throwable class.
*
* <p>This implementation maintains the originating {@link Throwable} causing the fast failure and
* propagates it through the combined completable future.
*
* <p>This implementation cancels all the remaining incomplete completable futures when failing
* fast.
*
* @author Jose Alavez
* @see FailFast
* @since 0.3.3
*/
public class FailFastWithThrowable implements FailFast {

private final Collection<Class<? extends Throwable>> classes;

/**
* @see FailFastWithThrowable#FailFastWithThrowable(java.util.Collection)
*/
@SafeVarargs
public FailFastWithThrowable(Class<? extends Throwable>... throwableClasses) {
this(asList(throwableClasses));
}

/**
* @param throwableClasses {@link Collection} of throwable classes to fail fast.
*/
public FailFastWithThrowable(Collection<Class<? extends Throwable>> throwableClasses) {
this.classes = throwableClasses;
}

@Override
public boolean cancelAll() {
return true;
}

@Override
public Throwable withThrowable(Throwable origin) {
return origin;
}

@Override
public boolean failFast(Throwable throwable, ExecutionMetadata executionMetadata) {

if (throwable.getCause() != null) {
return classes.contains(throwable.getCause().getClass());
}

return classes.contains(throwable.getClass());
}
}
Loading

0 comments on commit 5b82580

Please sign in to comment.