Merge pull request #1315 from grafana/nowait_when_no_messages
Rationalised NotifierKafka's backlog processing:

    made bootTimeOffsets local to consumePartition and renamed it to lastAvailableOffsetAtStartup
    at startup, invoked GetOffset() to retrieve an explicit offset in all cases ('oldest', 'newest', timestamp)
    factored duplicated code into updateProcessBacklog() and updateMetrics()
    made consumePartition() check on startup whether the backlog has already been processed; this fixes all cases where backlog processing got stuck because there were no messages left to receive

Note: partitionLagMetric was previously set to GetOffset(..., OffsetNewest) - msg.Offset and is now set to GetOffset(..., OffsetNewest) - msg.Offset - 1. The other metrics remain unchanged.
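
For reference, a minimal, hypothetical sketch (not part of this commit) of the offset arithmetic behind that note: sarama's GetOffset(..., OffsetNewest) returns the offset that the next produced message will receive, so the newest message that can actually be consumed sits one position earlier, and lag measured against it is 0 once the consumer has caught up. The helper name, broker address, topic, partition and offset values below are placeholders.

    package main

    import (
        "fmt"
        "log"

        "github.com/Shopify/sarama"
    )

    // lagFor is a hypothetical helper, not code from this commit. GetOffset with
    // sarama.OffsetNewest returns the offset the *next* produced message will get,
    // so the last consumable message sits at nextOffset-1, and the lag relative to
    // lastReadOffset is 0 once the consumer is fully caught up.
    func lagFor(client sarama.Client, topic string, partition int32, lastReadOffset int64) (int64, error) {
        nextOffset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
        if err != nil {
            return 0, err
        }
        lastAvailableOffset := nextOffset - 1
        return lastAvailableOffset - lastReadOffset, nil
    }

    func main() {
        // placeholder broker address; adjust for a real cluster
        client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // topic, partition and last-read offset are made-up example values
        lag, err := lagFor(client, "metricpersist", 0, 41)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("partition lag:", lag)
    }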
fkaleo authored May 23, 2019
2 parents ac7d3db + a211433 commit 146b5e6
Showing 2 changed files with 56 additions and 52 deletions.
mdata/notifierKafka/cfg.go: 10 changes (0 additions, 10 deletions)
@@ -24,7 +24,6 @@ var config *sarama.Config
 var offsetDuration time.Duration
 var partitionStr string
 var partitions []int32
-var bootTimeOffsets map[int32]int64
 var backlogProcessTimeout time.Duration
 var backlogProcessTimeoutStr string
 var partitionOffset map[int32]*stats.Gauge64
@@ -125,16 +124,7 @@ func ConfigProcess(instance string) {
     partitionLogSize = make(map[int32]*stats.Gauge64)
     partitionLag = make(map[int32]*stats.Gauge64)
 
-    // get the "newest" offset for all partitions.
-    // when booting up, we will delay consuming metrics until we have
-    // caught up to these offsets.
-    bootTimeOffsets = make(map[int32]int64)
     for _, part := range partitions {
-        offset, err := client.GetOffset(topic, part, sarama.OffsetNewest)
-        if err != nil {
-            log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, part, err)
-        }
-        bootTimeOffsets[part] = offset
         // metric cluster.notifier.kafka.partition.%d.offset is the current offset for the partition (%d) that we have consumed
         partitionOffset[part] = stats.NewGauge64(fmt.Sprintf("cluster.notifier.kafka.partition.%d.offset", part))
         // metric cluster.notifier.kafka.partition.%d.log_size is the size of the kafka partition (%d), aka the newest available offset.
mdata/notifierKafka/notifierKafka.go: 98 changes (56 additions, 42 deletions)
@@ -66,30 +66,24 @@ func New(instance string, handler mdata.NotifierHandler) *NotifierKafka {
 }
 
 func (c *NotifierKafka) start() {
-    var err error
     pre := time.Now()
     processBacklog := new(sync.WaitGroup)
     for _, partition := range partitions {
-        var offset int64
+        var offsetTime int64
         switch offsetStr {
         case "oldest":
-            offset = -2
+            offsetTime = sarama.OffsetOldest
        case "newest":
-            offset = -1
+            offsetTime = sarama.OffsetNewest
         default:
-            offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
-            if err != nil {
-                offset = sarama.OffsetOldest
-                log.Warnf("kafka-cluster: failed to get offset %s: %s -> will use oldest instead", offsetDuration, err)
-            }
+            offsetTime = time.Now().Add(-1*offsetDuration).UnixNano() / int64(time.Millisecond)
         }
-        partitionLogSize[partition].Set(int(bootTimeOffsets[partition]))
-        if offset >= 0 {
-            partitionOffset[partition].Set(int(offset))
-            partitionLag[partition].Set(int(bootTimeOffsets[partition] - offset))
+        startOffset, err := c.client.GetOffset(topic, partition, offsetTime)
+        if err != nil {
+            log.Fatalf("kafka-cluster: failed to get offset %d: %s", offsetTime, err)
         }
         processBacklog.Add(1)
-        go c.consumePartition(topic, partition, offset, processBacklog)
+        go c.consumePartition(topic, partition, startOffset, processBacklog)
     }
     // wait for our backlog to be processed before returning. This will block metrictank from consuming metrics until
     // we have processed old metricPersist messages. The end result is that we wont overwrite chunks in cassandra that
@@ -111,50 +105,70 @@ func (c *NotifierKafka) start() {
 
 }
 
-func (c *NotifierKafka) consumePartition(topic string, partition int32, currentOffset int64, processBacklog *sync.WaitGroup) {
+func (c *NotifierKafka) updateProcessBacklog(lastReadOffset int64, lastAvailableOffsetAtStartup int64, processBacklog *sync.WaitGroup) bool {
+    if lastReadOffset >= lastAvailableOffsetAtStartup {
+        processBacklog.Done()
+        return true
+    }
+    return false
+}
+
+func (c *NotifierKafka) getLastAvailableOffset(topic string, partition int32) (lastAvailableOffset int64, err error) {
+    nextOffset, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
+
+    if err != nil {
+        log.Errorf("kafka-cluster failed to get offset of last available message in partition %s:%d. %s", topic, partition, err)
+        lastAvailableOffset = -1
+    } else {
+        // nextOffset is the offset of the message that will be produced next. There is no
+        // message with that offset that we can consume yet
+        lastAvailableOffset = nextOffset - 1
+    }
+
+    return
+}
+
+func (c *NotifierKafka) updateMetrics(topic string, partition int32, lastReadOffset int64) {
+    lastAvailableOffset, err := c.getLastAvailableOffset(topic, partition)
+    if err == nil {
+        partitionLogSize[partition].Set(int(lastAvailableOffset + 1))
+        partitionLag[partition].Set(int(lastAvailableOffset - lastReadOffset))
+    }
+    partitionOffset[partition].Set(int(lastReadOffset))
+}
+
+func (c *NotifierKafka) consumePartition(topic string, partition int32, startOffset int64, processBacklog *sync.WaitGroup) {
     c.wg.Add(1)
     defer c.wg.Done()
 
-    pc, err := c.consumer.ConsumePartition(topic, partition, currentOffset)
+    pc, err := c.consumer.ConsumePartition(topic, partition, startOffset)
     if err != nil {
         log.Fatalf("kafka-cluster: failed to start partitionConsumer for %s:%d. %s", topic, partition, err)
     }
-    log.Infof("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, currentOffset)
+    log.Infof("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, startOffset)
 
     messages := pc.Messages()
     ticker := time.NewTicker(5 * time.Second)
-    startingUp := true
-    // the bootTimeOffset is the next available offset. There may not be a message with that
-    // offset yet, so we subtract 1 to get the highest offset that we can fetch.
-    bootTimeOffset := bootTimeOffsets[partition] - 1
-    partitionOffsetMetric := partitionOffset[partition]
-    partitionLogSizeMetric := partitionLogSize[partition]
-    partitionLagMetric := partitionLag[partition]
+
+    lastReadOffset := startOffset - 1
+    lastAvailableOffsetAtStartup, err := c.getLastAvailableOffset(topic, partition)
+    if err != nil {
+        log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, partition, err)
+    }
+    backlogProcessed := c.updateProcessBacklog(lastReadOffset, lastAvailableOffsetAtStartup, processBacklog)
+    c.updateMetrics(topic, partition, lastReadOffset)
+
     for {
         select {
         case msg := <-messages:
             log.Debugf("kafka-cluster: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
             c.handler.Handle(msg.Value)
-            currentOffset = msg.Offset
+            lastReadOffset = msg.Offset
         case <-ticker.C:
-            if startingUp && currentOffset >= bootTimeOffset {
-                processBacklog.Done()
-                startingUp = false
-            }
-            offset, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
-            if err != nil {
-                log.Errorf("kafka-mdm failed to get log-size of partition %s:%d. %s", topic, partition, err)
-            } else {
-                partitionLogSizeMetric.Set(int(offset))
-            }
-            if currentOffset < 0 {
-                // we have not yet consumed any messages.
-                continue
-            }
-            partitionOffsetMetric.Set(int(currentOffset))
-            if err == nil {
-                partitionLagMetric.Set(int(offset - currentOffset))
+            if !backlogProcessed {
+                backlogProcessed = c.updateProcessBacklog(lastReadOffset, lastAvailableOffsetAtStartup, processBacklog)
             }
+            c.updateMetrics(topic, partition, lastReadOffset)
         case <-c.stopConsuming:
             pc.Close()
             log.Infof("kafka-cluster: consumer for %s:%d ended.", topic, partition)
