Skip to content

Commit

Permalink
Draft for async retry
Browse files Browse the repository at this point in the history
  • Loading branch information
ashamukov committed May 18, 2019
1 parent 5852e1b commit 49f66e0
Show file tree
Hide file tree
Showing 13 changed files with 1,096 additions and 214 deletions.
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,48 @@ Here is an example of declarative iteration using Spring AOP to repeat a service

The example above uses a default `RetryTemplate` inside the interceptor. To change the policies or listeners, you only need to inject an instance of `RetryTemplate` into the interceptor.

## Asynchronous retry
### Terms
```java

CompletableFuture<HttpResponse<String>> completableFuture = retryTemplate.execute(
ctx -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
);
```
- __async callback__ - a callback that returns one of the supported async types (CompletableFuture, Future). Usually, async retry callback is one, that does not perform a heavy work by itself, but schedules the work to some worker and returns an instance of async type to track the progress. Failure of async callback itself usually means failure of scheduling (but not of actual work).
Failure of async callback (of scheduling) and of actual job will both be retried on a common basis, according to configured policies.

- __job__ - a task with payload, usually heavy, which result will be available through the instance of async type, returned by async callback (and, consequently, by _execute_ method)

- __rescheduling executor__ - an instance of executor, used for scheduling a new retry attempt after a delay (provided by a backoff policy). The type of executor is restricted by ScheduledExecutorService, to take advantage of its "schedule after delay" feature, which allows us to implement backoff without blocking a thread. Rescheduling executor is used for all retries except of initial scheduling retries (initial invocation of async callback).

### Initial invocation of async callback
Invocation of template.execute(asyncCallback) returns when first scheduling of job succeeded, or all initial scheduling attempts failed. Retry template does not produce async containers by itself, therefore there is nothing to return from _execute_ until initial invocation succeed. Backoffs between failing initial scheduling attempts will be performed by default sleeper by means of Thread.sleep() on caller thread. Why this approach is used:
- to be compatible with generic API of RetryOperations (where return type of callback equals to retrun type of execute(...))
- to provide an additional mean of back pressure

### Subsequent invocations of async callback
If the first execution of the _job_ failed and a retry is allowed by the policy, the next invocation of the async callback will be scheduled on _rescheduling executor_

### Async callbacks without executor
If executor is not provided, a backoff will be performed by Thread.sleep() on the client thread (for initial scheduling) or on the worker thread (for job failures, or for subsequent schedulings).

### Configuration example
```java
RetryTemplate.builder()
// activte the async retry feature with an executor
.asyncRetry(Executors.newScheduledThreadPool(1))
.fixedBackoff(1000)
.build();

RetryTemplate.builder()
// activte the async retry feature without an executor.
// Thread.sleep() will be used for backoff.
.asyncRetry()
.fixedBackoff(1000)
.build();
```

## Contributing

Spring Retry is released under the non-restrictive Apache 2.0 license,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.springframework.retry.backoff;

import java.util.function.Supplier;

public interface LastBackoffPeriodSupplier extends Supplier<Long> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.springframework.retry.backoff;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RememberPeriodSleeper implements Sleeper, LastBackoffPeriodSupplier {

private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class);

private volatile Long lastBackoffPeriod;

@Override
public void sleep(long backOffPeriod) {
logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod);
lastBackoffPeriod = backOffPeriod;
}

@Override
public Long get() {
return lastBackoffPeriod;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.retry.support;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryException;
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;

/**
* @author Dave Syer
*/
public abstract class AsyncRetryResultProcessor<T> implements RetryResultProcessor<T> {
private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class);

protected T doNewAttempt(Supplier<Result<T>> supplier) throws Throwable {
logger.debug("Performing the next async callback invocation...");
return supplier.get().getOrThrow();
}

protected abstract T scheduleNewAttemptAfterDelay(
Supplier<Result<T>> supplier,
ScheduledExecutorService reschedulingExecutor,
long rescheduleAfterMillis,
RetryContext ctx
) throws Throwable;

protected T handleException(Supplier<Result<T>> supplier,
Consumer<Throwable> handler,
Throwable throwable,
ScheduledExecutorService reschedulingExecutor,
LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {
try {
handler.accept(unwrapIfNeed(throwable));

if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) {
return doNewAttempt(supplier);
} else {
long rescheduleAfterMillis = lastBackoffPeriodSupplier.get();
logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis...");
return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx);
}
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(unwrapIfNeed(t));
}
}

