Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
process old metricPersist messages before consuming metrics
Browse files Browse the repository at this point in the history
- During startup we wait for the kafkaNotifier to consume its backlog
of metricPersist messages before we start consuming metrics.
- When using the kafkaNotifier we now also create Aggmetrics for any
missing key provided in a metricPersist message. This ensures that when
we do start consuming metrcs, the chunkSaveStart/chunkSaveFinish timestamps
will be correctly set for the metric and we wont try and save chunks that
have already been saved
  • Loading branch information
woodsaj committed Jan 25, 2017
1 parent 13e27a5 commit e03d2fd
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ partition-scheme = bySeries
offset = last
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ partition-scheme = bySeries
offset = last
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
Expand Down
2 changes: 2 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ partition-scheme = bySeries
offset = last
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir =
Expand Down
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@ The current offset for the partition (%d) that we have consumed.
* `input.kafka-mdm.partition.%d.log_size`:
The size of the kafka partition, aka the newest available offset.
* `input.kafka-mdm.partition.%d.lag`:
How many messages (metrics) Kafaka has that we have not yet consumed.
How many messages (metrics) Kafaka has that we have not yet consumed.
10 changes: 7 additions & 3 deletions mdata/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func InitPersistNotifier(handlers ...NotifierHandler) {
}

type Notifier struct {
Instance string
Metrics Metrics
Instance string
Metrics Metrics
CreateMissingMetrics bool
}

func (cl Notifier) Handle(data []byte) {
Expand Down Expand Up @@ -83,7 +84,10 @@ func (cl Notifier) Handle(data []byte) {
}

// get metric
if agg, ok := cl.Metrics.Get(ms.Key); ok {
if cl.CreateMissingMetrics {
agg := cl.Metrics.GetOrCreate(ms.Key)
agg.(*AggMetric).SyncChunkSaveState(ms.T0)
} else if agg, ok := cl.Metrics.Get(ms.Key); ok {
agg.(*AggMetric).SyncChunkSaveState(ms.T0)
}
}
Expand Down
23 changes: 22 additions & 1 deletion mdata/notifierKafka/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var partitionStr string
var partitions []int32
var partitioner *cluster.KafkaPartitioner
var partitionScheme string
var bootTimeOffsets map[int32]int64
var backlogProcessTimeout time.Duration
var backlogProcessTimeoutStr string

// metric cluster.notifier.kafka.messages-published is a counter of messages published to the kafka cluster notifier
var messagesPublished = stats.NewCounter32("cluster.notifier.kafka.messages-published")
Expand All @@ -40,10 +43,11 @@ func init() {
fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
fs.StringVar(&backlogProcessTimeoutStr, "backlog-process-timeout", "60s", "Maximum time backlog processing can block during metrictank startup.")
globalconf.Register("kafka-cluster", fs)
}

Expand Down Expand Up @@ -76,6 +80,11 @@ func ConfigProcess(instance string) {
log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
}

backlogProcessTimeout, err = time.ParseDuration(backlogProcessTimeoutStr)
if err != nil {
log.Fatal(4, "kafka-cluster: unable to parse backlog-process-timeout. %s", err)
}

partitioner, err = cluster.NewKafkaPartitioner(partitionScheme)
if err != nil {
log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err)
Expand Down Expand Up @@ -110,4 +119,16 @@ func ConfigProcess(instance string) {
log.Fatal(4, "kafka-cluster: configured partitions not in list of available partitions. missing %v", missing)
}
}

// get the "newest" offset for all partitions.
// when booting up, we will delay consuming metrics until we have
// caught up to these offsets.
bootTimeOffsets = make(map[int32]int64)
for _, part := range partitions {
offset, err := client.GetOffset(topic, part, sarama.OffsetNewest)
if err != nil {
log.Fatal(4, "kakfa-cluster: failed to get newest offset for %s:%d. %s", topic, part)
}
bootTimeOffsets[part] = offset
}
}
38 changes: 34 additions & 4 deletions mdata/notifierKafka/notifierKafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierK
producer: producer,
instance: instance,
Notifier: mdata.Notifier{
Instance: instance,
Metrics: metrics,
Instance: instance,
Metrics: metrics,
CreateMissingMetrics: true,
},
StopChan: make(chan int),
stopConsuming: make(chan struct{}),
Expand All @@ -78,6 +79,8 @@ func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierK

func (c *NotifierKafka) start() {
var err error
pre := time.Now()
processBacklog := new(sync.WaitGroup)
for _, partition := range partitions {
var offset int64
switch offsetStr {
Expand All @@ -93,11 +96,30 @@ func (c *NotifierKafka) start() {
if err != nil {
log.Fatal(4, "kafka-cluster: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
}
go c.consumePartition(topic, partition, offset)
processBacklog.Add(1)
go c.consumePartition(topic, partition, offset, processBacklog)
}
// wait for our backlog to be processed before returning. This will block metrictank from consuming metrics until
// we have processed old metricPersist messages. The end result is that we wont overwrite chunks in cassandra that
// have already been previously written.
// We dont wait more then backlogProcessTimeout for the backlog to be processed.
log.Info("kafka-cluster: waiting for metricPersist backlog to be processed.")
backlogProcessed := make(chan struct{}, 1)
go func() {
processBacklog.Wait()
backlogProcessed <- struct{}{}
}()

select {
case <-time.After(backlogProcessTimeout):
log.Warn("kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock after %s.", backlogProcessTimeout)
case <-backlogProcessed:
log.Info("kafka-cluster: metricPersist backlog processed in %s.", time.Since(pre))
}

}

func (c *NotifierKafka) consumePartition(topic string, partition int32, partitionOffset int64) {
func (c *NotifierKafka) consumePartition(topic string, partition int32, partitionOffset int64, processBacklog *sync.WaitGroup) {
c.wg.Add(1)
defer c.wg.Done()

Expand All @@ -109,6 +131,10 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio
currentOffset := partitionOffset
messages := pc.Messages()
ticker := time.NewTicker(offsetCommitInterval)
startingUp := true
// the bootTimeOffset is the next available offset. There may not be a message with that
// offset yet, so we subtract 1 to get the highest offset that we can fetch.
bootTimeOffset := bootTimeOffsets[partition] - 1
for {
select {
case msg := <-messages:
Expand All @@ -121,6 +147,10 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio
if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err)
}
if startingUp && currentOffset >= bootTimeOffset {
processBacklog.Done()
startingUp = false
}
case <-c.stopConsuming:
pc.Close()
if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions metrictank-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ partition-scheme = bySeries
offset = last
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir =
Expand Down
2 changes: 2 additions & 0 deletions metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ func main() {
***********************************/
handlers := make([]mdata.NotifierHandler, 0)
if notifierKafka.Enabled {
// The notifierKafka handler will block here until it has processed the backlog of metricPersist messages.
// it will block for at most kafka-cluster.backlog-process-timeout (default 60s)
handlers = append(handlers, notifierKafka.New(*instance, metrics, metricIndex))
}

Expand Down
2 changes: 2 additions & 0 deletions scripts/config/metrictank-docker.ini
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ partition-scheme = bySeries
offset = last
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
Expand Down
2 changes: 2 additions & 0 deletions scripts/config/metrictank-package.ini
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ partition-scheme = bySeries
offset = last
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
Expand Down

0 comments on commit e03d2fd

Please sign in to comment.