From 3f9d233776841f81d26d2da4c37324e89b1081f8 Mon Sep 17 00:00:00 2001 From: Naireen Date: Mon, 16 Sep 2024 21:01:26 +0000 Subject: [PATCH] Add kafka poll latency metrics --- .../apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 956a2a7cb0bd9..c71e1b93c1729 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -587,6 +587,14 @@ private void consumerPollLoop() { while (!closed.get()) { try { if (records.isEmpty()) { + // Each source has a single unique topic. + List topicPartitions = source.getSpec().getTopicPartitions(); + Preconditions.checkStateNotNull(topicPartitions); + String topicName = "null"; // value will be overridden + for (TopicPartition topicPartition : topicPartitions) { + topicName = topicPartition.topic(); + break; + } stopwatch.start(); records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); @@ -617,7 +625,6 @@ private void consumerPollLoop() { private void commitCheckpointMark() { KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null); - if (checkpointMark != null) { LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark); Consumer consumer = Preconditions.checkStateNotNull(this.consumer);