Skip to content

Commit

Permalink
[GOBBLIN-2119] ignore adding deadline dag actions if they are already…
Browse files Browse the repository at this point in the history
… present (apache#4008)

* ignore adding deadline dag actions if they are already present
* fix bug
  • Loading branch information
arjun4084346 authored Jul 23, 2024
1 parent e68b62e commit 2ea9622
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
leaseParams);
int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
.initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random() * DELAY_FOR_RETRY_RANGE_MILLIS)
.initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) (Math.random() * DELAY_FOR_RETRY_RANGE_MILLIS))
.build());
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.empty(),
adoptConsensusFlowExecutionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.proc;

import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -193,15 +194,34 @@ public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {

private static void sendEnforceJobStartDeadlineDagAction(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(), dagNode.getValue().getFlowName(),
dagNode.getValue().getFlowExecutionId(), dagNode.getValue().getJobName(),
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(dagNode.getValue().getFlowGroup(),
dagNode.getValue().getFlowName(), dagNode.getValue().getFlowExecutionId(), dagNode.getValue().getJobName(),
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
try {
dagManagementStateStore.addDagAction(dagAction);
} catch (IOException e) {
if (e.getCause() != null && e.getCause() instanceof SQLIntegrityConstraintViolationException) {
// delete old dag action and have a new deadline dag proc with the new deadline time
dagManagementStateStore.deleteDagAction(dagAction);
log.warn("Duplicate ENFORCE_JOB_START_DEADLINE Dag Action is being created. Ignoring... " + e.getMessage());
dagManagementStateStore.addDagAction(dagAction);
}
}
}

public static void sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStore dagManagementStateStore, DagActionStore.DagAction launchDagAction)
throws IOException {
dagManagementStateStore.addFlowDagAction(launchDagAction.getFlowGroup(), launchDagAction.getFlowName(),
DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(launchDagAction.getFlowGroup(), launchDagAction.getFlowName(),
launchDagAction.getFlowExecutionId(), DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
try {
dagManagementStateStore.addDagAction(dagAction);
} catch (IOException e) {
if (e.getCause() != null && e.getCause() instanceof SQLIntegrityConstraintViolationException) {
dagManagementStateStore.deleteDagAction(dagAction);
log.warn("Duplicate ENFORCE_FLOW_FINISH_DEADLINE Dag Action is being created. Ignoring... " + e.getMessage());
dagManagementStateStore.addDagAction(dagAction);
}
}
}

public static long getDefaultJobStartDeadline(Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);

if (!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should have been created only for finished status - {}",
dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
// this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action are created for"
+ " new jobs with no job status when there are multiple of them to run next; or when a job finishes with status - %s",
dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
// this may happen if adding job status in the store failed/delayed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action should have been "
+ "created only for finished status - %s. This may happen if reevaluate dag action launched reevaluate dag "
+ "proc before job status is updated in the store in KafkaJobStatusMonitor", dagNodeId, executionStatus,
FlowStatusGenerator.FINISHED_STATUSES));
}

// get the dag after updating dag node status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
try {
this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
} catch (Exception e) {
if (isExceptionInstanceOf(e, nonRetryableExceptions)) {
} catch (IOException e) {
if (e.getCause() != null && isThrowableInstanceOf(e.getCause(), nonRetryableExceptions)) {
// todo - add metrics
log.warn("Duplicate REEVALUATE Dag Action is being created. Ignoring... " + e.getMessage());
} else {
Expand Down Expand Up @@ -426,7 +426,7 @@ public static long getExecutionIdFromTableName(String tableName) {

protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);

/**
 * Checks whether the given throwable is an instance of at least one of the listed exception classes.
 *
 * @param exception the throwable to test
 * @param typesList candidate exception classes to test against
 * @return {@code true} if any class in {@code typesList} reports {@code exception} as an instance of it,
 *     {@code false} otherwise (including when the list is empty)
 */
public static boolean isExceptionInstanceOf(Exception exception, List<Class<? extends Exception>> typesList) {
public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> typesList) {
return typesList.stream().anyMatch(e -> e.isInstance(exception));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -55,13 +54,17 @@
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.util.ConfigUtils;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -123,7 +126,7 @@ public void launchDag() throws IOException, InterruptedException, URISyntaxExcep
.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));

Mockito.verify(this.dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
.addFlowDagAction(any(), any(), anyLong(), eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
.addJobDagAction(any(), any(), anyLong(), eq(DagActionStore.NO_JOB_NAME_DEFAULT), eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}

@Test
Expand Down

0 comments on commit 2ea9622

Please sign in to comment.