Skip to content

Commit

Permalink
fix #1122 Add standardized exponential backoff retry with jitter to core
Browse files Browse the repository at this point in the history
This form of retry is similar to the one provided by reactor-extra,
but exposing less configuration options. Instead, it uses sane defaults
to result in an exponential backoff that is randomized with jitter.
  • Loading branch information
simonbasle committed Mar 28, 2018
1 parent 63debe7 commit f759799
Show file tree
Hide file tree
Showing 6 changed files with 536 additions and 2 deletions.
100 changes: 100 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -6415,6 +6416,105 @@ public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
return onAssembly(new FluxRetryWhen<>(this, whenFactory));
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%}
* but the effective backoff delay cannot be less than {@code firstBackoff}.
* <p>
* The randomized exponential backoff is good at preventing two typical issues with
* other simpler backoff strategies, namely:
* <ul>
* <li>
* having an exponentially growing backoff delay with a small initial delay gives
* the best tradeoff between not overwhelming the server and serving the client as
* fast as possible
* </li>
* <li>
* having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
* where eg. numerous clients would hit the server at the same time, causing it to
* display transient failures which would cause all clients to retry at the same
* backoff times, ultimately sparing no load on the server.
* </li>
* </ul>
*
* @param numRetries the maximum number of attempts before an {@link IllegalStateException}
* is raised (having the original retry-triggering exception as cause).
* @param firstBackoff the first backoff delay to apply then grow exponentially. Also
* minimum delay even taking jitter into account.
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Flux<T> retryWithBackoff(long numRetries, Duration firstBackoff) {
return retryWithBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d);
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy. The jitter factor is {@code 50%}
* but the effective backoff delay cannot be less than {@code firstBackoff} nor more
* than {@code maxBackoff}.
<p>
* The randomized exponential backoff is good at preventing two typical issues with
* other simpler backoff strategies, namely:
* <ul>
* <li>
* having an exponentially growing backoff delay with a small initial delay gives
* the best tradeoff between not overwhelming the server and serving the client as
* fast as possible
* </li>
* <li>
* having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
* where eg. numerous clients would hit the server at the same time, causing it to
* display transient failures which would cause all clients to retry at the same
* backoff times, ultimately sparing no load on the server.
* </li>
* </ul>
*
* @param numRetries the maximum number of attempts before an {@link IllegalStateException}
* is raised (having the original retry-triggering exception as cause).
* @param firstBackoff the first backoff delay to apply then grow exponentially. Also
* minimum delay even taking jitter into account.
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Flux<T> retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) {
return retryWithBackoff(numRetries, firstBackoff, maxBackoff, 0.5d);
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy, randomized with a user-provided jitter
* factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}).
* Even with the jitter, the effective backoff delay cannot be less than
* {@code firstBackoff} nor more than {@code maxBackoff}.
<p>
* The randomized exponential backoff is good at preventing two typical issues with
* other simpler backoff strategies, namely:
* <ul>
* <li>
* having an exponentially growing backoff delay with a small initial delay gives
* the best tradeoff between not overwhelming the server and serving the client as
* fast as possible
* </li>
* <li>
* having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
* where eg. numerous clients would hit the server at the same time, causing it to
* display transient failures which would cause all clients to retry at the same
* backoff times, ultimately sparing no load on the server.
* </li>
* </ul>
*
* @param numRetries the maximum number of attempts before an {@link IllegalStateException}
* is raised (having the original retry-triggering exception as cause).
* @param firstBackoff the first backoff delay to apply then grow exponentially. Also
* minimum delay even taking jitter into account.
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0).
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Flux<T> retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) {
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor));
}