static Throwable unwrapIfNeed(Throwable throwable) {
if (throwable instanceof ExecutionException
|| throwable instanceof CompletionException
|| throwable instanceof RetryException) {
return throwable.getCause();
} else {
return throwable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryException;
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;

/**
* A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a
Expand All @@ -33,58 +39,47 @@
*
* @author Dave Syer
*/
public class CompletableFutureRetryResultProcessor
implements RetryResultProcessor<CompletableFuture<?>> {
public class CompletableFutureRetryResultProcessor<V>
extends AsyncRetryResultProcessor<CompletableFuture<V>> {

protected final Log logger = LogFactory.getLog(getClass());

@Override
public Result<CompletableFuture<?>> process(CompletableFuture<?> completable,
Supplier<Result<CompletableFuture<?>>> supplier,
Consumer<Throwable> handler) {
@SuppressWarnings("unchecked")
CompletableFuture<Object> typed = (CompletableFuture<Object>) completable;
CompletableFuture<?> handle = typed
.thenApply(value -> CompletableFuture.completedFuture(value))
.exceptionally(throwable -> apply(supplier, handler, throwable))
public Result<CompletableFuture<V>> process(CompletableFuture<V> completable,
Supplier<Result<CompletableFuture<V>>> supplier,
Consumer<Throwable> handler, ScheduledExecutorService reschedulingExecutor,
LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {

CompletableFuture<V> handle = completable
.thenApply(CompletableFuture::completedFuture)
.exceptionally(throwable -> handleException(
supplier, handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
)
.thenCompose(Function.identity());

return new Result<>(handle);
}

private CompletableFuture<Object> apply(
Supplier<Result<CompletableFuture<?>>> supplier, Consumer<Throwable> handler,
Throwable throwable) {
Throwable error = throwable;
try {
if (throwable instanceof ExecutionException
|| throwable instanceof CompletionException) {
error = throwable.getCause();
}
handler.accept(error);
Result<CompletableFuture<?>> result = supplier.get();
if (result.isComplete()) {
@SuppressWarnings("unchecked")
CompletableFuture<Object> output = (CompletableFuture<Object>) result
.getResult();
return output;
protected CompletableFuture<V> scheduleNewAttemptAfterDelay(
Supplier<Result<CompletableFuture<V>>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis,
RetryContext ctx)
{
CompletableFuture<CompletableFuture<V>> futureOfFurtherScheduling = new CompletableFuture<>();

reschedulingExecutor.schedule(() -> {
try {
RetrySynchronizationManager.register(ctx);
futureOfFurtherScheduling.complete(doNewAttempt(supplier));
} catch (Throwable t) {
futureOfFurtherScheduling.completeExceptionally(t);
throw RetryTemplate.runtimeException(t);
} finally {
RetrySynchronizationManager.clear();
}
throw result.exception;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
error = e;
}
catch (CompletionException e) {
error = e.getCause();
}
catch (ExecutionException e) {
error = e.getCause();
}
catch (RetryException e) {
error = e.getCause();
}
catch (Throwable e) {
error = e;
}
throw RetryTemplate.runtimeException(error);
}
}, rescheduleAfterMillis, TimeUnit.MILLISECONDS);

return futureOfFurtherScheduling.thenCompose(Function.identity());
}
}
Loading

0 comments on commit 49f66e0

Please sign in to comment.