From 8673accad6adeef3904939d1dd3003a163f58269 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Wed, 26 Jan 2022 21:43:38 +0100 Subject: [PATCH] feat(core): add a retry on storage interface --- cli/src/main/resources/application.yml | 6 + .../io/kestra/core/annotations/Retryable.java | 68 ++++++++++ .../io/kestra/core/listeners/RetryEvents.java | 4 +- .../io/kestra/core/runners/RunContext.java | 2 +- .../core/services/ConditionService.java | 3 +- .../core/storages/StorageInterface.java | 14 +- .../intercept/OverrideRetryInterceptor.java | 123 ++++++++++++++++++ .../OverrideRetryInterceptorTest.java | 50 +++++++ .../io/kestra/storage/local/LocalStorage.java | 6 +- ui/src/styles/layout/bootstrap.scss | 4 + .../controllers/ExecutionController.java | 2 +- 11 files changed, 271 insertions(+), 11 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/annotations/Retryable.java create mode 100644 core/src/main/java/io/micronaut/retry/intercept/OverrideRetryInterceptor.java create mode 100644 core/src/test/java/io/micronaut/retry/intercept/OverrideRetryInterceptorTest.java diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index f11d3e73e7..9628acc4ef 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -39,6 +39,12 @@ endpoints: write-sensitive: false kestra: + retries: + attempts: 5 + multiplier: 2.0 + delay: 1s + maxDelay: "" + kafka: defaults: topic-prefix: "kestra_" diff --git a/core/src/main/java/io/kestra/core/annotations/Retryable.java b/core/src/main/java/io/kestra/core/annotations/Retryable.java new file mode 100644 index 0000000000..be52890d78 --- /dev/null +++ b/core/src/main/java/io/kestra/core/annotations/Retryable.java @@ -0,0 +1,68 @@ +package io.kestra.core.annotations; + +import io.micronaut.aop.Around; +import io.micronaut.context.annotation.AliasFor; +import io.micronaut.context.annotation.Type; +import io.micronaut.retry.annotation.DefaultRetryPredicate; +import io.micronaut.retry.annotation.RetryPredicate; +import io.micronaut.retry.intercept.OverrideRetryInterceptor; + +import java.lang.annotation.*; + +import javax.validation.constraints.Digits; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Inherited +@Documented +@Retention(RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE, ElementType.ANNOTATION_TYPE}) +@Around +@Type(OverrideRetryInterceptor.class) +public @interface Retryable { + int MAX_INTEGRAL_DIGITS = 4; + + /** + * @return The exception types to include (defaults to all) + */ + Class[] value() default {}; + + /** + * @return The exception types to include (defaults to all) + */ + @AliasFor(member = "value") + Class[] includes() default {}; + + /** + * @return The exception types to exclude (defaults to none) + */ + Class[] excludes() default {}; + + /** + * @return The maximum number of retry attempts + */ + @Digits(integer = MAX_INTEGRAL_DIGITS, fraction = 0) + String attempts() default "${kestra.retries.attempts:5}"; + + /** + * @return The delay between retry attempts + */ + String delay() default "${kestra.retries.delay:1s}"; + + /** + * @return The maximum overall delay + */ + String maxDelay() default "${kestra.retries.max-delay:}"; + + /** + * @return The multiplier to use to calculate the delay + */ + @Digits(integer = 2, fraction = 2) + String multiplier() default "${kestra.retries.multiplier:2.0}"; + + /** + * @return The retry predicate class to use instead of {@link io.micronaut.retry.annotation.Retryable#includes} and {@link io.micronaut.retry.annotation.Retryable#excludes} + * (defaults to none) + */ + Class predicate() default DefaultRetryPredicate.class; +} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/listeners/RetryEvents.java b/core/src/main/java/io/kestra/core/listeners/RetryEvents.java index 4ab22224e9..bb957f2b2b 100644 --- a/core/src/main/java/io/kestra/core/listeners/RetryEvents.java +++ b/core/src/main/java/io/kestra/core/listeners/RetryEvents.java @@ -13,8 +13,8 @@ public class RetryEvents { void onRetry(final RetryEvent event) { log.info( "Retry from '{}.{}()', attempt {}, overallDelay {}", - event.getSource().getExecutableMethod().getClass().getName(), - event.getSource().getName(), + event.getSource().getTarget().getClass().getName(), + event.getSource().getExecutableMethod().getName(), event.getRetryState().currentAttempt(), event.getRetryState().getOverallDelay() ); diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 33eb96826d..2f813c96ee 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -340,7 +340,7 @@ public org.slf4j.Logger logger() { return runContextLogger.logger(); } - public InputStream uriToInputStream(URI uri) throws FileNotFoundException { + public InputStream uriToInputStream(URI uri) throws IOException { if (uri.getScheme().equals("kestra")) { return this.storageInterface.get(uri); } diff --git a/core/src/main/java/io/kestra/core/services/ConditionService.java b/core/src/main/java/io/kestra/core/services/ConditionService.java index 37d7aea092..7a5116b5e3 100644 --- a/core/src/main/java/io/kestra/core/services/ConditionService.java +++ b/core/src/main/java/io/kestra/core/services/ConditionService.java @@ -18,10 +18,12 @@ import java.util.List; import java.util.stream.Collectors; import jakarta.inject.Inject; +import jakarta.inject.Singleton; /** * Provides business logic to manipulate {@link Condition} */ +@Singleton public class ConditionService { @Inject private RunContextFactory runContextFactory; @@ -97,7 +99,6 @@ public ConditionContext conditionContext(RunContext runContext, Flow flow, @Null .build(); } - public ConditionContext conditionContext(RunContext runContext, Flow flow, @Nullable Execution execution) { return this.conditionContext(runContext, flow, execution, null); } diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index f06fa98ca6..1e4dfc977a 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -2,6 +2,7 @@ import com.google.common.base.Charsets; import com.google.common.hash.Hashing; +import io.kestra.core.annotations.Retryable; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; @@ -23,14 +24,19 @@ @Introspected public interface StorageInterface { - InputStream get(URI uri) throws FileNotFoundException; + @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) + InputStream get(URI uri) throws IOException; - Long size(URI uri) throws FileNotFoundException; + @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) + Long size(URI uri) throws IOException; + @Retryable(includes = {IOException.class}) URI put(URI uri, InputStream data) throws IOException; + @Retryable(includes = {IOException.class}) boolean delete(URI uri) throws IOException; + @Retryable(includes = {IOException.class}) List deleteByPrefix(URI storagePrefix) throws IOException; default String executionPrefix(Flow flow, Execution execution) { @@ -91,7 +97,7 @@ default Optional extractExecutionId(URI path) { return Optional.of(matcher.group(2)); } - default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException { + default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException { return new URI("/" + String.join( "/", Arrays.asList( @@ -103,6 +109,7 @@ default URI uri(Flow flow, Execution execution, String inputName, String file) t )); } + @Retryable(includes = {IOException.class}) default URI from(Flow flow, Execution execution, String input, File file) throws IOException { try { return this.put( @@ -114,6 +121,7 @@ default URI from(Flow flow, Execution execution, String input, File file) throws } } + @Retryable(includes = {IOException.class}) default URI from(Flow flow, Execution execution, Input input, File file) throws IOException { return this.from(flow, execution, input.getName(), file); } diff --git a/core/src/main/java/io/micronaut/retry/intercept/OverrideRetryInterceptor.java b/core/src/main/java/io/micronaut/retry/intercept/OverrideRetryInterceptor.java new file mode 100644 index 0000000000..931396167e --- /dev/null +++ b/core/src/main/java/io/micronaut/retry/intercept/OverrideRetryInterceptor.java @@ -0,0 +1,123 @@ +package io.micronaut.retry.intercept; + +import io.kestra.core.annotations.Retryable; +import io.micronaut.aop.InterceptPhase; +import io.micronaut.aop.InterceptedMethod; +import io.micronaut.aop.MethodInterceptor; +import io.micronaut.aop.MethodInvocationContext; +import io.micronaut.context.event.ApplicationEventPublisher; +import io.micronaut.core.annotation.AnnotationValue; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.convert.value.MutableConvertibleValues; +import io.micronaut.retry.RetryState; +import io.micronaut.retry.annotation.DefaultRetryPredicate; +import io.micronaut.retry.event.RetryEvent; +import jakarta.inject.Singleton; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Replace {@link DefaultRetryInterceptor} only to catch Throwable + */ +@Singleton +public class OverrideRetryInterceptor implements MethodInterceptor { + private static final Logger LOG = LoggerFactory.getLogger(OverrideRetryInterceptor.class); + private final ApplicationEventPublisher eventPublisher; + + public OverrideRetryInterceptor(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + @Override + public int getOrder() { + return InterceptPhase.RETRY.getPosition(); + } + + @Nullable + @Override + public Object intercept(MethodInvocationContext context) { + Optional> opt = context.findAnnotation(Retryable.class); + if (opt.isEmpty()) { + return context.proceed(); + } + + AnnotationValue retry = opt.get(); + + MutableRetryState retryState = new SimpleRetry( + retry.get("attempts", Integer.class).orElse(5), + retry.get("multiplier", Double.class).orElse(2D), + retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)), + retry.get("maxDelay", Duration.class).orElse(null), + new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")) + ); + + MutableConvertibleValues attrs = context.getAttributes(); + attrs.put(RetryState.class.getName(), retry); + + InterceptedMethod interceptedMethod = InterceptedMethod.of(context); + try { + retryState.open(); + + Object result = retrySync(context, retryState, interceptedMethod); + switch (interceptedMethod.resultType()) { + case SYNCHRONOUS: + retryState.close(null); + return result; + default: + return interceptedMethod.unsupported(); + } + } catch (Exception e) { + return interceptedMethod.handleException(e); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static List> resolveIncludes(AnnotationValue retry, String includes) { + Class[] values = retry.classValues(includes); + return (List) Collections.unmodifiableList(Arrays.asList(values)); + } + + private Object retrySync(MethodInvocationContext context, MutableRetryState retryState, InterceptedMethod interceptedMethod) { + boolean firstCall = true; + while (true) { + try { + if (firstCall) { + firstCall = false; + return interceptedMethod.interceptResult(); + } + return interceptedMethod.interceptResult(this); + } catch (Throwable e) { + if (!retryState.canRetry(e)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot retry anymore. Rethrowing original exception for method: {}", context); + } + retryState.close(e); + throw e; + } else { + long delayMillis = retryState.nextDelay(); + try { + if (eventPublisher != null) { + try { + eventPublisher.publishEvent(new RetryEvent(context, retryState, e)); + } catch (Exception e1) { + LOG.error("Error occurred publishing RetryEvent: " + e1.getMessage(), e1); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retrying execution for method [{}] after delay of {}ms for exception: {}", context, delayMillis, e.getMessage()); + } + Thread.sleep(delayMillis); + } catch (InterruptedException e1) { + throw e; + } + } + } + } + } +} diff --git a/core/src/test/java/io/micronaut/retry/intercept/OverrideRetryInterceptorTest.java b/core/src/test/java/io/micronaut/retry/intercept/OverrideRetryInterceptorTest.java new file mode 100644 index 0000000000..e67f9816ee --- /dev/null +++ b/core/src/test/java/io/micronaut/retry/intercept/OverrideRetryInterceptorTest.java @@ -0,0 +1,50 @@ +package io.micronaut.retry.intercept; + +import io.kestra.core.annotations.Retryable; +import io.micronaut.retry.event.RetryEvent; +import io.micronaut.runtime.event.annotation.EventListener; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.nio.channels.AlreadyBoundException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@MicronautTest +class OverrideRetryInterceptorTest { + @Inject + RetryEvents retryEvents; + + @Inject + TestRetry retry; + + @Test + void test() { + assertThrows(AlreadyBoundException.class, retry::failedMethod); + + assertThat(retryEvents.count, is(5)); + } + + @Singleton + public static class TestRetry { + @Retryable(delay = "1s", multiplier = "2.0") + public String failedMethod() { + throw new AlreadyBoundException(); + } + } + + @Singleton + @Slf4j + public static class RetryEvents { + public int count = 0; + @EventListener + void onRetry(final RetryEvent event) { + this.count++; + } + } +} \ No newline at end of file diff --git a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java index fd5e109336..c69a4887a3 100644 --- a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java +++ b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java @@ -49,7 +49,7 @@ private void createDirectory(URI append) { } @Override - public InputStream get(URI uri) throws FileNotFoundException { + public InputStream get(URI uri) throws IOException { return new BufferedInputStream(new FileInputStream(getPath(URI.create(uri.getPath())) .toAbsolutePath() .toString()) @@ -57,13 +57,13 @@ public InputStream get(URI uri) throws FileNotFoundException { } @Override - public Long size(URI uri) throws FileNotFoundException { + public Long size(URI uri) throws IOException { try { return Files.size(getPath(URI.create(uri.getPath()))); } catch (NoSuchFileException e) { throw new FileNotFoundException("Unable to find file at '" + uri + "'"); } catch (IOException e) { - throw new FileNotFoundException("Unable to find file at '" + uri + "' with message '" + e.getMessage() + "'"); + throw new IOException("Unable to find file at '" + uri + "' with message '" + e.getMessage() + "'"); } } diff --git a/ui/src/styles/layout/bootstrap.scss b/ui/src/styles/layout/bootstrap.scss index 6e719a29ec..a757d4fff9 100644 --- a/ui/src/styles/layout/bootstrap.scss +++ b/ui/src/styles/layout/bootstrap.scss @@ -242,6 +242,10 @@ table { border-radius: 0; } } + + td { + word-break: break-all; + } } .table { diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index 644528ccef..d78c4c7462 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -378,7 +378,7 @@ public HttpResponse file( public HttpResponse filesize( String executionId, @QueryValue(value = "path") URI path - ) throws FileNotFoundException { + ) throws IOException { HttpResponse httpResponse =this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file/metas?path=" + path); if (httpResponse != null) { return httpResponse;