Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2113] Process Heartbeat DagAction CDC messages with empty FlowExecutionId str #4004

Merged
merged 3 commits on Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class DagActionStoreChangeMonitorTest {

private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
private final long FLOW_EXECUTION_ID = 123L;
private final String FLOW_EXECUTION_ID = "123456789";
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
private int txidCounter = 0;

Expand Down Expand Up @@ -135,7 +135,7 @@ public void tearDown() throws Exception {
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoundException {
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", FLOW_EXECUTION_ID, null);
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
Expand All @@ -150,7 +150,7 @@ public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoun
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException {
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, "", "", DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
Expand Down Expand Up @@ -251,28 +251,28 @@ public void testStartupSequenceHandlesFailures() throws Exception {
* Util to create a general DagActionStoreChange type event
*/
private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType,
String flowGroup, String flowName, long flowExecutionId, DagActionValue dagAction) {
String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) {
String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType);
txidCounter++;
return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, String.valueOf(flowExecutionId),
return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, flowExecutionId,
DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
}

/**
* Form a key for events using the flow identifiers
* @return a key formed by adding an '_' delimiter between the flow identifiers
*/
public static String getKeyForFlow(String flowGroup, String flowName, long flowExecutionId) {
public static String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) {
return flowGroup + "_" + flowName + "_" + flowExecutionId;
}

/**
* Util to create wrapper around DagActionStoreChangeEvent
*/
private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType,
String flowGroup, String flowName, long flowExecutionId, DagActionValue dagAction) {
String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
private final int OFFSET = 1;
private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
private final long FLOW_EXECUTION_ID = 123L;
private final String FLOW_EXECUTION_ID = "987654321";
private final String JOB_NAME = "jobName";
private MockDagManagementDagActionStoreChangeMonitor mockDagManagementDagActionStoreChangeMonitor;
private int txidCounter = 0;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void setUp() throws Exception {
public void testProcessMessageWithDelete() throws SchedulerException {
Kafka09ConsumerClient.Kafka09ConsumerRecord<String, DagActionStoreChangeEvent> consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME, DagActionValue.ENFORCE_JOB_START_DEADLINE);
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
/* TODO: skip deadline removal for now and let them fire
Expand All @@ -124,7 +124,7 @@ public void testProcessMessageWithDelete() throws SchedulerException {
* Util to create a general DagActionStoreChange type event
*/
private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType,
String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionValue dagAction) {
String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionValue dagAction) {
String key = DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup, flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType);
Expand All @@ -137,7 +137,7 @@ private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType
* Util to create wrapper around DagActionStoreChangeEvent
*/
private Kafka09ConsumerClient.Kafka09ConsumerRecord<String, DagActionStoreChangeEvent> wrapDagActionStoreChangeEvent(
OperationType operationType, String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionValue dagAction) {
OperationType operationType, String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionValue dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ public void compileAndSubmitFlowToDagManager(FlowSpec flowSpec) throws IOExcepti
_log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
} catch (IOException | InterruptedException e) {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
throw e;
} finally {
this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,27 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS

TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata);

if (!jobExecutionPlanDagOptional.isPresent()) {
return Optional.absent();
}
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still want the try/catch here?

Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata);

if (jobExecutionPlanDagOptional.get().isEmpty()) {
populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
return Optional.absent();
}
if (!jobExecutionPlanDagOptional.isPresent()) {
return Optional.absent();
}

if (jobExecutionPlanDagOptional.get().isEmpty()) {
populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
return Optional.absent();
}

flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
} catch (IOException e) {
log.error("Encountered exception when attempting to compile and perform checks for flowGroup: {} flowName: {}",
flowGroup, flowName);
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected void processMessage(DecodeableKafkaRecord<String, DagActionStoreChange
String operation = value.getChangeEventIdentifier().getOperationType().name();
String flowGroup = value.getFlowGroup();
String flowName = value.getFlowName();
long flowExecutionId = Long.parseLong(value.getFlowExecutionId());
String flowExecutionId = value.getFlowExecutionId();
String jobName = value.getJobName();

produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
Expand All @@ -219,12 +219,14 @@ protected void processMessage(DecodeableKafkaRecord<String, DagActionStoreChange
}

DagActionStore.DagActionType dagActionType = DagActionStore.DagActionType.valueOf(value.getDagAction().toString());
// Parse flowExecutionId only after filtering out heartbeat messages, to avoid an exception from parsing an empty string
long flowExecutionIdLong = Long.parseLong(flowExecutionId);

// Used to easily log information to identify the dag action
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionIdLong, jobName,
dagActionType);

handleDagAction(operation, dagAction, flowGroup, flowName, flowExecutionId, dagActionType);
handleDagAction(operation, dagAction, flowGroup, flowName, flowExecutionIdLong, dagActionType);

dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
Expand Down
Loading