diff --git a/README.md b/README.md index 480e63ee..0b947ae2 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,13 @@ Labels: `cluster_name, topic, partition` The latest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the last produced offset. The last produced offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the last produced offset and the last consumed offset in certain panels. +**`kafka_partition_earliest_offset`** + +Labels: `cluster_name, topic, partition` + +The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels. + + ### Labels Each metric may include the following labels when reported. If you define the `labels` property for configuration of a cluster then those labels will also be included. @@ -477,6 +484,10 @@ required. Before running a release make sure the following pre-req's are met. ## Change log +0.5.5 + +* Added kafka_partition_earliest_offset metric for determining the volume of offsets stored in Kafka. + 0.5.4 * Bugfix: Accidentally released with local repo. diff --git a/scripts/update_readme_version.sh b/scripts/update_readme_version.sh index 3ecad92f..5044790d 100755 --- a/scripts/update_readme_version.sh +++ b/scripts/update_readme_version.sh @@ -13,5 +13,5 @@ echo helm install $RELEASE_URL sed -i -r "s/helm install http\S*/helm install ${RELEASE_URL_ESCAPED}/g" $DIR/../README.md echo Update README.md Running Docker Image version to: -echo lightbend/kafka-lag-exporter:0.5.0 +echo lightbend/kafka-lag-exporter:${VERSION} sed -i -r "s/lightbend\/kafka-lag-exporter:\S*/lightbend\/kafka-lag-exporter:${VERSION}/g" $DIR/../README.md \ No newline at end of file diff --git a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala index 8918ed34..99a621ea 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala @@ -29,6 +29,7 @@ object ConsumerGroupCollector { final case class OffsetsSnapshot( timestamp: Long, groups: List[String], + earliestOffsets: PartitionOffsets, latestOffsets: PartitionOffsets, lastGroupOffsets: GroupOffsets ) extends Message { @@ -43,6 +44,10 @@ object ConsumerGroupCollector { } override def toString: String = { + val earliestOffsetHeader = TpoFormat.format("Topic", "Partition", "Earliest") + val earliestOffsetsStr = earliestOffsets.map { + case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset) + } val latestOffsetHeader = TpoFormat.format("Topic", "Partition", "Offset") val latestOffsetsStr = latestOffsets.map { case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset) @@ -56,6 +61,9 @@ object ConsumerGroupCollector { s""" |Timestamp: $timestamp |Groups: ${groups.mkString(",")} + |Earliest Offsets: + |$earliestOffsetHeader + |${earliestOffsetsStr.mkString("\n")} |Latest Offsets: |$latestOffsetHeader |${latestOffsetsStr.mkString("\n")} @@ -89,7 +97,8 @@ object ConsumerGroupCollector { context.self ! Collect - val collectorState = CollectorState(topicPartitionTables = Domain.TopicPartitionTable(config.lookupTableSize)) + val collectorState = CollectorState( + topicPartitionTables = Domain.TopicPartitionTable(config.lookupTableSize)) collector(config, clientCreator(config.cluster), reporter, collectorState) } }.onFailure(SupervisorStrategy.restartWithBackoff(1 seconds, 10 seconds, 0.2)) @@ -116,12 +125,14 @@ object ConsumerGroupCollector { val distinctPartitions = groupTopicPartitions.map(_.tp).toSet val groupOffsetsFuture = client.getGroupOffsets(now, groups, groupTopicPartitions) + val earliestOffsetsTry = client.getEarliestOffsets(now, distinctPartitions) val latestOffsetsTry = client.getLatestOffsets(now, distinctPartitions) for { groupOffsets <- groupOffsetsFuture + Success(earliestOffsets) <- Future.successful(earliestOffsetsTry) Success(latestOffsets) <- Future.successful(latestOffsetsTry) - } yield OffsetsSnapshot(now, groups, latestOffsets, groupOffsets) + } yield OffsetsSnapshot(now, groups, earliestOffsets, latestOffsets, groupOffsets) } context.log.info("Collecting offsets") @@ -151,6 +162,7 @@ object ConsumerGroupCollector { refreshLookupTable(state, snapshot, evictedTps) context.log.info("Reporting offsets") + reportEarliestOffsetMetrics(config, reporter, snapshot) reportLatestOffsetMetrics(config, reporter, state.topicPartitionTables) reportConsumerGroupMetrics(config, reporter, snapshot, state.topicPartitionTables) @@ -226,6 +238,16 @@ object ConsumerGroupCollector { } } + private def reportEarliestOffsetMetrics( + config: CollectorConfig, + reporter: ActorRef[MetricsSink.Message], + offsetsSnapshot: OffsetsSnapshot + ): Unit = { + for {(tp, topicPoint) <- offsetsSnapshot.earliestOffsets} yield { + reporter ! Metrics.TopicPartitionValueMessage(Metrics.EarliestOffsetMetric, config.cluster.name, tp, topicPoint.offset) + } + } + private def reportLatestOffsetMetrics( config: CollectorConfig, reporter: ActorRef[MetricsSink.Message], diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index 4afb34da..5d27925c 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -38,6 +38,7 @@ object KafkaClient { trait KafkaClientContract { def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])] def getGroupOffsets(now: Long, groups: List[String], groupTopicPartitions: List[Domain.GroupTopicPartition]): Future[GroupOffsets] + def getEarliestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] def getLatestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] def close(): Unit } @@ -118,6 +119,7 @@ object KafkaClient { } trait ConsumerKafkaClientContract { + def beginningOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] def close(): Unit } @@ -125,6 +127,8 @@ object KafkaClient { class ConsumerKafkaClient private[kafkalagexporter](consumer: KafkaConsumer[Byte,Byte], clientTimeout: FiniteDuration) extends ConsumerKafkaClientContract { private val _clientTimeout: Duration = clientTimeout.toJava + def beginningOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] = + consumer.beginningOffsets(partitions, _clientTimeout) def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] = consumer.endOffsets(partitions, _clientTimeout) def close(): Unit = consumer.close(_clientTimeout) @@ -170,6 +174,14 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster, groupTopicPartitions.toList } + /** + * Get earliest offsets for a set of topic partitions. + */ + def getEarliestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] = Try { + val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.beginningOffsets(topicPartitions.map(_.asKafka).asJava) + topicPartitions.map(tp => tp -> LookupTable.Point(offsets.get(tp.asKafka).toLong,now)).toMap + } + /** * Get latest offsets for a set of topic partitions. */ diff --git a/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala b/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala index 929f7b91..15ea4114 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala @@ -62,6 +62,12 @@ object Metrics { topicPartitionLabels ) + val EarliestOffsetMetric = GaugeDefinition( + "kafka_partition_earliest_offset", + "Earliest offset of a partition", + topicPartitionLabels + ) + val groupLabels = List("cluster_name", "group") val MaxGroupOffsetLagMetric = GaugeDefinition( @@ -98,6 +104,7 @@ object Metrics { val definitions = List( LatestOffsetMetric, + EarliestOffsetMetric, MaxGroupOffsetLagMetric, MaxGroupTimeLagMetric, LastGroupOffsetMetric, diff --git a/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala index d4c5cb45..71a4c9b6 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala @@ -38,14 +38,20 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state) val testKit = BehaviorTestKit(behavior) + val newEarliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = timestampNow)) val newLatestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = timestampNow)) val newLastGroupOffsets = GroupOffsets(gtpSingleMember -> Some(Point(offset = 180, time = timestampNow))) - testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets)) + testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets)) val metrics = reporter.receiveAll() - "report 6 metrics" in { metrics.length shouldBe 6 } + "report 7 metrics" in { metrics.length shouldBe 7 } + + "earliest offset metric" in { + metrics should contain( + Metrics.TopicPartitionValueMessage(EarliestOffsetMetric, config.cluster.name, topicPartition0, value = 0)) + } "latest offset metric" in { metrics should contain( @@ -91,6 +97,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state) val testKit = BehaviorTestKit(behavior) + val newEarliestOffsets = PartitionOffsets( + topicPartition0 -> Point(offset = 0, time = 200), + topicPartition1 -> Point(offset = 0, time = 200), + topicPartition2 -> Point(offset = 0, time = 200) + ) val newLatestOffsets = PartitionOffsets( topicPartition0 -> Point(offset = 200, time = 200), topicPartition1 -> Point(offset = 200, time = 200), @@ -102,7 +113,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi gtp2 -> Some(Point(offset = 180, time = 200)), ) - testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets)) + testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets)) val metrics = reporter.receiveAll() @@ -128,10 +139,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state) val testKit = BehaviorTestKit(behavior) + val newEarliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = timestampNow)) val newLatestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = timestampNow)) val newLastGroupOffsets = GroupOffsets(gtpSingleMember -> None) - testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets)) + testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets)) val metrics = reporter.receiveAll() @@ -167,6 +179,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi }.nonEmpty shouldBe true } + "earliest offset metric" in { + metrics should contain( + Metrics.TopicPartitionValueMessage(EarliestOffsetMetric, config.cluster.name, topicPartition0, value = 0)) + } + "latest offset metric" in { metrics should contain( Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200)) @@ -184,6 +201,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi lastSnapshot = Some(ConsumerGroupCollector.OffsetsSnapshot( timestamp = lastTimestamp, groups = List(groupId), + earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = lastTimestamp)), latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = lastTimestamp)), lastGroupOffsets = GroupOffsets(gtpSingleMember -> Some(Point(offset = 180, time = lastTimestamp))) )) @@ -198,6 +216,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val snapshot = ConsumerGroupCollector.OffsetsSnapshot( timestamp = timestampNow, groups = List(groupId), + earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = 200)), latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = 200)), lastGroupOffsets = GroupOffsets(gtpSingleMember.copy(consumerId = newConsumerId) -> Some(Point(offset = 180, time = 200))) ) @@ -219,6 +238,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val snapshot = ConsumerGroupCollector.OffsetsSnapshot( timestamp = timestampNow, groups = List(newGroupId), + earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = 200)), latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = 200)), lastGroupOffsets = GroupOffsets(gtpSingleMember.copy(id = newGroupId) -> Some(Point(offset = 180, time = 200))) ) @@ -245,6 +265,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val snapshot = ConsumerGroupCollector.OffsetsSnapshot( timestamp = timestampNow, groups = List(), + earliestOffsets = PartitionOffsets(), latestOffsets = PartitionOffsets(), lastGroupOffsets = GroupOffsets() )