diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index fc9122f62d..14f1a4c5f0 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -31,6 +31,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; @@ -6415,6 +6416,105 @@ public final Flux retryWhen(Function, ? extends Publisher> return onAssembly(new FluxRetryWhen<>(this, whenFactory)); } + /** + * In case of error, retry this {@link Flux} up to {@code numRetries} times using a + * randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%} + * but the effective backoff delay cannot be less than {@code firstBackoff}. + *

+ * <p>
+ * The randomized exponential backoff is good at preventing two typical issues with
+ * other, simpler backoff strategies, namely:
+ * <ul>
+ *     <li>
+ *      having an exponentially growing backoff delay with a small initial delay gives
+ *      the best tradeoff between not overwhelming the server and serving the client as
+ *      fast as possible
+ *     </li>
+ *     <li>
+ *      having a jitter, or randomized backoff delay, is beneficial in avoiding
+ *      "retry-storms" where e.g. numerous clients would hit the server at the same time,
+ *      causing it to display transient failures, which would in turn cause all clients to
+ *      retry at the same backoff times, ultimately giving the server no chance to recover.
+ *     </li>
+ * </ul>
+ * + * @param numRetries the maximum number of attempts before an {@link IllegalStateException} + * is raised (having the original retry-triggering exception as cause). + * @param firstBackoff the first backoff delay to apply then grow exponentially. Also + * minimum delay even taking jitter into account. + * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. + */ + public final Flux retryWithBackoff(long numRetries, Duration firstBackoff) { + return retryWithBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d); + } + + /** + * In case of error, retry this {@link Flux} up to {@code numRetries} times using a + * randomized exponential backoff strategy. The jitter factor is {@code 50%} + * but the effective backoff delay cannot be less than {@code firstBackoff} nor more + * than {@code maxBackoff}. +

+ * <p>
+ * The randomized exponential backoff is good at preventing two typical issues with
+ * other, simpler backoff strategies, namely:
+ * <ul>
+ *     <li>
+ *      having an exponentially growing backoff delay with a small initial delay gives
+ *      the best tradeoff between not overwhelming the server and serving the client as
+ *      fast as possible
+ *     </li>
+ *     <li>
+ *      having a jitter, or randomized backoff delay, is beneficial in avoiding
+ *      "retry-storms" where e.g. numerous clients would hit the server at the same time,
+ *      causing it to display transient failures, which would in turn cause all clients to
+ *      retry at the same backoff times, ultimately giving the server no chance to recover.
+ *     </li>
+ * </ul>
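+ * <p>
+ * As a purely illustrative sketch (the {@code transientSource} name below is hypothetical,
+ * not part of this API), capping the exponentially growing delay at 2 seconds could look like:
+ * <pre>{@code
+ * // up to 5 retries, delays of roughly 100ms, 200ms, 400ms, 800ms and 1600ms,
+ * // randomized by the default 50% jitter but never shorter than 100ms nor longer than 2s
+ * transientSource.retryWithBackoff(5, Duration.ofMillis(100), Duration.ofSeconds(2))
+ *                .subscribe();
+ * }</pre>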
+ * + * @param numRetries the maximum number of attempts before an {@link IllegalStateException} + * is raised (having the original retry-triggering exception as cause). + * @param firstBackoff the first backoff delay to apply then grow exponentially. Also + * minimum delay even taking jitter into account. + * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. + * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. + */ + public final Flux retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) { + return retryWithBackoff(numRetries, firstBackoff, maxBackoff, 0.5d); + } + + /** + * In case of error, retry this {@link Flux} up to {@code numRetries} times using a + * randomized exponential backoff strategy, randomized with a user-provided jitter + * factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}). + * Even with the jitter, the effective backoff delay cannot be less than + * {@code firstBackoff} nor more than {@code maxBackoff}. +

+ * <p>
+ * The randomized exponential backoff is good at preventing two typical issues with
+ * other, simpler backoff strategies, namely:
+ * <ul>
+ *     <li>
+ *      having an exponentially growing backoff delay with a small initial delay gives
+ *      the best tradeoff between not overwhelming the server and serving the client as
+ *      fast as possible
+ *     </li>
+ *     <li>
+ *      having a jitter, or randomized backoff delay, is beneficial in avoiding
+ *      "retry-storms" where e.g. numerous clients would hit the server at the same time,
+ *      causing it to display transient failures, which would in turn cause all clients to
+ *      retry at the same backoff times, ultimately giving the server no chance to recover.
+ *     </li>
+ * </ul>
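+ * <p>
+ * As a purely illustrative sketch (the {@code transientSource} name below is hypothetical,
+ * not part of this API), a small 10% jitter keeps the delays close to the pure exponential curve:
+ * <pre>{@code
+ * // up to 4 retries with delays of roughly 100ms, 200ms, 400ms and 800ms,
+ * // each randomized by at most 10% and clamped between firstBackoff and maxBackoff
+ * transientSource.retryWithBackoff(4, Duration.ofMillis(100), Duration.ofSeconds(2), 0.1)
+ *                .subscribe();
+ * }</pre>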
+ * + * @param numRetries the maximum number of attempts before an {@link IllegalStateException} + * is raised (having the original retry-triggering exception as cause). + * @param firstBackoff the first backoff delay to apply then grow exponentially. Also + * minimum delay even taking jitter into account. + * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. + * @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0). + * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. + */ + public final Flux retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) { + return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor)); + } + /** * Sample this {@link Flux} by periodically emitting an item corresponding to that * {@link Flux} latest emitted value within the periodical time window. diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxRetryWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxRetryWhen.java index 61aac1b2f4..c080e94182 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxRetryWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxRetryWhen.java @@ -15,7 +15,9 @@ */ package reactor.core.publisher; +import java.time.Duration; import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -241,4 +243,64 @@ public void subscribe(CoreSubscriber actual) { completionSignal.subscribe(actual); } } + + static Function, Publisher> randomExponentialBackoffFunction( + long numRetries, Duration firstBackoff, Duration maxBackoff, + double jitterFactor) { + if (jitterFactor < 0 || jitterFactor > 1) throw new IllegalArgumentException("jitterFactor must be between 0 and 1 (default 0.5)"); + Objects.requireNonNull(firstBackoff, "firstBackoff is required"); + Objects.requireNonNull(maxBackoff, "maxBackoff is required"); + + return t -> t.index() + .flatMap(t2 -> { + long iteration = t2.getT1(); + + if (iteration >= numRetries) { + return Mono.error(new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries, t2.getT2())); + } + + Duration nextBackoff; + try { + nextBackoff = firstBackoff.multipliedBy((long) Math.pow(2, iteration)); + if (nextBackoff.compareTo(maxBackoff) > 0) { + nextBackoff = maxBackoff; + } + } + catch (ArithmeticException overflow) { + nextBackoff = maxBackoff; + } + + //short-circuit delay == 0 case + if (nextBackoff.isZero()) { + return Mono.just(iteration); + } + + ThreadLocalRandom random = ThreadLocalRandom.current(); + + long jitterOffset; + try { + jitterOffset = nextBackoff.multipliedBy((long) (100 * jitterFactor)) + .dividedBy(100) + .toMillis(); + } + catch (ArithmeticException ae) { + jitterOffset = Math.round(Long.MAX_VALUE * jitterFactor); + } + long lowBound = Math.max(firstBackoff.minus(nextBackoff) + .toMillis(), -jitterOffset); + long highBound = Math.min(maxBackoff.minus(nextBackoff) + .toMillis(), jitterOffset); + + long jitter; + if (highBound == lowBound) { + if (highBound == 0) jitter = 0; + else jitter = random.nextLong(highBound); + } + else { + jitter = random.nextLong(lowBound, highBound); + } + Duration effectiveBackoff = nextBackoff.plusMillis(jitter); + return Mono.delay(effectiveBackoff); + }); + 
} } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index c4ef03ac1b..3edb041d2a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -2923,6 +2923,105 @@ public final Mono retryWhen(Function, ? extends Publisher> return onAssembly(new MonoRetryWhen<>(this, whenFactory)); } + /** + * In case of error, retry this {@link Flux} up to {@code numRetries} times using a + * randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%} + * but the effective backoff delay cannot be less than {@code firstBackoff}. +

+ * <p>
+ * The randomized exponential backoff is good at preventing two typical issues with
+ * other, simpler backoff strategies, namely:
+ * <ul>
+ *     <li>
+ *      having an exponentially growing backoff delay with a small initial delay gives
+ *      the best tradeoff between not overwhelming the server and serving the client as
+ *      fast as possible
+ *     </li>
+ *     <li>
+ *      having a jitter, or randomized backoff delay, is beneficial in avoiding
+ *      "retry-storms" where e.g. numerous clients would hit the server at the same time,
+ *      causing it to display transient failures, which would in turn cause all clients to
+ *      retry at the same backoff times, ultimately giving the server no chance to recover.
+ *     </li>
+ * </ul>
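+ * <p>
+ * As a purely illustrative sketch (the {@code remoteCall} name below is a hypothetical
+ * {@link Mono}, not part of this API), the simplest form retries up to 3 times starting
+ * from a 100ms backoff:
+ * <pre>{@code
+ * // first retry after ~100ms, then ~200ms, then ~400ms (randomized by the default
+ * // 50% jitter, but never below 100ms); a 4th error surfaces as an IllegalStateException
+ * remoteCall.retryWithBackoff(3, Duration.ofMillis(100))
+ *           .subscribe();
+ * }</pre>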
+ * + * @param numRetries the maximum number of attempts before an {@link IllegalStateException} + * is raised (having the original retry-triggering exception as cause). + * @param firstBackoff the first backoff delay to apply then grow exponentially. Also + * minimum delay even taking jitter into account. + * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. + */ + public final Mono retryWithBackoff(long numRetries, Duration firstBackoff) { + return retryWithBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d); + } + + /** + * In case of error, retry this {@link Flux} up to {@code numRetries} times using a + * randomized exponential backoff strategy. The jitter factor is {@code 50%} + * but the effective backoff delay cannot be less than {@code firstBackoff} nor more + * than {@code maxBackoff}. +

+ * <p>
+ * The randomized exponential backoff is good at preventing two typical issues with
+ * other, simpler backoff strategies, namely:
+ * <ul>
+ *     <li>
+ *      having an exponentially growing backoff delay with a small initial delay gives
+ *      the best tradeoff between not overwhelming the server and serving the client as
+ *      fast as possible
+ *     </li>
+ *     <li>
+ *      having a jitter, or randomized backoff delay, is beneficial in avoiding
+ *      "retry-storms" where e.g. numerous clients would hit the server at the same time,
+ *      causing it to display transient failures, which would in turn cause all clients to
+ *      retry at the same backoff times, ultimately giving the server no chance to recover.
+ *     </li>
+ * </ul>
+ * + * @param numRetries the maximum number of attempts before an {@link IllegalStateException} + * is raised (having the original retry-triggering exception as cause). + * @param firstBackoff the first backoff delay to apply then grow exponentially. Also + * minimum delay even taking jitter into account. + * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. + * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. + */ + public final Mono retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) { + return retryWithBackoff(numRetries, firstBackoff, maxBackoff, 0.5d); + } + + /** + * In case of error, retry this {@link Flux} up to {@code numRetries} times using a + * randomized exponential backoff strategy, randomized with a user-provided jitter + * factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}). + * Even with the jitter, the effective backoff delay cannot be less than + * {@code firstBackoff} nor more than {@code maxBackoff}. +

+ * <p>
+ * The randomized exponential backoff is good at preventing two typical issues with
+ * other, simpler backoff strategies, namely:
+ * <ul>
+ *     <li>
+ *      having an exponentially growing backoff delay with a small initial delay gives
+ *      the best tradeoff between not overwhelming the server and serving the client as
+ *      fast as possible
+ *     </li>
+ *     <li>
+ *      having a jitter, or randomized backoff delay, is beneficial in avoiding
+ *      "retry-storms" where e.g. numerous clients would hit the server at the same time,
+ *      causing it to display transient failures, which would in turn cause all clients to
+ *      retry at the same backoff times, ultimately giving the server no chance to recover.
+ *     </li>
+ * </ul>
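+ * <p>
+ * As a purely illustrative sketch (the {@code remoteCall} name below is a hypothetical
+ * {@link Mono}, not part of this API), this variant behaves like its {@link Flux}
+ * counterpart, here with a 25% jitter and a 1s cap:
+ * <pre>{@code
+ * // up to 3 retries, delays of roughly 100ms, 200ms and 400ms, each randomized
+ * // by at most 25% and kept between 100ms and 1s
+ * remoteCall.retryWithBackoff(3, Duration.ofMillis(100), Duration.ofSeconds(1), 0.25)
+ *           .subscribe();
+ * }</pre>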
+ * + * @param numRetries the maximum number of attempts before an {@link IllegalStateException} + * is raised (having the original retry-triggering exception as cause). + * @param firstBackoff the first backoff delay to apply then grow exponentially. Also + * minimum delay even taking jitter into account. + * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. + * @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0). + * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. + */ + public final Mono retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) { + return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor)); + } + /** * Expect exactly one item from this {@link Mono} source or signal * {@link java.util.NoSuchElementException} for an empty source. diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index 25f0a87a3c..7419e4d241 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -16,22 +16,28 @@ package reactor.core.publisher; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; +import org.assertj.core.api.LongAssert; +import org.assertj.core.data.Percentage; import org.junit.Test; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import reactor.util.context.Context; +import reactor.util.function.Tuple2; import reactor.util.function.Tuples; import static org.assertj.core.api.Assertions.assertThat; @@ -446,4 +452,131 @@ public void retryWhenContextTrigger_OriginalContextManuallyUpdated() { assertThat(contextPerRetry) .allMatch(ctx -> ctx.hasKey("thirdPartyContext")); } + + @Test + public void fluxRetryRandomBackoff() { + Exception exception = new IOException("boom retry"); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Flux.concat(Flux.range(0, 2), Flux.error(exception)) + .retryWithBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0.1) + .elapsed() + .doOnNext(elapsed -> { if (elapsed.getT2() == 0) elapsedList.add(elapsed.getT1());} ) + .map(Tuple2::getT2) + ) + .thenAwait(Duration.ofSeconds(2)) + .expectNext(0, 1) //normal output + .expectNext(0, 1, 0, 1, 0, 1, 0, 1) //4 retry attempts + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 4/4") + .hasCause(exception)); + + assertThat(elapsedList).hasSize(5); + assertThat(elapsedList, LongAssert.class).first() + .isEqualTo(0L); + assertThat(elapsedList, LongAssert.class).element(1) + .isCloseTo(100, Percentage.withPercentage(10)); + assertThat(elapsedList, LongAssert.class).element(2) + .isCloseTo(200, Percentage.withPercentage(10)); + assertThat(elapsedList, LongAssert.class).element(3) + .isCloseTo(400, 
Percentage.withPercentage(10)); + assertThat(elapsedList, LongAssert.class).element(4) + .isCloseTo(800, Percentage.withPercentage(10)); + } + + @Test + public void fluxRetryRandomBackoff_maxBackoffShaves() { + Exception exception = new IOException("boom retry"); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Flux.concat(Flux.range(0, 2), Flux.error(exception)) + .retryWithBackoff(4, Duration.ofMillis(100), Duration.ofMillis(220), 0.9) + .elapsed() + .doOnNext(elapsed -> { if (elapsed.getT2() == 0) elapsedList.add(elapsed.getT1());} ) + .map(Tuple2::getT2) + ) + .thenAwait(Duration.ofSeconds(2)) + .expectNext(0, 1) //normal output + .expectNext(0, 1, 0, 1, 0, 1, 0, 1) //4 retry attempts + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 4/4") + .hasCause(exception)); + + assertThat(elapsedList).hasSize(5); + assertThat(elapsedList, LongAssert.class) + .first() + .isEqualTo(0L); + assertThat(elapsedList, LongAssert.class) + .element(1) + .isGreaterThanOrEqualTo(100) //min backoff + .isCloseTo(100, Percentage.withPercentage(90)); + assertThat(elapsedList, LongAssert.class) + .element(2) + .isCloseTo(200, Percentage.withPercentage(90)) + .isGreaterThanOrEqualTo(100) + .isLessThanOrEqualTo(220); + assertThat(elapsedList, LongAssert.class) + .element(3) + .isGreaterThanOrEqualTo(100) + .isLessThanOrEqualTo(220); + assertThat(elapsedList, LongAssert.class) + .element(4) + .isGreaterThanOrEqualTo(100) + .isLessThanOrEqualTo(220); + } + + @Test + public void fluxRetryRandomBackoff_minBackoffFloor() { + for (int i = 0; i < 50; i++) { + Exception exception = new IOException("boom retry loop #" + i); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Flux.concat(Flux.range(0, 2), Flux.error(exception)) + .retryWithBackoff(1, Duration.ofMillis(100), Duration.ofMillis(2000), 0.9) + .elapsed() + .doOnNext(elapsed -> { if (elapsed.getT2() == 0) elapsedList.add(elapsed.getT1());} ) + .map(Tuple2::getT2) + ) + .thenAwait(Duration.ofSeconds(2)) + .expectNext(0, 1) //normal output + .expectNext(0, 1) //1 retry attempts + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 1/1") + .hasCause(exception)); + + assertThat(elapsedList).hasSize(2); + assertThat(elapsedList, LongAssert.class) + .first() + .isEqualTo(0L); + assertThat(elapsedList, LongAssert.class) + .element(1) + .isGreaterThanOrEqualTo(100) //min backoff + .isCloseTo(100, Percentage.withPercentage(90)); + } + } + + @Test + public void fluxRetryRandomBackoff_noRandomness() { + Exception exception = new IOException("boom retry"); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Flux.concat(Flux.range(0, 2), Flux.error(exception)) + .retryWithBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0) + .elapsed() + .doOnNext(elapsed -> { if (elapsed.getT2() == 0) elapsedList.add(elapsed.getT1());} ) + .map(Tuple2::getT2) + ) + .thenAwait(Duration.ofSeconds(2)) + .expectNext(0, 1) //normal output + .expectNext(0, 1, 0, 1, 0, 1, 0, 1) //4 retry attempts + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 4/4") + .hasCause(exception)); + + assertThat(elapsedList).containsExactly(0L, 100L, 200L, 400L, 800L); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java 
b/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java index 7409717cc5..155c087e75 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java @@ -15,11 +15,22 @@ */ package reactor.core.publisher; +import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.assertj.core.api.LongAssert; +import org.assertj.core.data.Percentage; import org.junit.Test; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.util.function.Tuple2; + +import static org.assertj.core.api.Assertions.assertThat; public class MonoRetryWhenTest { @@ -45,4 +56,132 @@ public void exponentialRetry() { .expectComplete() .verify(); } + + @Test + public void monoRetryRandomBackoff() { + AtomicInteger errorCount = new AtomicInteger(); + Exception exception = new IOException("boom retry"); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Mono.error(exception) + .doOnError(e -> { + errorCount.incrementAndGet(); + elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS)); + }) + .retryWithBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0.1) + ) + .thenAwait(Duration.ofSeconds(2)) + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 4/4") + .hasCause(exception)); + + assertThat(errorCount).hasValue(5); + assertThat(elapsedList).hasSize(5); + assertThat(elapsedList.get(0)).isEqualTo(0L); + assertThat(elapsedList.get(1) - elapsedList.get(0)) + .isGreaterThanOrEqualTo(100) //min backoff + .isCloseTo(100, Percentage.withPercentage(10)); + assertThat(elapsedList.get(2) - elapsedList.get(1)) + .isCloseTo(200, Percentage.withPercentage(10)); + assertThat(elapsedList.get(3) - elapsedList.get(2)) + .isCloseTo(400, Percentage.withPercentage(10)); + assertThat(elapsedList.get(4) - elapsedList.get(3)) + .isCloseTo(800, Percentage.withPercentage(10)); + } + + @Test + public void monoRetryRandomBackoff_maxBackoffShaves() { + AtomicInteger errorCount = new AtomicInteger(); + Exception exception = new IOException("boom retry"); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Mono.error(exception) + .doOnError(e -> { + errorCount.incrementAndGet(); + elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS)); + }) + .retryWithBackoff(4, Duration.ofMillis(100), Duration.ofMillis(220), 0.9) + ) + .thenAwait(Duration.ofSeconds(2)) + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 4/4") + .hasCause(exception)); + + assertThat(errorCount).hasValue(5); + assertThat(elapsedList).hasSize(5); + assertThat(elapsedList.get(0)).isEqualTo(0L); + assertThat(elapsedList.get(1) - elapsedList.get(0)) + .isGreaterThanOrEqualTo(100) //min backoff + .isCloseTo(100, Percentage.withPercentage(90)); + + assertThat(elapsedList.get(2) - elapsedList.get(1)) + .isCloseTo(200, Percentage.withPercentage(90)) + .isGreaterThanOrEqualTo(100) + .isLessThanOrEqualTo(220); + assertThat(elapsedList.get(3) - elapsedList.get(2)) + .isGreaterThanOrEqualTo(100) + .isLessThanOrEqualTo(220); + assertThat(elapsedList.get(4) - elapsedList.get(3)) + .isGreaterThanOrEqualTo(100) + 
.isLessThanOrEqualTo(220); + } + + @Test + public void monoRetryRandomBackoff_minBackoffFloor() { + for (int i = 0; i < 50; i++) { + AtomicInteger errorCount = new AtomicInteger(); + Exception exception = new IOException("boom retry loop #" + i); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Mono.error(exception) + .doOnError(e -> { + errorCount.incrementAndGet(); + elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS)); + }) + .retryWithBackoff(1, Duration.ofMillis(100), Duration.ofMillis(2000), 0.9) + ) + .thenAwait(Duration.ofSeconds(2)) + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 1/1") + .hasCause(exception)); + + assertThat(errorCount).hasValue(2); + assertThat(elapsedList).hasSize(2); + assertThat(elapsedList.get(0)).isEqualTo(0L); + assertThat(elapsedList.get(1) - elapsedList.get(0)) + .isGreaterThanOrEqualTo(100) //min backoff + .isCloseTo(100, Percentage.withPercentage(90)); + } + } + + @Test + public void monoRetryRandomBackoff_noRandom() { + AtomicInteger errorCount = new AtomicInteger(); + Exception exception = new IOException("boom retry"); + List elapsedList = new ArrayList<>(); + + StepVerifier.withVirtualTime(() -> + Mono.error(exception) + .doOnError(e -> { + errorCount.incrementAndGet(); + elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS)); + }) + .retryWithBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0d) + ) + .thenAwait(Duration.ofSeconds(2)) + .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class) + .hasMessage("Retries exhausted: 4/4") + .hasCause(exception)); + + assertThat(errorCount).hasValue(5); + assertThat(elapsedList.get(0)).isEqualTo(0L); + assertThat(elapsedList.get(1) - elapsedList.get(0)).isEqualTo(100); + assertThat(elapsedList.get(2) - elapsedList.get(1)).isEqualTo(200); + assertThat(elapsedList.get(3) - elapsedList.get(2)).isEqualTo(400); + assertThat(elapsedList.get(4) - elapsedList.get(3)).isEqualTo(800); + } + } \ No newline at end of file diff --git a/src/docs/asciidoc/apdx-operatorChoice.adoc b/src/docs/asciidoc/apdx-operatorChoice.adoc index b5565f246a..59d17acb5b 100644 --- a/src/docs/asciidoc/apdx-operatorChoice.adoc +++ b/src/docs/asciidoc/apdx-operatorChoice.adoc @@ -6,7 +6,7 @@ is covered by a combination of operators, it is presented as a method call, with leading dot and parameters in parentheses, like this: `.methodCall(parameter)`. //TODO flux: cache, share, replay, publish, publishOn/subscribeOn/cancelOn -//compose/transform, retryWhen, repeatWhen, sort, startWith +//compose/transform, repeatWhen, sort, startWith //TODO Mono.sequenceEqual I want to deal with: @@ -60,7 +60,7 @@ I want to deal with: * I want to transform existing data: ** on a 1-to-1 basis (eg. strings to their length): `map` *** ...by just casting it: `cast` -*** ...in order to materialize each source value's index: `Flux#indexed` +*** ...in order to materialize each source value's index: `Flux#index` ** on a 1-to-n basis (eg. strings to their characters): `flatMap` + use a factory method ** on a 1-to-n basis with programmatic behavior for each source element and/or state: `handle` ** running an asynchronous task for each source item (eg. 
urls to http request): `flatMap` + an async `Publisher`-returning method
@@ -218,6 +218,7 @@ I want to deal with:
 *** to a `Publisher` or `Mono`, possibly different ones depending on the error: `Flux#onErrorResume` and `Mono#onErrorResume`
 ** by retrying: `retry`
 *** ...triggered by a companion control Flux: `retryWhen`
+*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWithBackoff`
 * I want to deal with backpressure "errors"footnote:[request max from upstream and apply the strategy when downstream does not produce enough request]...
 ** by throwing a special `IllegalStateException`: `Flux#onBackpressureError`