Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Sep 26, 2024
1 parent 3f9d233 commit dc0900a
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */
Expand Down Expand Up @@ -119,7 +120,7 @@ private void translateKnownPerWorkerCounters(List<PerStepNamespaceMetrics> metri
for (PerStepNamespaceMetrics perStepnamespaceMetrics : metrics) {
if (!BigQuerySinkMetrics.METRICS_NAMESPACE.equals(
perStepnamespaceMetrics.getMetricsNamespace())
&& !org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE.equals(
&& !KafkaSinkMetrics.METRICS_NAMESPACE.equals(
perStepnamespaceMetrics.getMetricsNamespace())) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Stores and exports metrics for a batch of Kafka Client RPCs. */
public interface KafkaMetrics {
Expand Down Expand Up @@ -63,6 +65,8 @@ static NoOpKafkaMetrics getInstance() {
@AutoValue
abstract class KafkaMetricsImpl implements KafkaMetrics {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class);

static HashMap<String, Histogram> latencyHistograms = new HashMap<String, Histogram>();

abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
Expand Down Expand Up @@ -118,6 +122,7 @@ private void recordRpcLatencyMetrics() {
@Override
public void updateKafkaMetrics() {
if (!isWritable().compareAndSet(true, false)) {
LOG.warn("Updating stale Kafka metrics container");
return;
}
recordRpcLatencyMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public boolean advance() throws IOException {
// Pass metrics to container.
kafkaResults.updateKafkaMetrics();
return true;

} else { // -- (b)
nextBatch();

Expand Down Expand Up @@ -518,6 +517,7 @@ String name() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
LOG.info("xxx reader {}", source.getSpec().toString());

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -162,6 +163,8 @@ public KafkaUnboundedReader<K, V> createReader(
throw new RuntimeException(e);
}
}
// topic partitions isn't empty
LOG.info("xxx create reader {}", spec.getTopicPartitions().toString());
return new KafkaUnboundedReader<>(this, checkpointMark);
}

Expand Down

0 comments on commit dc0900a

Please sign in to comment.