-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLinearRetry.java
60 lines (53 loc) · 2.09 KB
/
LinearRetry.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package edu.java.bot.clients.retry;
import java.time.Duration;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
public class LinearRetry extends Retry {
public final Duration minBackoff;
public final Duration maxBackoff = Duration.ofMillis(Long.MAX_VALUE);
public final long maxAttempts;
public final long step;
public final Predicate<Throwable> errorFilter;
public LinearRetry(
Duration minBackoff,
long maxAttempts,
long step,
Predicate<Throwable> errorFilter
) {
this.minBackoff = minBackoff;
this.maxAttempts = maxAttempts;
this.step = step;
this.errorFilter = errorFilter;
}
@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
return Flux.deferContextual((cv) -> retrySignals.contextWrite(cv).concatMap((retryWhenState) -> {
RetrySignal copy = retryWhenState.copy();
Throwable currentFailure = copy.failure();
long iteration = copy.totalRetries();
if (currentFailure == null) {
return Mono.error(
new IllegalStateException("Retry.RetrySignal#failure() not expected to be null")
);
} else if (!this.errorFilter.test(currentFailure)) {
return Mono.error(currentFailure);
} else if (iteration >= this.maxAttempts) {
return Mono.error(new RuntimeException());
}
Duration nextBackoff;
try {
nextBackoff = this.minBackoff.multipliedBy(iteration * step);
if (nextBackoff.compareTo(this.maxBackoff) > 0) {
nextBackoff = this.maxBackoff;
}
} catch (ArithmeticException e) {
nextBackoff = this.maxBackoff;
}
return Mono.delay(nextBackoff, Schedulers.parallel()).contextWrite(cv);
}).onErrorStop());
}
}