Handling for Non transient exceptions
vsinghal85 committed Dec 11, 2024
1 parent b596948 commit 57752a7
Showing 5 changed files with 160 additions and 28 deletions.
DagProcessingEngine.java
@@ -17,6 +17,8 @@

package org.apache.gobblin.service.modules.orchestration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.util.ExecutorsUtils;


@@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartDeadlineTimeMillis;
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
// TODO: Update to fetch the list from config once transient exception handling is implemented and retryable exceptions are defined
public static List<Class<? extends Exception>> retryableExceptions = Collections.emptyList();

@Inject
public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
@@ -85,6 +90,10 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
}

public static boolean isTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
}

@Override
protected void startUp() {
Integer numThreads = ConfigUtils.getInt
@@ -149,6 +158,13 @@ public void run() {
dagTask.conclude();
log.info(dagProc.contextualizeStatus("concluded dagTask"));
} catch (Exception e) {
if (!DagProcessingEngine.isTransientException(e)) {
log.error("Ignoring non-transient exception. DagTask {} will conclude and will not be retried.", dagTask, e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagTask.conclude();
}
// TODO: add the else block for transient exceptions and conclude the task only if the retry limit is not breached
log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
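The TODO in the catch block above leaves the transient-exception branch unimplemented. A minimal sketch of the retry decision it describes, assuming a hypothetical attempt counter and retry limit (neither exists in this commit or in DagTask's current API):

import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;

// Hypothetical helper, not part of this commit; attemptsSoFar and maxRetries are assumptions.
final class RetryDecision {
  static boolean shouldRetry(Exception e, int attemptsSoFar, int maxRetries) {
    // Only transient exceptions are retried, and only while the retry limit is not breached.
    return DagProcessingEngine.isTransientException(e) && attemptsSoFar < maxRetries;
  }
}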
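The TODO on the retryableExceptions field above notes that the list should eventually be fetched from configuration. A minimal sketch of what that loading could look like, assuming a hypothetical config key; the key name and helper are illustrative only and are not defined anywhere in this commit:

import java.util.ArrayList;
import java.util.List;
import com.typesafe.config.Config;

// Hypothetical helper, not part of this commit; the config key below is an assumption.
final class RetryableExceptionsLoader {
  static List<Class<? extends Exception>> load(Config config) {
    String key = "gobblin.service.dagProcessingEngine.retryableExceptions";  // assumed key
    List<Class<? extends Exception>> classes = new ArrayList<>();
    if (config.hasPath(key)) {
      for (String className : config.getStringList(key)) {
        try {
          classes.add(Class.forName(className).asSubclass(Exception.class));
        } catch (ClassNotFoundException | ClassCastException ex) {
          // Skip entries that are not loadable Exception subclasses.
        }
      }
    }
    return classes;
  }
}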
DagProc.java
@@ -33,7 +33,6 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -92,20 +91,9 @@ public final void process(DagManagementStateStore dagManagementStateStore,
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
try {
logContextualizedInfo("ready to process");
act(dagManagementStateStore, state, dagProcEngineMetrics);
logContextualizedInfo("processed");
} catch (Exception e) {
if (isNonTransientException(e)) {
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
getDagTask(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
} else {
throw e;
}
}
}

protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
@@ -126,8 +114,4 @@ public String contextualizeStatus(String message) {
public void logContextualizedInfo(String message) {
log.info(contextualizeStatus(message));
}

protected boolean isNonTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions);
}
}
DagProcUtils.java
@@ -74,7 +74,6 @@ public class DagProcUtils {
public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);

if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId);
@@ -98,6 +97,7 @@ public static void submitJobToExecutor(DagManagementStateStore
Dag.DagId dagId) {
DagUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = DagUtils.getJobExecutionPlan(dagNode);

JobSpec jobSpec = DagUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);

@@ -139,12 +139,15 @@ public static void submitJobToExecutor(DagManagementStateStore
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
// Only mark the job as failed in case of non-transient exceptions
if (!DagProcessingEngine.isTransientException(e)) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
}
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
DagProcessingEngineTest.java
@@ -45,9 +45,8 @@
import org.apache.gobblin.testing.AssertWithBackoff;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;


@Slf4j
public class DagProcessingEngineTest {
@@ -196,9 +195,15 @@ public void dagProcessingTest()
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
+ "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);

// Currently all exceptions are treated as non-retryable, so the total exception count equals the non-retryable exception count
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions);
}

@Test
public void isNonTransientExceptionTest(){
Assert.assertTrue(!DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!")));
Assert.assertTrue(!DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!")));
}

private enum ExceptionType {
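With retryableExceptions still empty, the test above expects every exception, including AzkabanClientException, to be classified as non-transient. Once a retryable type is actually registered (per the TODO in DagProcessingEngine), a follow-up test along these lines (purely hypothetical, not part of this commit) would capture the intended split:

// Hypothetical follow-up test; assumes AzkabanClientException is later registered as retryable
// and that java.util.Collections is imported alongside the test's existing imports.
@Test
public void isTransientExceptionWithConfiguredRetryablesTest() {
  DagProcessingEngine.retryableExceptions = Collections.singletonList(AzkabanClientException.class);
  try {
    Assert.assertFalse(DagProcessingEngine.isTransientException(new RuntimeException("still non-retryable")));
    Assert.assertTrue(DagProcessingEngine.isTransientException(new AzkabanClientException("now retryable")));
  } finally {
    DagProcessingEngine.retryableExceptions = Collections.emptyList();  // restore the shared static state
  }
}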
DagProcUtilsTest.java
@@ -0,0 +1,124 @@
package org.apache.gobblin.service.modules.orchestration.proc;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@RunWith(PowerMockRunner.class)
@PrepareForTest(EventSubmitter.class)
public class DagProcUtilsTest {

DagManagementStateStore dagManagementStateStore;
SpecExecutor mockSpecExecutor;

@BeforeClass
public void setUp() {
dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
}

@Test
public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException {
Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
}
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
}

@Test
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
}

@Test(expectedExceptions = RuntimeException.class)
public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{
Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
}

private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3);

Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build();
Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build();
Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build();
List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3);
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Config jobConfig = jobConfigs.get(i);
FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
if(i==2){
jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty()));
}
else{
jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty()));
}
}
return jobExecutionPlans;
}
}
