Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to use for compilation #3857

Merged
merged 7 commits into from
Jan 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into updateFlowSpecWithId
  • Loading branch information
umustafi authored Jan 12, 2024
commit 70a2e64087a8b0f6ca391aeec2da22957840a80b
Original file line number Diff line number Diff line change
@@ -263,7 +263,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
return;
}
Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
if (compiledDag == null || compiledDag.isEmpty()) {
if (compiledDag.isEmpty()) {
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
Original file line number Diff line number Diff line change
@@ -90,9 +90,7 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
}

addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
if (flowCompilationTimer.isPresent()) {
flowCompilationTimer.get().stop(flowMetadata);
}
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}

@@ -127,10 +125,8 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flow flowSpec to change this behaviour.");
if (eventSubmitter.isPresent()) {
new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
}
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return Optional.absent();
}
}
@@ -153,7 +149,7 @@ private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, St
* @param flowSpec
* @param flowMetadata
*/
public static void populateFlowCompilationFailedEventMessage(Optional<EventSubmitter> eventSubmitter,
public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter,
FlowSpec flowSpec, Map<String, String> flowMetadata) {
// For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
// compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.