Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
changed RetryTemplate logic for finding TransientException
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed Jul 7, 2022
1 parent ee02a86 commit 9e1364b
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.slf4j.LoggerFactory;

import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.metrics.Monitors;

import com.datastax.driver.core.DataType;
Expand Down Expand Up @@ -236,15 +236,15 @@ String toJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new TransientException("Error serializing to json", e);
throw new NonTransientException("Error serializing to json", e);
}
}

<T> T readValue(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
throw new TransientException("Error serializing json", e);
throw new NonTransientException("Error de-serializing json", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -29,14 +28,12 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
Expand All @@ -47,7 +44,6 @@

import static com.netflix.conductor.core.events.EventQueues.EVENT_QUEUE_PROVIDERS_QUALIFIER;
import static com.netflix.conductor.core.execution.tasks.SystemTaskRegistry.ASYNC_SYSTEM_TASKS_QUALIFIER;
import static com.netflix.conductor.core.utils.Utils.isTransientException;

import static java.util.function.Function.identity;

Expand Down Expand Up @@ -120,24 +116,10 @@ public Map<String, EventQueueProvider> getEventQueueProviders(

@Bean
public RetryTemplate onTransientErrorRetryTemplate() {
SimpleRetryPolicy retryPolicy = new CustomRetryPolicy();
retryPolicy.setMaxAttempts(3);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
return retryTemplate;
}

public static class CustomRetryPolicy extends SimpleRetryPolicy {

@Override
public boolean canRetry(final RetryContext context) {
final Optional<Throwable> lastThrowable =
Optional.ofNullable(context.getLastThrowable());
return lastThrowable
.map(throwable -> super.canRetry(context) && isTransientException(throwable))
.orElseGet(() -> super.canRetry(context));
}
return RetryTemplate.builder()
.retryOn(TransientException.class)
.maxAttempts(3)
.noBackoff()
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public DefaultEventProcessor(

public void handle(ObservableQueue queue, Message msg) {
List<EventExecution> transientFailures = null;
Boolean executionFailed = false;
boolean executionFailed = false;
try {
if (isEventMessageIndexingEnabled) {
executionService.addMessage(queue.getName(), msg);
Expand Down Expand Up @@ -155,7 +155,7 @@ protected List<EventExecution> executeEvent(String event, Message msg) throws Ex
String evaluatorType = eventHandler.getEvaluatorType();
// Set default to true so that if condition is not specified, it falls through
// to process the event.
Boolean success = true;
boolean success = true;
if (StringUtils.isNotEmpty(condition) && evaluators.get(evaluatorType) != null) {
Object result =
evaluators
Expand Down Expand Up @@ -267,6 +267,7 @@ protected EventExecution execute(EventExecution eventExecution, Action action, O
eventExecution.getMessageId(),
payload);

// TODO: Switch to @Retryable annotation on SimpleActionProcessor.execute()
Map<String, Object> output =
retryTemplate.execute(
context ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public static void checkNotNull(Object object, String errorMessage) {
*/
public static boolean isTransientException(Throwable throwable) {
if (throwable != null) {
return !((throwable instanceof UnsupportedOperationException)
|| (throwable instanceof TransientException));
return throwable instanceof TransientException;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -141,7 +142,7 @@ public void testMetadataPopulationOnlyOnNecessaryWorkflowTasks() {
verifyNoMoreInteractions(metadataDAO);
}

@Test(expected = NotFoundException.class)
@Test
public void testMetadataPopulationMissingDefinitions() {
String nameTaskDefinition1 = "task4";
WorkflowTask workflowTask1 = createWorkflowTask(nameTaskDefinition1);
Expand All @@ -157,7 +158,11 @@ public void testMetadataPopulationMissingDefinitions() {
when(metadataDAO.getTaskDef(nameTaskDefinition1)).thenReturn(taskDefinition);
when(metadataDAO.getTaskDef(nameTaskDefinition2)).thenReturn(null);

metadataMapperService.populateTaskDefinitions(workflowDefinition);
try {
metadataMapperService.populateTaskDefinitions(workflowDefinition);
} catch (NotFoundException nfe) {
fail("Missing TaskDefinitions are not defaulted");
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ public void testCreateTaskException() {
task.setTaskId(UUID.randomUUID().toString());
task.setTaskDefName("task1");

expectedException.expect(NullPointerException.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Workflow instance id cannot be null");
getExecutionDAO().createTasks(List.of(task));

task.setWorkflowInstanceId(UUID.randomUUID().toString());
expectedException.expect(NullPointerException.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Task reference name cannot be null");
getExecutionDAO().createTasks(List.of(task));
}
Expand Down

0 comments on commit 9e1364b

Please sign in to comment.