Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1934] Monitor High Level Consumer queue size #3805

Merged
merged 7 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.atomic.AtomicIntegerArray;
umustafi marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.commons.lang3.reflect.ConstructorUtils;

import com.codahale.metrics.Counter;
Expand All @@ -51,6 +52,7 @@
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
Expand Down Expand Up @@ -87,7 +89,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
@Getter
protected final String topic;
protected final Config config;
private final int numThreads;
protected final int numThreads;

/**
* {@link MetricContext} for the consumer. Note this is instantiated when {@link #startUp()} is called, so
Expand All @@ -100,7 +102,8 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
private final ScheduledExecutorService consumerExecutor;
private final ExecutorService queueExecutor;
private final BlockingQueue[] queues;
protected final BlockingQueue[] queues;
protected ContextAwareGauge[] queueSizeGauges;
private final AtomicInteger recordsProcessed;
private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
private final boolean enableAutoCommit;
Expand Down Expand Up @@ -197,6 +200,14 @@ protected void shutdownMetrics() throws IOException {
*/
protected void createMetrics() {
this.messagesRead = this.metricContext.counter(RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
this.queueSizeGauges = new ContextAwareGauge[numThreads];
for (int i=0; i < numThreads; i++) {
umustafi marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, one more `i=0` here that needs spaces around the `=` (i.e. `i = 0`).

// An 'effectively' final variable is needed inside the lambda expression below
int finalI = i;
umustafi marked this conversation as resolved.
Show resolved Hide resolved
this.queueSizeGauges[i] = this.metricContext.newContextAwareGauge(
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX + "-" + i,
() -> queues[finalI].size());
}
}

/**
Expand Down Expand Up @@ -237,6 +248,7 @@ protected void startUp() {
private void consume() {
try {
Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
// TODO: we may be committing too early; we probably only want to commit after processing the messages
if(!enableAutoCommit) {
commitOffsets();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class RuntimeMetrics {
// Metric names
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ =
"gobblin.kafka.highLevelConsumer.messagesRead";
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX = "gobblin.kafka.highLevelConsumer.queueSize";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_UPDATED_SPECS = "gobblin.jobMonitor.kafka.updatedSpecs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;

import java.util.concurrent.atomic.AtomicIntegerArray;
umustafi marked this conversation as resolved.
Show resolved Hide resolved
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
Expand Down Expand Up @@ -217,7 +217,16 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) {

@Override
protected void createMetrics() {
super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
super.messagesRead = this.getMetricContext().counter(RuntimeMetrics.DAG_ACTION_STORE_MONITOR_PREFIX + "." + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
super.queueSizeGauges = new ContextAwareGauge[super.numThreads];
for (int i=0; i < numThreads; i++) {
// An 'effectively' final variable is needed inside the lambda expression below
int finalI = i;
this.queueSizeGauges[i] = this.getMetricContext().newContextAwareGauge(
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX + "-" + i,
() -> super.queues[finalI].size());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may not be clicking for me... but why can't this all be replaced by super.createMetrics()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh oops, I overwrote the important piece of the code. I need to add the gobblin.service and dagActionMonitor prefixes so that this metric is actually reported and can be distinguished from the same metric emitted by any other class that uses HighLevelConsumer.

// Dag Action specific metrics
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
Expand Down
Loading