Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
dont record offsetMetrics until we know our current offset
Browse files Browse the repository at this point in the history
  • Loading branch information
woodsaj committed Jan 24, 2017
1 parent 9ac3c8f commit 13e27a5
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,19 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
partitionOffsetMetric := partitionOffset[partition]
partitionLogSizeMetric := partitionLogSize[partition]
partitionLagMetric := partitionLag[partition]

partitionOffsetMetric.Set(int(currentOffset))
// we need the currentLogSize to be able to record our inital Lag.
offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Error(3, "kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
} else {
partitionLogSizeMetric.Set(int(offset))
partitionLagMetric.Set(int(offset - currentOffset))
}
if currentOffset >= 0 {
// we cant set the offsetMetrics until we know what offset we are at.
partitionOffsetMetric.Set(int(currentOffset))
// we need the currentLogSize to be able to record our inital Lag.
if err == nil {
partitionLagMetric.Set(int(offset - currentOffset))
}
}

log.Info("kafka-mdm: consuming from %s:%d from offset %d", topic, partition, currentOffset)
Expand All @@ -251,12 +255,18 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err)
}
partitionOffsetMetric.Set(int(currentOffset))
offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Error(3, "kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
} else {
partitionLogSizeMetric.Set(int(offset))
}
if currentOffset < 0 {
// we have not yet consumed any messages.
continue
}
partitionOffsetMetric.Set(int(currentOffset))
if err == nil {
partitionLagMetric.Set(int(offset - currentOffset))
}
case <-k.stopConsuming:
Expand Down

0 comments on commit 13e27a5

Please sign in to comment.