Skip to content

Commit

Permalink
feat(core): add a retry on storage interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 26, 2022
1 parent 0b0e8de commit 8673acc
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 11 deletions.
6 changes: 6 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ endpoints:
write-sensitive: false

kestra:
retries:
attempts: 5
multiplier: 2.0
delay: 1s
maxDelay: ""

kafka:
defaults:
topic-prefix: "kestra_"
Expand Down
68 changes: 68 additions & 0 deletions core/src/main/java/io/kestra/core/annotations/Retryable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.core.annotations;

import io.micronaut.aop.Around;
import io.micronaut.context.annotation.AliasFor;
import io.micronaut.context.annotation.Type;
import io.micronaut.retry.annotation.DefaultRetryPredicate;
import io.micronaut.retry.annotation.RetryPredicate;
import io.micronaut.retry.intercept.OverrideRetryInterceptor;

import java.lang.annotation.*;

import javax.validation.constraints.Digits;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Inherited
@Documented
@Retention(RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Around
@Type(OverrideRetryInterceptor.class)
public @interface Retryable {
int MAX_INTEGRAL_DIGITS = 4;

/**
* @return The exception types to include (defaults to all)
*/
Class<? extends Throwable>[] value() default {};

/**
* @return The exception types to include (defaults to all)
*/
@AliasFor(member = "value")
Class<? extends Throwable>[] includes() default {};

/**
* @return The exception types to exclude (defaults to none)
*/
Class<? extends Throwable>[] excludes() default {};

/**
* @return The maximum number of retry attempts
*/
@Digits(integer = MAX_INTEGRAL_DIGITS, fraction = 0)
String attempts() default "${kestra.retries.attempts:5}";

/**
* @return The delay between retry attempts
*/
String delay() default "${kestra.retries.delay:1s}";

/**
* @return The maximum overall delay
*/
String maxDelay() default "${kestra.retries.max-delay:}";

/**
* @return The multiplier to use to calculate the delay
*/
@Digits(integer = 2, fraction = 2)
String multiplier() default "${kestra.retries.multiplier:2.0}";

/**
* @return The retry predicate class to use instead of {@link io.micronaut.retry.annotation.Retryable#includes} and {@link io.micronaut.retry.annotation.Retryable#excludes}
* (defaults to none)
*/
Class<? extends RetryPredicate> predicate() default DefaultRetryPredicate.class;
}
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/listeners/RetryEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public class RetryEvents {
void onRetry(final RetryEvent event) {
log.info(
"Retry from '{}.{}()', attempt {}, overallDelay {}",
event.getSource().getExecutableMethod().getClass().getName(),
event.getSource().getName(),
event.getSource().getTarget().getClass().getName(),
event.getSource().getExecutableMethod().getName(),
event.getRetryState().currentAttempt(),
event.getRetryState().getOverallDelay()
);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public org.slf4j.Logger logger() {
return runContextLogger.logger();
}

public InputStream uriToInputStream(URI uri) throws FileNotFoundException {
public InputStream uriToInputStream(URI uri) throws IOException {
if (uri.getScheme().equals("kestra")) {
return this.storageInterface.get(uri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import java.util.List;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

/**
* Provides business logic to manipulate {@link Condition}
*/
@Singleton
public class ConditionService {
@Inject
private RunContextFactory runContextFactory;
Expand Down Expand Up @@ -97,7 +99,6 @@ public ConditionContext conditionContext(RunContext runContext, Flow flow, @Null
.build();
}


public ConditionContext conditionContext(RunContext runContext, Flow flow, @Nullable Execution execution) {
return this.conditionContext(runContext, flow, execution, null);
}
Expand Down
14 changes: 11 additions & 3 deletions core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import io.kestra.core.annotations.Retryable;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
Expand All @@ -23,14 +24,19 @@

@Introspected
public interface StorageInterface {
InputStream get(URI uri) throws FileNotFoundException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(URI uri) throws IOException;

Long size(URI uri) throws FileNotFoundException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
Long size(URI uri) throws IOException;

@Retryable(includes = {IOException.class})
URI put(URI uri, InputStream data) throws IOException;

@Retryable(includes = {IOException.class})
boolean delete(URI uri) throws IOException;

@Retryable(includes = {IOException.class})
List<URI> deleteByPrefix(URI storagePrefix) throws IOException;

default String executionPrefix(Flow flow, Execution execution) {
Expand Down Expand Up @@ -91,7 +97,7 @@ default Optional<String> extractExecutionId(URI path) {
return Optional.of(matcher.group(2));
}

default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException {
default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException {
return new URI("/" + String.join(
"/",
Arrays.asList(
Expand All @@ -103,6 +109,7 @@ default URI uri(Flow flow, Execution execution, String inputName, String file) t
));
}

@Retryable(includes = {IOException.class})
default URI from(Flow flow, Execution execution, String input, File file) throws IOException {
try {
return this.put(
Expand All @@ -114,6 +121,7 @@ default URI from(Flow flow, Execution execution, String input, File file) throws
}
}

@Retryable(includes = {IOException.class})
default URI from(Flow flow, Execution execution, Input input, File file) throws IOException {
return this.from(flow, execution, input.getName(), file);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package io.micronaut.retry.intercept;

import io.kestra.core.annotations.Retryable;
import io.micronaut.aop.InterceptPhase;
import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.value.MutableConvertibleValues;
import io.micronaut.retry.RetryState;
import io.micronaut.retry.annotation.DefaultRetryPredicate;
import io.micronaut.retry.event.RetryEvent;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* Replace {@link DefaultRetryInterceptor} only to catch Throwable
*/
@Singleton
public class OverrideRetryInterceptor implements MethodInterceptor<Object, Object> {
private static final Logger LOG = LoggerFactory.getLogger(OverrideRetryInterceptor.class);
private final ApplicationEventPublisher<RetryEvent> eventPublisher;

public OverrideRetryInterceptor(ApplicationEventPublisher<RetryEvent> eventPublisher) {
this.eventPublisher = eventPublisher;
}

@Override
public int getOrder() {
return InterceptPhase.RETRY.getPosition();
}

@Nullable
@Override
public Object intercept(MethodInvocationContext<Object, Object> context) {
Optional<AnnotationValue<Retryable>> opt = context.findAnnotation(Retryable.class);
if (opt.isEmpty()) {
return context.proceed();
}

AnnotationValue<Retryable> retry = opt.get();

MutableRetryState retryState = new SimpleRetry(
retry.get("attempts", Integer.class).orElse(5),
retry.get("multiplier", Double.class).orElse(2D),
retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
retry.get("maxDelay", Duration.class).orElse(null),
new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes"))
);

MutableConvertibleValues<Object> attrs = context.getAttributes();
attrs.put(RetryState.class.getName(), retry);

InterceptedMethod interceptedMethod = InterceptedMethod.of(context);
try {
retryState.open();

Object result = retrySync(context, retryState, interceptedMethod);
switch (interceptedMethod.resultType()) {
case SYNCHRONOUS:
retryState.close(null);
return result;
default:
return interceptedMethod.unsupported();
}
} catch (Exception e) {
return interceptedMethod.handleException(e);
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static List<Class<? extends Throwable>> resolveIncludes(AnnotationValue<Retryable> retry, String includes) {
Class<?>[] values = retry.classValues(includes);
return (List) Collections.unmodifiableList(Arrays.asList(values));
}

private Object retrySync(MethodInvocationContext<Object, Object> context, MutableRetryState retryState, InterceptedMethod interceptedMethod) {
boolean firstCall = true;
while (true) {
try {
if (firstCall) {
firstCall = false;
return interceptedMethod.interceptResult();
}
return interceptedMethod.interceptResult(this);
} catch (Throwable e) {
if (!retryState.canRetry(e)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot retry anymore. Rethrowing original exception for method: {}", context);
}
retryState.close(e);
throw e;
} else {
long delayMillis = retryState.nextDelay();
try {
if (eventPublisher != null) {
try {
eventPublisher.publishEvent(new RetryEvent(context, retryState, e));
} catch (Exception e1) {
LOG.error("Error occurred publishing RetryEvent: " + e1.getMessage(), e1);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying execution for method [{}] after delay of {}ms for exception: {}", context, delayMillis, e.getMessage());
}
Thread.sleep(delayMillis);
} catch (InterruptedException e1) {
throw e;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.micronaut.retry.intercept;

import io.kestra.core.annotations.Retryable;
import io.micronaut.retry.event.RetryEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.nio.channels.AlreadyBoundException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

@MicronautTest
class OverrideRetryInterceptorTest {
@Inject
RetryEvents retryEvents;

@Inject
TestRetry retry;

@Test
void test() {
assertThrows(AlreadyBoundException.class, retry::failedMethod);

assertThat(retryEvents.count, is(5));
}

@Singleton
public static class TestRetry {
@Retryable(delay = "1s", multiplier = "2.0")
public String failedMethod() {
throw new AlreadyBoundException();
}
}

@Singleton
@Slf4j
public static class RetryEvents {
public int count = 0;
@EventListener
void onRetry(final RetryEvent event) {
this.count++;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ private void createDirectory(URI append) {
}

@Override
public InputStream get(URI uri) throws FileNotFoundException {
public InputStream get(URI uri) throws IOException {
return new BufferedInputStream(new FileInputStream(getPath(URI.create(uri.getPath()))
.toAbsolutePath()
.toString())
);
}

@Override
public Long size(URI uri) throws FileNotFoundException {
public Long size(URI uri) throws IOException {
try {
return Files.size(getPath(URI.create(uri.getPath())));
} catch (NoSuchFileException e) {
throw new FileNotFoundException("Unable to find file at '" + uri + "'");
} catch (IOException e) {
throw new FileNotFoundException("Unable to find file at '" + uri + "' with message '" + e.getMessage() + "'");
throw new IOException("Unable to find file at '" + uri + "' with message '" + e.getMessage() + "'");
}
}

Expand Down
4 changes: 4 additions & 0 deletions ui/src/styles/layout/bootstrap.scss
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ table {
border-radius: 0;
}
}

td {
word-break: break-all;
}
}

.table {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public HttpResponse<StreamedFile> file(
public HttpResponse<FileMetas> filesize(
String executionId,
@QueryValue(value = "path") URI path
) throws FileNotFoundException {
) throws IOException {
HttpResponse<FileMetas> httpResponse =this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file/metas?path=" + path);
if (httpResponse != null) {
return httpResponse;
Expand Down

0 comments on commit 8673acc

Please sign in to comment.