diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go index 797dd22216..4f6eaf1716 100644 --- a/input/kafkamdm/kafkamdm.go +++ b/input/kafkamdm/kafkamdm.go @@ -224,15 +224,19 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset partitionOffsetMetric := partitionOffset[partition] partitionLogSizeMetric := partitionLogSize[partition] partitionLagMetric := partitionLag[partition] - - partitionOffsetMetric.Set(int(currentOffset)) - // we need the currentLogSize to be able to record our inital Lag. offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest) if err != nil { log.Error(3, "kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err) } else { partitionLogSizeMetric.Set(int(offset)) - partitionLagMetric.Set(int(offset - currentOffset)) + } + if currentOffset >= 0 { + // we cant set the offsetMetrics until we know what offset we are at. + partitionOffsetMetric.Set(int(currentOffset)) + // we need the currentLogSize to be able to record our inital Lag. + if err == nil { + partitionLagMetric.Set(int(offset - currentOffset)) + } } log.Info("kafka-mdm: consuming from %s:%d from offset %d", topic, partition, currentOffset) @@ -251,12 +255,18 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil { log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err) } - partitionOffsetMetric.Set(int(currentOffset)) offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest) if err != nil { log.Error(3, "kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err) } else { partitionLogSizeMetric.Set(int(offset)) + } + if currentOffset < 0 { + // we have not yet consumed any messages. + continue + } + partitionOffsetMetric.Set(int(currentOffset)) + if err == nil { partitionLagMetric.Set(int(offset - currentOffset)) } case <-k.stopConsuming: