diff --git a/mdata/notifierKafka/cfg.go b/mdata/notifierKafka/cfg.go
index a147957f4f..f628246efb 100644
--- a/mdata/notifierKafka/cfg.go
+++ b/mdata/notifierKafka/cfg.go
@@ -24,7 +24,6 @@ var config *sarama.Config
 var offsetDuration time.Duration
 var partitionStr string
 var partitions []int32
-var bootTimeOffsets map[int32]int64
 var backlogProcessTimeout time.Duration
 var backlogProcessTimeoutStr string
 var partitionOffset map[int32]*stats.Gauge64
@@ -125,16 +124,7 @@ func ConfigProcess(instance string) {
 	partitionLogSize = make(map[int32]*stats.Gauge64)
 	partitionLag = make(map[int32]*stats.Gauge64)
 
-	// get the "newest" offset for all partitions.
-	// when booting up, we will delay consuming metrics until we have
-	// caught up to these offsets.
-	bootTimeOffsets = make(map[int32]int64)
 	for _, part := range partitions {
-		offset, err := client.GetOffset(topic, part, sarama.OffsetNewest)
-		if err != nil {
-			log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, part, err)
-		}
-		bootTimeOffsets[part] = offset
 		// metric cluster.notifier.kafka.partition.%d.offset is the current offset for the partition (%d) that we have consumed
 		partitionOffset[part] = stats.NewGauge64(fmt.Sprintf("cluster.notifier.kafka.partition.%d.offset", part))
 		// metric cluster.notifier.kafka.partition.%d.log_size is the size of the kafka partition (%d), aka the newest available offset.
diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go
index 5145de4121..9c2889951b 100644
--- a/mdata/notifierKafka/notifierKafka.go
+++ b/mdata/notifierKafka/notifierKafka.go
@@ -66,30 +66,24 @@ func New(instance string, handler mdata.NotifierHandler) *NotifierKafka {
 }
 
 func (c *NotifierKafka) start() {
-	var err error
 	pre := time.Now()
 	processBacklog := new(sync.WaitGroup)
 	for _, partition := range partitions {
-		var offset int64
+		var offsetTime int64
 		switch offsetStr {
 		case "oldest":
-			offset = -2
+			offsetTime = sarama.OffsetOldest
 		case "newest":
-			offset = -1
+			offsetTime = sarama.OffsetNewest
 		default:
-			offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
-			if err != nil {
-				offset = sarama.OffsetOldest
-				log.Warnf("kafka-cluster: failed to get offset %s: %s -> will use oldest instead", offsetDuration, err)
-			}
+			offsetTime = time.Now().Add(-1*offsetDuration).UnixNano() / int64(time.Millisecond)
 		}
-		partitionLogSize[partition].Set(int(bootTimeOffsets[partition]))
-		if offset >= 0 {
-			partitionOffset[partition].Set(int(offset))
-			partitionLag[partition].Set(int(bootTimeOffsets[partition] - offset))
+		startOffset, err := c.client.GetOffset(topic, partition, offsetTime)
+		if err != nil {
+			log.Fatalf("kafka-cluster: failed to get offset %d: %s", offsetTime, err)
 		}
 		processBacklog.Add(1)
-		go c.consumePartition(topic, partition, offset, processBacklog)
+		go c.consumePartition(topic, partition, startOffset, processBacklog)
 	}
 	// wait for our backlog to be processed before returning. This will block metrictank from consuming metrics until
 	// we have processed old metricPersist messages. The end result is that we wont overwrite chunks in cassandra that
@@ -111,50 +105,70 @@ func (c *NotifierKafka) start() {
 }
 
-func (c *NotifierKafka) consumePartition(topic string, partition int32, currentOffset int64, processBacklog *sync.WaitGroup) {
+func (c *NotifierKafka) updateProcessBacklog(lastReadOffset int64, lastAvailableOffsetAtStartup int64, processBacklog *sync.WaitGroup) bool {
+	if lastReadOffset >= lastAvailableOffsetAtStartup {
+		processBacklog.Done()
+		return true
+	}
+	return false
+}
+
+func (c *NotifierKafka) getLastAvailableOffset(topic string, partition int32) (lastAvailableOffset int64, err error) {
+	nextOffset, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
+
+	if err != nil {
+		log.Errorf("kafka-cluster failed to get offset of last available message in partition %s:%d. %s", topic, partition, err)
+		lastAvailableOffset = -1
+	} else {
+		// nextOffset is the offset of the message that will be produced next. There is no
+		// message with that offset that we can consume yet
+		lastAvailableOffset = nextOffset - 1
+	}
+
+	return
+}
+
+func (c *NotifierKafka) updateMetrics(topic string, partition int32, lastReadOffset int64) {
+	lastAvailableOffset, err := c.getLastAvailableOffset(topic, partition)
+	if err == nil {
+		partitionLogSize[partition].Set(int(lastAvailableOffset + 1))
+		partitionLag[partition].Set(int(lastAvailableOffset - lastReadOffset))
+	}
+	partitionOffset[partition].Set(int(lastReadOffset))
+}
+
+func (c *NotifierKafka) consumePartition(topic string, partition int32, startOffset int64, processBacklog *sync.WaitGroup) {
 	c.wg.Add(1)
 	defer c.wg.Done()
 
-	pc, err := c.consumer.ConsumePartition(topic, partition, currentOffset)
+	pc, err := c.consumer.ConsumePartition(topic, partition, startOffset)
 	if err != nil {
 		log.Fatalf("kafka-cluster: failed to start partitionConsumer for %s:%d. %s", topic, partition, err)
 	}
-	log.Infof("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, currentOffset)
+	log.Infof("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, startOffset)
 
 	messages := pc.Messages()
 	ticker := time.NewTicker(5 * time.Second)
-	startingUp := true
-	// the bootTimeOffset is the next available offset. There may not be a message with that
-	// offset yet, so we subtract 1 to get the highest offset that we can fetch.
-	bootTimeOffset := bootTimeOffsets[partition] - 1
-	partitionOffsetMetric := partitionOffset[partition]
-	partitionLogSizeMetric := partitionLogSize[partition]
-	partitionLagMetric := partitionLag[partition]
+
+	lastReadOffset := startOffset - 1
+	lastAvailableOffsetAtStartup, err := c.getLastAvailableOffset(topic, partition)
+	if err != nil {
+		log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, partition, err)
+	}
+	backlogProcessed := c.updateProcessBacklog(lastReadOffset, lastAvailableOffsetAtStartup, processBacklog)
+	c.updateMetrics(topic, partition, lastReadOffset)
+
 	for {
 		select {
 		case msg := <-messages:
 			log.Debugf("kafka-cluster: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
 			c.handler.Handle(msg.Value)
-			currentOffset = msg.Offset
+			lastReadOffset = msg.Offset
 		case <-ticker.C:
-			if startingUp && currentOffset >= bootTimeOffset {
-				processBacklog.Done()
-				startingUp = false
-			}
-			offset, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
-			if err != nil {
-				log.Errorf("kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
-			} else {
-				partitionLogSizeMetric.Set(int(offset))
-			}
-			if currentOffset < 0 {
-				// we have not yet consumed any messages.
-				continue
-			}
-			partitionOffsetMetric.Set(int(currentOffset))
-			if err == nil {
-				partitionLagMetric.Set(int(offset - currentOffset))
+			if !backlogProcessed {
+				backlogProcessed = c.updateProcessBacklog(lastReadOffset, lastAvailableOffsetAtStartup, processBacklog)
 			}
+			c.updateMetrics(topic, partition, lastReadOffset)
 		case <-c.stopConsuming:
 			pc.Close()
 			log.Infof("kafka-cluster: consumer for %s:%d ended.", topic, partition)