Skip to content

Commit

Permalink
Added collection and export of Earliest Offset (seglo#86)
Browse files Browse the repository at this point in the history
* Added collection and export of Earliest Offset

kafka_partition_earliest_offset is now gathered and exported for all of the topic partitions which had been gathering the latest offset.

* referencing 0.5.4 in readme
  • Loading branch information
graphex authored and anbarasan committed Nov 24, 2019
1 parent 2b06f41 commit fb075d4
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 7 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ Labels: `cluster_name, topic, partition`

The latest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the last produced offset. The last produced offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the last produced offset and the last consumed offset in certain panels.

**`kafka_partition_earliest_offset`**

Labels: `cluster_name, topic, partition`

The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels.


### Labels

Each metric may include the following labels when reported. If you define the `labels` property for configuration of a cluster then those labels will also be included.
Expand Down Expand Up @@ -477,6 +484,10 @@ required. Before running a release make sure the following pre-req's are met.

## Change log

0.5.5

* Added kafka_partition_earliest_offset metric for determining the volume of offsets stored in Kafka.

0.5.4

* Bugfix: Accidentally released with local repo.
Expand Down
2 changes: 1 addition & 1 deletion scripts/update_readme_version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ echo helm install $RELEASE_URL
sed -i -r "s/helm install http\S*/helm install ${RELEASE_URL_ESCAPED}/g" $DIR/../README.md

echo Update README.md Running Docker Image version to:
echo lightbend/kafka-lag-exporter:0.5.0
echo lightbend/kafka-lag-exporter:${VERSION}
sed -i -r "s/lightbend\/kafka-lag-exporter:\S*/lightbend\/kafka-lag-exporter:${VERSION}/g" $DIR/../README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ object ConsumerGroupCollector {
final case class OffsetsSnapshot(
timestamp: Long,
groups: List[String],
earliestOffsets: PartitionOffsets,
latestOffsets: PartitionOffsets,
lastGroupOffsets: GroupOffsets
) extends Message {
Expand All @@ -44,6 +45,10 @@ object ConsumerGroupCollector {
}

override def toString: String = {
val earliestOffsetHeader = TpoFormat.format("Topic", "Partition", "Earliest")
val earliestOffsetsStr = earliestOffsets.map {
case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset)
}
val latestOffsetHeader = TpoFormat.format("Topic", "Partition", "Offset")
val latestOffsetsStr = latestOffsets.map {
case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset)
Expand All @@ -57,6 +62,9 @@ object ConsumerGroupCollector {
s"""
|Timestamp: $timestamp
|Groups: ${groups.mkString(",")}
|Earliest Offsets:
|$earliestOffsetHeader
|${earliestOffsetsStr.mkString("\n")}
|Latest Offsets:
|$latestOffsetHeader
|${latestOffsetsStr.mkString("\n")}
Expand Down Expand Up @@ -90,7 +98,8 @@ object ConsumerGroupCollector {

context.self ! Collect

val collectorState = CollectorState(topicPartitionTables = Domain.TopicPartitionTable(config.lookupTableSize))
val collectorState = CollectorState(
topicPartitionTables = Domain.TopicPartitionTable(config.lookupTableSize))
collector(config, clientCreator(config.cluster), reporter, collectorState)
}
}.onFailure(SupervisorStrategy.restartWithBackoff(1 seconds, 10 seconds, 0.2))
Expand All @@ -117,12 +126,14 @@ object ConsumerGroupCollector {
val distinctPartitions = groupTopicPartitions.map(_.tp).toSet

val groupOffsetsFuture = client.getGroupOffsets(now, groups, groupTopicPartitions)
val earliestOffsetsTry = client.getEarliestOffsets(now, distinctPartitions)
val latestOffsetsTry = client.getLatestOffsets(now, distinctPartitions)

for {
groupOffsets <- groupOffsetsFuture
Success(earliestOffsets) <- Future.successful(earliestOffsetsTry)
Success(latestOffsets) <- Future.successful(latestOffsetsTry)
} yield OffsetsSnapshot(now, groups, latestOffsets, groupOffsets)
} yield OffsetsSnapshot(now, groups, earliestOffsets, latestOffsets, groupOffsets)
}

context.log.info("Collecting offsets")
Expand Down Expand Up @@ -156,6 +167,7 @@ object ConsumerGroupCollector {
refreshLookupTable(state, snapshot, evictedTps)

context.log.info("Reporting offsets")
reportEarliestOffsetMetrics(config, reporter, snapshot)
reportLatestOffsetMetrics(config, reporter, state.topicPartitionTables)
reportConsumerGroupMetrics(config, reporter, snapshot, state.topicPartitionTables)

Expand Down Expand Up @@ -239,6 +251,16 @@ object ConsumerGroupCollector {
}
}

private def reportEarliestOffsetMetrics(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
offsetsSnapshot: OffsetsSnapshot
): Unit = {
for {(tp, topicPoint) <- offsetsSnapshot.earliestOffsets} yield {
reporter ! Metrics.TopicPartitionValueMessage(Metrics.EarliestOffsetMetric, config.cluster.name, tp, topicPoint.offset)
}
}

private def reportLatestOffsetMetrics(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
Expand Down
12 changes: 12 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object KafkaClient {
trait KafkaClientContract {
def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])]
def getGroupOffsets(now: Long, groups: List[String], groupTopicPartitions: List[Domain.GroupTopicPartition]): Future[GroupOffsets]
def getEarliestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets]
def getLatestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets]
def close(): Unit
}
Expand Down Expand Up @@ -118,13 +119,16 @@ object KafkaClient {
}

trait ConsumerKafkaClientContract {
def beginningOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
def close(): Unit
}

class ConsumerKafkaClient private[kafkalagexporter](consumer: KafkaConsumer[Byte,Byte], clientTimeout: FiniteDuration) extends ConsumerKafkaClientContract {
private val _clientTimeout: Duration = clientTimeout.toJava

def beginningOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
consumer.beginningOffsets(partitions, _clientTimeout)
def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
consumer.endOffsets(partitions, _clientTimeout)
def close(): Unit = consumer.close(_clientTimeout)
Expand Down Expand Up @@ -170,6 +174,14 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groupTopicPartitions.toList
}

/**
* Get earliest offsets for a set of topic partitions.
*/
def getEarliestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] = Try {
val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.beginningOffsets(topicPartitions.map(_.asKafka).asJava)
topicPartitions.map(tp => tp -> LookupTable.Point(offsets.get(tp.asKafka).toLong,now)).toMap
}

/**
* Get latest offsets for a set of topic partitions.
*/
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ object Metrics {
topicPartitionLabels
)

val EarliestOffsetMetric = GaugeDefinition(
"kafka_partition_earliest_offset",
"Earliest offset of a partition",
topicPartitionLabels
)

val groupLabels = List("cluster_name", "group")

val ClusterLabels = List("cluster_name")
Expand Down Expand Up @@ -118,6 +124,7 @@ object Metrics {

val definitions = List(
LatestOffsetMetric,
EarliestOffsetMetric,
MaxGroupOffsetLagMetric,
MaxGroupTimeLagMetric,
LastGroupOffsetMetric,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = timestampNow))
val newLatestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = timestampNow))
val newLastGroupOffsets = GroupOffsets(gtpSingleMember -> Some(Point(offset = 180, time = timestampNow)))

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets))
testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

"report 6 metrics" in { metrics.length shouldBe 6 }
"report 7 metrics" in { metrics.length shouldBe 7 }

"earliest offset metric" in {
metrics should contain(
Metrics.TopicPartitionValueMessage(EarliestOffsetMetric, config.cluster.name, topicPartition0, value = 0))
}

"latest offset metric" in {
metrics should contain(
Expand Down Expand Up @@ -91,6 +97,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 0, time = 200),
topicPartition1 -> Point(offset = 0, time = 200),
topicPartition2 -> Point(offset = 0, time = 200)
)
val newLatestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 200, time = 200),
topicPartition1 -> Point(offset = 200, time = 200),
Expand All @@ -102,7 +113,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
gtp2 -> Some(Point(offset = 180, time = 200)),
)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets))
testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

Expand All @@ -128,10 +139,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = timestampNow))
val newLatestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = timestampNow))
val newLastGroupOffsets = GroupOffsets(gtpSingleMember -> None)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newLatestOffsets, newLastGroupOffsets))
testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

Expand Down Expand Up @@ -167,6 +179,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
}.nonEmpty shouldBe true
}

"earliest offset metric" in {
metrics should contain(
Metrics.TopicPartitionValueMessage(EarliestOffsetMetric, config.cluster.name, topicPartition0, value = 0))
}

"latest offset metric" in {
metrics should contain(
Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200))
Expand All @@ -184,6 +201,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
lastSnapshot = Some(ConsumerGroupCollector.OffsetsSnapshot(
timestamp = lastTimestamp,
groups = List(groupId),
earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = lastTimestamp)),
latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = lastTimestamp)),
lastGroupOffsets = GroupOffsets(gtpSingleMember -> Some(Point(offset = 180, time = lastTimestamp)))
))
Expand All @@ -198,6 +216,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val snapshot = ConsumerGroupCollector.OffsetsSnapshot(
timestamp = timestampNow,
groups = List(groupId),
earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = 200)),
latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = 200)),
lastGroupOffsets = GroupOffsets(gtpSingleMember.copy(consumerId = newConsumerId) -> Some(Point(offset = 180, time = 200)))
)
Expand All @@ -219,6 +238,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val snapshot = ConsumerGroupCollector.OffsetsSnapshot(
timestamp = timestampNow,
groups = List(newGroupId),
earliestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 0, time = 200)),
latestOffsets = PartitionOffsets(topicPartition0 -> Point(offset = 200, time = 200)),
lastGroupOffsets = GroupOffsets(gtpSingleMember.copy(id = newGroupId) -> Some(Point(offset = 180, time = 200)))
)
Expand All @@ -245,6 +265,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val snapshot = ConsumerGroupCollector.OffsetsSnapshot(
timestamp = timestampNow,
groups = List(),
earliestOffsets = PartitionOffsets(),
latestOffsets = PartitionOffsets(),
lastGroupOffsets = GroupOffsets()
)
Expand Down

0 comments on commit fb075d4

Please sign in to comment.