From f8009c7f996e8525ec82e15740ab4a8fd02e598f Mon Sep 17 00:00:00 2001 From: Volodymyr Burenin Date: Mon, 1 Aug 2022 10:27:04 -0500 Subject: [PATCH 1/2] report number of messages to be processed via metrics --- .../utilities/deltastreamer/HoodieDeltaStreamerMetrics.java | 6 ++++++ .../java/org/apache/hudi/utilities/sources/KafkaSource.java | 1 + 2 files changed, 7 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index d361179a1db15..1e1d610b59908 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -101,6 +101,12 @@ public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) { } } + public void updateDeltaStreamerKafkaAvroMessagesToProcess(long totalNewMsgs) { + if (config.isMetricsOn()) { + Metrics.registerGauge(getMetricsName("deltastreamer", "kafka_avro_messages_in"), totalNewMsgs); + } + } + public long getDurationInMs(long ctxDuration) { return ctxDuration / 1000000; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java index 6f2377fc7ce93..35b68f9d4bd77 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java @@ -57,6 +57,7 @@ protected InputBatch> fetchNewData(Option lastCheckpointStr, long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { + // TODO return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } JavaRDD newDataRDD = toRDD(offsetRanges); From 26c5a692265953fac1bbc9c9df7a2aee3a4611eb Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 17 Sep 2022 11:17:34 -0700 Subject: [PATCH 2/2] Fix the logic based on master --- .../utilities/deltastreamer/HoodieDeltaStreamerMetrics.java | 4 ++-- .../java/org/apache/hudi/utilities/sources/KafkaSource.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index 1e1d610b59908..2475e92f9607f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -101,9 +101,9 @@ public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) { } } - public void updateDeltaStreamerKafkaAvroMessagesToProcess(long totalNewMsgs) { + public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) { if (config.isMetricsOn()) { - Metrics.registerGauge(getMetricsName("deltastreamer", "kafka_avro_messages_in"), totalNewMsgs); + Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java index 35b68f9d4bd77..5561356cabc76 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java @@ -57,9 +57,10 @@ protected InputBatch> fetchNewData(Option lastCheckpointStr, long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { - // TODO + metrics.updateDeltaStreamerKafkaMessageInCount(0); return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } + metrics.updateDeltaStreamerKafkaMessageInCount(totalNewMsgs); JavaRDD newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } catch (org.apache.kafka.common.errors.TimeoutException e) {