This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Kafka notifier: do not wait until timeout when there is no backlog to process, i.e. no message in the partition queue #1315

Merged · 3 commits · May 23, 2019
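The core of the change, as a minimal sketch (the helper name, its signature, and the sarama.Client parameter below are illustrative, not the PR's code): Kafka's "newest" offset is the offset of the next message to be produced, so the last readable message sits at newest-1. A consumer starting at startOffset therefore has no backlog when startOffset-1 >= newest-1, and the startup WaitGroup can be released immediately instead of waiting for the backlog-process timeout.

package notifierkafka // hypothetical package, sketch only

import (
	"sync"

	"github.com/Shopify/sarama"
)

// markBacklogDoneIfEmpty is an illustrative helper, not the PR's code.
// It releases the startup WaitGroup as soon as there is nothing left to
// catch up on in this partition.
func markBacklogDoneIfEmpty(client sarama.Client, topic string, partition int32, startOffset int64, wg *sync.WaitGroup) (bool, error) {
	newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return false, err
	}
	lastAvailable := newest - 1 // newest is the offset of the *next* message to be produced
	lastRead := startOffset - 1 // nothing has been consumed yet
	if lastRead >= lastAvailable {
		wg.Done() // no backlog: don't wait for the timeout
		return true, nil
	}
	return false, nil
}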
10 changes: 0 additions & 10 deletions mdata/notifierKafka/cfg.go
@@ -24,7 +24,6 @@ var config *sarama.Config
var offsetDuration time.Duration
var partitionStr string
var partitions []int32
var bootTimeOffsets map[int32]int64
var backlogProcessTimeout time.Duration
var backlogProcessTimeoutStr string
var partitionOffset map[int32]*stats.Gauge64
@@ -125,16 +124,7 @@ func ConfigProcess(instance string) {
	partitionLogSize = make(map[int32]*stats.Gauge64)
	partitionLag = make(map[int32]*stats.Gauge64)

	// get the "newest" offset for all partitions.
	// when booting up, we will delay consuming metrics until we have
	// caught up to these offsets.
	bootTimeOffsets = make(map[int32]int64)
	for _, part := range partitions {
		offset, err := client.GetOffset(topic, part, sarama.OffsetNewest)
		if err != nil {
			log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, part, err)
		}
		bootTimeOffsets[part] = offset
		// metric cluster.notifier.kafka.partition.%d.offset is the current offset for the partition (%d) that we have consumed
		partitionOffset[part] = stats.NewGauge64(fmt.Sprintf("cluster.notifier.kafka.partition.%d.offset", part))
		// metric cluster.notifier.kafka.partition.%d.log_size is the size of the kafka partition (%d), aka the newest available offset.
98 changes: 56 additions & 42 deletions mdata/notifierKafka/notifierKafka.go
@@ -66,30 +66,24 @@ func New(instance string, handler mdata.NotifierHandler) *NotifierKafka {
}

func (c *NotifierKafka) start() {
	var err error
	pre := time.Now()
	processBacklog := new(sync.WaitGroup)
	for _, partition := range partitions {
		var offset int64
		var offsetTime int64
		switch offsetStr {
		case "oldest":
			offset = -2
			offsetTime = sarama.OffsetOldest
		case "newest":
			offset = -1
			offsetTime = sarama.OffsetNewest
		default:
			offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
			if err != nil {
				offset = sarama.OffsetOldest
				log.Warnf("kafka-cluster: failed to get offset %s: %s -> will use oldest instead", offsetDuration, err)
			}
			offsetTime = time.Now().Add(-1*offsetDuration).UnixNano() / int64(time.Millisecond)
		}
		partitionLogSize[partition].Set(int(bootTimeOffsets[partition]))
		if offset >= 0 {
			partitionOffset[partition].Set(int(offset))
			partitionLag[partition].Set(int(bootTimeOffsets[partition] - offset))
		startOffset, err := c.client.GetOffset(topic, partition, offsetTime)
		if err != nil {
			log.Fatalf("kafka-cluster: failed to get offset %d: %s", offsetTime, err)
		}
		processBacklog.Add(1)
		go c.consumePartition(topic, partition, offset, processBacklog)
		go c.consumePartition(topic, partition, startOffset, processBacklog)
	}
	// wait for our backlog to be processed before returning. This will block metrictank from consuming metrics until
	// we have processed old metricPersist messages. The end result is that we wont overwrite chunks in cassandra that
@@ -111,50 +105,70 @@ func (c *NotifierKafka) start() {

}

func (c *NotifierKafka) consumePartition(topic string, partition int32, currentOffset int64, processBacklog *sync.WaitGroup) {
func (c *NotifierKafka) updateProcessBacklog(lastReadOffset int64, lastAvailableOffsetAtStartup int64, processBacklog *sync.WaitGroup) bool {
	if lastReadOffset >= lastAvailableOffsetAtStartup {
		processBacklog.Done()
		return true
	}
	return false
}

func (c *NotifierKafka) getLastAvailableOffset(topic string, partition int32) (lastAvailableOffset int64, err error) {
	nextOffset, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)

	if err != nil {
		log.Errorf("kafka-cluster failed to get offset of last available message in partition %s:%d. %s", topic, partition, err)
		lastAvailableOffset = -1
	} else {
		// nextOffset is the offset of the message that will be produced next. There is no
		// message with that offset that we can consume yet
		lastAvailableOffset = nextOffset - 1
	}

	return
}

func (c *NotifierKafka) updateMetrics(topic string, partition int32, lastReadOffset int64) {
	lastAvailableOffset, err := c.getLastAvailableOffset(topic, partition)
	if err == nil {
		partitionLogSize[partition].Set(int(lastAvailableOffset + 1))
		partitionLag[partition].Set(int(lastAvailableOffset - lastReadOffset))
	}
	partitionOffset[partition].Set(int(lastReadOffset))
}

func (c *NotifierKafka) consumePartition(topic string, partition int32, startOffset int64, processBacklog *sync.WaitGroup) {
	c.wg.Add(1)
	defer c.wg.Done()

	pc, err := c.consumer.ConsumePartition(topic, partition, currentOffset)
	pc, err := c.consumer.ConsumePartition(topic, partition, startOffset)
	if err != nil {
		log.Fatalf("kafka-cluster: failed to start partitionConsumer for %s:%d. %s", topic, partition, err)
	}
	log.Infof("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, currentOffset)
	log.Infof("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, startOffset)

	messages := pc.Messages()
	ticker := time.NewTicker(5 * time.Second)
	startingUp := true
	// the bootTimeOffset is the next available offset. There may not be a message with that
	// offset yet, so we subtract 1 to get the highest offset that we can fetch.
	bootTimeOffset := bootTimeOffsets[partition] - 1
	partitionOffsetMetric := partitionOffset[partition]
	partitionLogSizeMetric := partitionLogSize[partition]
	partitionLagMetric := partitionLag[partition]

	lastReadOffset := startOffset - 1
	lastAvailableOffsetAtStartup, err := c.getLastAvailableOffset(topic, partition)
	if err != nil {
		log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, partition, err)
	}
	backlogProcessed := c.updateProcessBacklog(lastReadOffset, lastAvailableOffsetAtStartup, processBacklog)
	c.updateMetrics(topic, partition, lastReadOffset)

	for {
		select {
		case msg := <-messages:
			log.Debugf("kafka-cluster: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
			c.handler.Handle(msg.Value)
			currentOffset = msg.Offset
			lastReadOffset = msg.Offset
		case <-ticker.C:
			if startingUp && currentOffset >= bootTimeOffset {
				processBacklog.Done()
				startingUp = false
			}
			offset, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				log.Errorf("kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
			} else {
				partitionLogSizeMetric.Set(int(offset))
			}
			if currentOffset < 0 {
				// we have not yet consumed any messages.
				continue
			}
			partitionOffsetMetric.Set(int(currentOffset))
			if err == nil {
				partitionLagMetric.Set(int(offset - currentOffset))
			if !backlogProcessed {
				backlogProcessed = c.updateProcessBacklog(lastReadOffset, lastAvailableOffsetAtStartup, processBacklog)
			}
			c.updateMetrics(topic, partition, lastReadOffset)
		case <-c.stopConsuming:
			pc.Close()
			log.Infof("kafka-cluster: consumer for %s:%d ended.", topic, partition)
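For context, the portion of start() folded out of the diff above waits on the processBacklog WaitGroup, bounded by backlog-process-timeout. The following is a hedged sketch of that common wait-with-timeout pattern, not code copied from the repository (function name and log lines are illustrative). With this PR, a partition with no backlog calls Done() on its first check, so the select below returns right away instead of sitting out the full timeout.

package notifierkafka // hypothetical package, sketch only

import (
	"log"
	"sync"
	"time"
)

// waitForBacklog blocks until every partition's metricPersist backlog has
// been processed, or until the timeout elapses, whichever comes first.
func waitForBacklog(processBacklog *sync.WaitGroup, timeout time.Duration) {
	done := make(chan struct{})
	go func() {
		processBacklog.Wait()
		close(done)
	}()
	select {
	case <-done:
		log.Println("kafka-cluster: metricPersist backlog processed")
	case <-time.After(timeout):
		log.Printf("kafka-cluster: backlog not processed within %s, continuing anyway", timeout)
	}
}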