Skip to content

Commit

Permalink
[GOBBLIN-1935] Skip null dag action types unable to be processed (#3807)
Browse files Browse the repository at this point in the history
* Skip over null dag actions from malformed messages

* Add new metric for skipped messages

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
umustafi and Urmi Mustafi authored Oct 25, 2023
1 parent 495bdaf commit 9a516d3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class RuntimeMetrics {
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
private ContextAwareMeter messageFilteredOutMeter;
private ContextAwareMeter malformedMessagesSkippedMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge

private volatile Long produceToConsumeDelayValue = -1L;
Expand Down Expand Up @@ -123,6 +124,12 @@ protected void processMessage(DecodeableKafkaRecord message) {
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();

if (value.getDagAction() == null) {
log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: "
+ "{}", flowGroup, flowName, flowExecutionId, tid, operation);
this.malformedMessagesSkippedMeter.mark();
return;
}
DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());

produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
Expand Down Expand Up @@ -225,6 +232,7 @@ protected void createMetrics() {
this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
this.malformedMessagesSkippedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED);
this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
Expand Down

0 comments on commit 9a516d3

Please sign in to comment.