Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Added collection and export of Earliest Offset #86

Merged
merged 2 commits into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ Labels: `cluster_name, topic, partition`

The latest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the last produced offset. The last produced offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the last produced offset and the last consumed offset in certain panels.

**`kafka_partition_earliest_offset`**

Labels: `cluster_name, topic, partition`

The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels.


### Labels

Each metric may include the following labels when reported. If you define the `labels` property for configuration of a cluster then those labels will also be included.
Expand Down Expand Up @@ -477,6 +484,10 @@ required. Before running a release make sure the following pre-req's are met.

## Change log

0.5.5

* Added kafka_partition_earliest_offset metric for determining the volume of offsets stored in Kafka.

0.5.4

* Bugfix: Accidentally released with local repo.
Expand Down
2 changes: 1 addition & 1 deletion scripts/update_readme_version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ echo helm install $RELEASE_URL
sed -i -r "s/helm install http\S*/helm install ${RELEASE_URL_ESCAPED}/g" $DIR/../README.md

echo Update README.md Running Docker Image version to:
echo lightbend/kafka-lag-exporter:0.5.0
echo lightbend/kafka-lag-exporter:${VERSION}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is meant to be variable.

sed -i -r "s/lightbend\/kafka-lag-exporter:\S*/lightbend\/kafka-lag-exporter:${VERSION}/g" $DIR/../README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ object ConsumerGroupCollector {
final case class OffsetsSnapshot(
timestamp: Long,
groups: List[String],
earliestOffsets: PartitionOffsets,
latestOffsets: PartitionOffsets,
lastGroupOffsets: GroupOffsets
) extends Message {
Expand All @@ -43,6 +44,10 @@ object ConsumerGroupCollector {
}

override def toString: String = {
val earliestOffsetHeader = TpoFormat.format("Topic", "Partition", "Earliest")
val earliestOffsetsStr = earliestOffsets.map {
case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset)
}
val latestOffsetHeader = TpoFormat.format("Topic", "Partition", "Offset")
val latestOffsetsStr = latestOffsets.map {
case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset)
Expand All @@ -56,6 +61,9 @@ object ConsumerGroupCollector {
s"""
|Timestamp: $timestamp
|Groups: ${groups.mkString(",")}
|Earliest Offsets:
|$earliestOffsetHeader
|${earliestOffsetsStr.mkString("\n")}
|Latest Offsets:
|$latestOffsetHeader
|${latestOffsetsStr.mkString("\n")}
Expand Down Expand Up @@ -89,7 +97,8 @@ object ConsumerGroupCollector {

context.self ! Collect

val collectorState = CollectorState(topicPartitionTables = Domain.TopicPartitionTable(config.lookupTableSize))
val collectorState = CollectorState(
topicPartitionTables = Domain.TopicPartitionTable(config.lookupTableSize))
collector(config, clientCreator(config.cluster), reporter, collectorState)
}
}.onFailure(SupervisorStrategy.restartWithBackoff(1 seconds, 10 seconds, 0.2))
Expand All @@ -116,12 +125,14 @@ object ConsumerGroupCollector {
val distinctPartitions = groupTopicPartitions.map(_.tp).toSet

val groupOffsetsFuture = client.getGroupOffsets(now, groups, groupTopicPartitions)
val earliestOffsetsTry = client.getEarliestOffsets(now, distinctPartitions)
val latestOffsetsTry = client.getLatestOffsets(now, distinctPartitions)

for {
groupOffsets <- groupOffsetsFuture
Success(earliestOffsets) <- Future.successful(earliestOffsetsTry)
Success(latestOffsets) <- Future.successful(latestOffsetsTry)
} yield OffsetsSnapshot(now, groups, latestOffsets, groupOffsets)
} yield OffsetsSnapshot(now, groups, earliestOffsets, latestOffsets, groupOffsets)
}

context.log.info("Collecting offsets")
Expand Down Expand Up @@ -151,6 +162,7 @@ object ConsumerGroupCollector {
refreshLookupTable(state, snapshot, evictedTps)

context.log.info("Reporting offsets")
reportEarliestOffsetMetrics(config, reporter, snapshot)
reportLatestOffsetMetrics(config, reporter, state.topicPartitionTables)
reportConsumerGroupMetrics(config, reporter, snapshot, state.topicPartitionTables)

Expand Down Expand Up @@ -226,6 +238,16 @@ object ConsumerGroupCollector {
}
}

private def reportEarliestOffsetMetrics(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
offsetsSnapshot: OffsetsSnapshot
): Unit = {
for {(tp, topicPoint) <- offsetsSnapshot.earliestOffsets} yield {
reporter ! Metrics.TopicPartitionValueMessage(Metrics.EarliestOffsetMetric, config.cluster.name, tp, topicPoint.offset)
}
}

private def reportLatestOffsetMetrics(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
Expand Down
12 changes: 12 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object KafkaClient {
trait KafkaClientContract {
def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])]
def getGroupOffsets(now: Long, groups: List[String], groupTopicPartitions: List[Domain.GroupTopicPartition]): Future[GroupOffsets]
def getEarliestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets]
def getLatestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets]
def close(): Unit
}
Expand Down Expand Up @@ -118,13 +119,16 @@ object KafkaClient {
}

trait ConsumerKafkaClientContract {
def beginningOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
def close(): Unit
}

class ConsumerKafkaClient private[kafkalagexporter](consumer: KafkaConsumer[Byte,Byte], clientTimeout: FiniteDuration) extends ConsumerKafkaClientContract {
private val _clientTimeout: Duration = clientTimeout.toJava

def beginningOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
consumer.beginningOffsets(partitions, _clientTimeout)
def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
consumer.endOffsets(partitions, _clientTimeout)
def close(): Unit = consumer.close(_clientTimeout)
Expand Down Expand Up @@ -170,6 +174,14 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groupTopicPartitions.toList
}

/**
* Get earliest offsets for a set of topic partitions.
*/
def getEarliestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] = Try {
val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.beginningOffsets(topicPartitions.map(_.asKafka).asJava)
topicPartitions.map(tp => tp -> LookupTable.Point(offsets.get(tp.asKafka).toLong,now)).toMap
}

/**
* Get latest offsets for a set of topic partitions.
*/
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ object Metrics {
topicPartitionLabels
)

val EarliestOffsetMetric = GaugeDefinition(
"kafka_partition_earliest_offset",
"Earliest offset of a partition",
topicPartitionLabels
)

val groupLabels = List("cluster_name", "group")

val MaxGroupOffsetLagMetric = GaugeDefinition(
Expand Down Expand Up @@ -98,6 +104,7 @@ object Metrics {

val definitions = List(
LatestOffsetMetric,
EarliestOffsetMetric,
MaxGroupOffsetLagMetric,
MaxGroupTimeLagMetric,
LastGroupOffsetMetric,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = timestampNow))
val newLatestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = timestampNow))
val newLastGroupOffsets = GroupOffsets(gtpSingleMember -> Some(Point(offset = 180, time = timestampNow)))

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets))
testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

"report 6 metrics" in { metrics.length shouldBe 6 }
"report 7 metrics" in { metrics.length shouldBe 7 }

"earliest offset metric" in {
metrics should contain(
Metrics.TopicPartitionValueMessage(EarliestOffsetMetric, config.cluster.name, topicPartition0, value = 0))
}

"latest offset metric" in {
metrics should contain(
Expand Down Expand Up @@ -91,6 +97,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 0, time = 200),
topicPartition1 -> Point(offset = 0, time = 200),
topicPartition2 -> Point(offset = 0, time = 200)
)
val newLatestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 200, time = 200),
topicPartition1 -> Point(offset = 200, time = 200),
Expand All @@ -102,7 +113,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
gtp2 -> Some(Point(offset = 180, time = 200)),
)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets))
testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

Expand All @@ -128,10 +139,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = timestampNow))
val newLatestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = timestampNow))
val newLastGroupOffsets = GroupOffsets(gtpSingleMember -> None)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets))
testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

Expand Down Expand Up @@ -167,6 +179,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
}.nonEmpty shouldBe true
}

"earliest offset metric" in {
metrics should contain(
Metrics.TopicPartitionValueMessage(EarliestOffsetMetric, config.cluster.name, topicPartition0, value = 0))
}

"latest offset metric" in {
metrics should contain(
Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200))
Expand All @@ -184,6 +201,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
lastSnapshot = Some(ConsumerGroupCollector.OffsetsSnapshot(
timestamp = lastTimestamp,
groups = List(groupId),
earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = lastTimestamp)),
latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = lastTimestamp)),
lastGroupOffsets = GroupOffsets(gtpSingleMember -> Some(Point(offset = 180, time = lastTimestamp)))
))
Expand All @@ -198,6 +216,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val snapshot = ConsumerGroupCollector.OffsetsSnapshot(
timestamp = timestampNow,
groups = List(groupId),
earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = 200)),
latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = 200)),
lastGroupOffsets = GroupOffsets(gtpSingleMember.copy(consumerId = newConsumerId) -> Some(Point(offset = 180, time = 200)))
)
Expand All @@ -219,6 +238,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val snapshot = ConsumerGroupCollector.OffsetsSnapshot(
timestamp = timestampNow,
groups = List(newGroupId),
earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = 200)),
latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = 200)),
lastGroupOffsets = GroupOffsets(gtpSingleMember.copy(id = newGroupId) -> Some(Point(offset = 180, time = 200)))
)
Expand All @@ -245,6 +265,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val snapshot = ConsumerGroupCollector.OffsetsSnapshot(
timestamp = timestampNow,
groups = List(),
earliestOffsets = PartitionOffsets(),
latestOffsets = PartitionOffsets(),
lastGroupOffsets = GroupOffsets()
)
Expand Down