Skip to content

Commit

Permalink
[GOBBLIN-1934] Monitor High Level Consumer queue size (apache#3805)
Browse files Browse the repository at this point in the history
* Emit metrics to monitor high level consumer queue size

* Empty commit to trigger tests

* Use BlockingQueue.size() func instead of atomic integer array

* Remove unused import & add DagActionChangeMonitor prefix to metric

* Refactor to avoid repeating code

* Make protected variables private where possible

* Fix white space

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
2 people authored and phet committed Oct 26, 2023
1 parent 1744a71 commit 1d222cc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
Expand Down Expand Up @@ -101,6 +102,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
private final ScheduledExecutorService consumerExecutor;
private final ExecutorService queueExecutor;
private final BlockingQueue[] queues;
private ContextAwareGauge[] queueSizeGauges;
private final AtomicInteger recordsProcessed;
private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
private final boolean enableAutoCommit;
Expand All @@ -127,7 +129,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) {
this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
this.queues = new LinkedBlockingQueue[numThreads];
for(int i=0; i<queues.length; i++) {
for(int i = 0; i < queues.length; i++) {
this.queues[i] = new LinkedBlockingQueue();
}
this.recordsProcessed = new AtomicInteger(0);
Expand Down Expand Up @@ -196,7 +198,24 @@ protected void shutdownMetrics() throws IOException {
* this method to instantiate their own metrics.
*/
protected void createMetrics() {
this.messagesRead = this.metricContext.counter(RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
String prefix = getMetricsPrefix();
this.messagesRead = this.metricContext.counter(prefix +
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
this.queueSizeGauges = new ContextAwareGauge[numThreads];
for (int i = 0; i < numThreads; i++) {
// An 'effectively' final variable is needed inside the lambda expression below
int finalI = i;
this.queueSizeGauges[i] = this.metricContext.newContextAwareGauge(prefix +
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX + "-" + i,
() -> queues[finalI].size());
}
}

/**
* Used by child classes to distinguish prefixes from one another
*/
protected String getMetricsPrefix() {
return "";
}

/**
Expand Down Expand Up @@ -237,6 +256,7 @@ protected void startUp() {
private void consume() {
try {
Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
// TODO: we may be committing too early and only want to commit after process messages
if(!enableAutoCommit) {
commitOffsets();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class RuntimeMetrics {
// Metric names
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ =
"gobblin.kafka.highLevelConsumer.messagesRead";
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX = "gobblin.kafka.highLevelConsumer.queueSize";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_UPDATED_SPECS = "gobblin.jobMonitor.kafka.updatedSpecs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
Expand Down Expand Up @@ -217,7 +216,8 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) {

@Override
protected void createMetrics() {
super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
super.createMetrics();
// Dag Action specific metrics
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
Expand All @@ -228,4 +228,9 @@ protected void createMetrics() {
this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}

@Override
protected String getMetricsPrefix() {
return RuntimeMetrics.DAG_ACTION_STORE_MONITOR_PREFIX + ".";
}
}

0 comments on commit 1d222cc

Please sign in to comment.