diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index dc4e26e9185..86adb467127 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -48,6 +48,7 @@ public class RuntimeMetrics { public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 6855ca66910..b05e7310521 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -62,6 +62,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter unexpectedErrors; private ContextAwareMeter messageProcessedMeter; private ContextAwareMeter messageFilteredOutMeter; + private ContextAwareMeter malformedMessagesSkippedMeter; private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge private volatile Long produceToConsumeDelayValue = -1L; @@ -123,6 +124,12 @@ protected void processMessage(DecodeableKafkaRecord message) { String flowName = value.getFlowName(); String flowExecutionId = value.getFlowExecutionId(); + if (value.getDagAction() == null) { + log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: " + + "{}", flowGroup, flowName, flowExecutionId, tid, operation); + this.malformedMessagesSkippedMeter.mark(); + return; + } DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); produceToConsumeDelayValue = calcMillisSince(produceTimestamp); @@ -225,6 +232,7 @@ protected void createMetrics() { this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED); this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT); + this.malformedMessagesSkippedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED); this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); this.getMetricContext().register(this.produceToConsumeDelayMillis); }