From 1d222ccebf7504b793279680b7e505660da21a75 Mon Sep 17 00:00:00 2001 From: umustafi Date: Tue, 24 Oct 2023 14:01:44 -0700 Subject: [PATCH] [GOBBLIN-1934] Monitor High Level Consumer queue size (#3805) * Emit metrics to monitor high level consumer queue size * Empty commit to trigger tests * Use BlockingQueue.size() func instead of atomic integer array * Remove unused import & add DagActionChangeMonitor prefix to metric * Refactor to avoid repeating code * Make protected variables private where possible * Fix white space --------- Co-authored-by: Urmi Mustafi --- .../runtime/kafka/HighLevelConsumer.java | 24 +++++++++++++++++-- .../runtime/metrics/RuntimeMetrics.java | 1 + .../DagActionStoreChangeMonitor.java | 9 +++++-- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java index 7b494201eca..1cba016bd1e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java @@ -51,6 +51,7 @@ import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener; import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient; import org.apache.gobblin.kafka.client.KafkaConsumerRecord; +import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.metrics.RuntimeMetrics; @@ -101,6 +102,7 @@ public abstract class HighLevelConsumer extends AbstractIdleService { private final ScheduledExecutorService consumerExecutor; private final ExecutorService queueExecutor; private final BlockingQueue[] queues; + private ContextAwareGauge[] queueSizeGauges; private final AtomicInteger recordsProcessed; private final Map partitionOffsetsToCommit; private final boolean enableAutoCommit; @@ -127,7 +129,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) { this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread"))); this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d"))); this.queues = new LinkedBlockingQueue[numThreads]; - for(int i=0; i queues[finalI].size()); + } + } + + /** + * Used by child classes to distinguish prefixes from one another + */ + protected String getMetricsPrefix() { + return ""; } /** @@ -237,6 +256,7 @@ protected void startUp() { private void consume() { try { Iterator itr = gobblinKafkaConsumerClient.consume(); + // TODO: we may be committing too early and only want to commit after process messages if(!enableAutoCommit) { commitOffsets(); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index 8fc1258ab50..dc4e26e9185 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -28,6 +28,7 @@ public class RuntimeMetrics { // Metric names public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ = "gobblin.kafka.highLevelConsumer.messagesRead"; + public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX = "gobblin.kafka.highLevelConsumer.queueSize"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_UPDATED_SPECS = "gobblin.jobMonitor.kafka.updatedSpecs"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index e5a2d090d3a..6855ca66910 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -34,7 +34,6 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; -import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -217,7 +216,8 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) { @Override protected void createMetrics() { - super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ); + super.createMetrics(); + // Dag Action specific metrics this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED); this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED); this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED); @@ -228,4 +228,9 @@ protected void createMetrics() { this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); this.getMetricContext().register(this.produceToConsumeDelayMillis); } + + @Override + protected String getMetricsPrefix() { + return RuntimeMetrics.DAG_ACTION_STORE_MONITOR_PREFIX + "."; + } }