-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2113] Process Heartbeat DagAction CDC messages with empty FlowExecutionId str #4004
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4004 +/- ##
=========================================
Coverage 38.72% 38.72%
Complexity 1596 1596
=========================================
Files 388 388
Lines 15957 15957
Branches 1578 1578
=========================================
Hits 6180 6180
Misses 9283 9283
Partials 494 494 ☔ View full report in Codecov by Sentry. |
@@ -150,7 +150,7 @@ public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoun | |||
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction") | |||
public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException { | |||
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = | |||
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME); | |||
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", DagActionValue.RESUME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a real case? can a heartbeat message not have flow name and have dag action?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a real case but wanted to check some edge case behavior
checks on my branch umustafi#34 |
if (!jobExecutionPlanDagOptional.isPresent()) { | ||
return Optional.absent(); | ||
} | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still want the try/catch here?
flowCompilationTimer.stop(flowMetadata); | ||
return jobExecutionPlanDagOptional; | ||
} catch (IOException e) { | ||
log.error("Encountered exception when attempting to compile and perform checks for flow: {}", flowSpec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to log this out maybe only print out the flowspec IDs
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
After changing
flowExecutionId
from a Str to a long in a previous PR, we encounterNumberFormatException
in theDagActionStoreChangeMonitor
when processing HB events. This ends up killing theHighLevelConsumer
queues for the hosts that receive the HB events in their partition.Tests
Updates the following unit test for Heartbeat (HB) events which was using valid flow names, groups, and flowExecutionId for HB event that did not accurately reflect a HB event received from CDC. After updating its values the test failed locally
It passes after the update.
Commits