/**
* Sample this {@link Flux} by periodically emitting an item corresponding to that
* {@link Flux} latest emitted value within the periodical time window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package reactor.core.publisher;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
Expand Down Expand Up @@ -241,4 +243,64 @@ public void subscribe(CoreSubscriber<? super Throwable> actual) {
completionSignal.subscribe(actual);
}
}

static Function<Flux<Throwable>, Publisher<Long>> randomExponentialBackoffFunction(
long numRetries, Duration firstBackoff, Duration maxBackoff,
double jitterFactor) {
if (jitterFactor < 0 || jitterFactor > 1) throw new IllegalArgumentException("jitterFactor must be between 0 and 1 (default 0.5)");
Objects.requireNonNull(firstBackoff, "firstBackoff is required");
Objects.requireNonNull(maxBackoff, "maxBackoff is required");

return t -> t.index()
.flatMap(t2 -> {
long iteration = t2.getT1();

if (iteration >= numRetries) {
return Mono.<Long>error(new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries, t2.getT2()));
}

Duration nextBackoff;
try {
nextBackoff = firstBackoff.multipliedBy((long) Math.pow(2, iteration));
if (nextBackoff.compareTo(maxBackoff) > 0) {
nextBackoff = maxBackoff;
}
}
catch (ArithmeticException overflow) {
nextBackoff = maxBackoff;
}

//short-circuit delay == 0 case
if (nextBackoff.isZero()) {
return Mono.just(iteration);
}

ThreadLocalRandom random = ThreadLocalRandom.current();

long jitterOffset;
try {
jitterOffset = nextBackoff.multipliedBy((long) (100 * jitterFactor))
.dividedBy(100)
.toMillis();
}
catch (ArithmeticException ae) {
jitterOffset = Math.round(Long.MAX_VALUE * jitterFactor);
}
long lowBound = Math.max(firstBackoff.minus(nextBackoff)
.toMillis(), -jitterOffset);
long highBound = Math.min(maxBackoff.minus(nextBackoff)
.toMillis(), jitterOffset);

long jitter;
if (highBound == lowBound) {
if (highBound == 0) jitter = 0;
else jitter = random.nextLong(highBound);
}
else {
jitter = random.nextLong(lowBound, highBound);
}
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
return Mono.delay(effectiveBackoff);
});
}
}
99 changes: 99 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -2923,6 +2923,105 @@ public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
return onAssembly(new MonoRetryWhen<>(this, whenFactory));
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%}
* but the effective backoff delay cannot be less than {@code firstBackoff}.
<p>
* The randomized exponential backoff is good at preventing two typical issues with
* other simpler backoff strategies, namely:
* <ul>
* <li>
* having an exponentially growing backoff delay with a small initial delay gives
* the best tradeoff between not overwhelming the server and serving the client as
* fast as possible
* </li>
* <li>
* having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
* where eg. numerous clients would hit the server at the same time, causing it to
* display transient failures which would cause all clients to retry at the same
* backoff times, ultimately sparing no load on the server.
* </li>
* </ul>
*
* @param numRetries the maximum number of attempts before an {@link IllegalStateException}
* is raised (having the original retry-triggering exception as cause).
* @param firstBackoff the first backoff delay to apply then grow exponentially. Also
* minimum delay even taking jitter into account.
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Mono<T> retryWithBackoff(long numRetries, Duration firstBackoff) {
return retryWithBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d);
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy. The jitter factor is {@code 50%}
* but the effective backoff delay cannot be less than {@code firstBackoff} nor more
* than {@code maxBackoff}.
<p>
* The randomized exponential backoff is good at preventing two typical issues with
* other simpler backoff strategies, namely:
* <ul>
* <li>
* having an exponentially growing backoff delay with a small initial delay gives
* the best tradeoff between not overwhelming the server and serving the client as
* fast as possible
* </li>
* <li>
* having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
* where eg. numerous clients would hit the server at the same time, causing it to
* display transient failures which would cause all clients to retry at the same
* backoff times, ultimately sparing no load on the server.
* </li>
* </ul>
*
* @param numRetries the maximum number of attempts before an {@link IllegalStateException}
* is raised (having the original retry-triggering exception as cause).
* @param firstBackoff the first backoff delay to apply then grow exponentially. Also
* minimum delay even taking jitter into account.
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Mono<T> retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) {
return retryWithBackoff(numRetries, firstBackoff, maxBackoff, 0.5d);
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy, randomized with a user-provided jitter
* factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}).
* Even with the jitter, the effective backoff delay cannot be less than
* {@code firstBackoff} nor more than {@code maxBackoff}.
<p>
* The randomized exponential backoff is good at preventing two typical issues with
* other simpler backoff strategies, namely:
* <ul>
* <li>
* having an exponentially growing backoff delay with a small initial delay gives
* the best tradeoff between not overwhelming the server and serving the client as
* fast as possible
* </li>
* <li>
* having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
* where eg. numerous clients would hit the server at the same time, causing it to
* display transient failures which would cause all clients to retry at the same
* backoff times, ultimately sparing no load on the server.
* </li>
* </ul>
*
* @param numRetries the maximum number of attempts before an {@link IllegalStateException}
* is raised (having the original retry-triggering exception as cause).
* @param firstBackoff the first backoff delay to apply then grow exponentially. Also
* minimum delay even taking jitter into account.
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0).
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Mono<T> retryWithBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) {
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor));
}

/**
* Expect exactly one item from this {@link Mono} source or signal
* {@link java.util.NoSuchElementException} for an empty source.
Expand Down
Loading

0 comments on commit f759799

Please sign in to comment.