From e03d2fdc696001ebdab2d82a279075c416d9742a Mon Sep 17 00:00:00 2001 From: woodsaj Date: Tue, 24 Jan 2017 12:59:06 +0800 Subject: [PATCH] process old metricPersist messages before consuming metrics - During startup we wait for the kafkaNotifier to consume its backlog of metricPersist messages before we start consuming metrics. - When using the kafkaNotifier we now also create AggMetrics for any missing key provided in a metricPersist message. This ensures that when we do start consuming metrics, the chunkSaveStart/chunkSaveFinish timestamps will be correctly set for the metric and we won't try to save chunks that have already been saved. --- docker/docker-cluster/metrictank.ini | 2 + .../metrictank.ini | 2 + docs/config.md | 2 + docs/metrics.md | 2 +- mdata/notifier.go | 10 +++-- mdata/notifierKafka/cfg.go | 23 ++++++++++- mdata/notifierKafka/notifierKafka.go | 38 +++++++++++++++++-- metrictank-sample.ini | 2 + metrictank.go | 2 + scripts/config/metrictank-docker.ini | 2 + scripts/config/metrictank-package.ini | 2 + 11 files changed, 78 insertions(+), 9 deletions(-) diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini index 2eef08b522..58b299a79f 100644 --- a/docker/docker-cluster/metrictank.ini +++ b/docker/docker-cluster/metrictank.ini @@ -194,6 +194,8 @@ partition-scheme = bySeries offset = last # save interval for offsets offset-commit-interval = 5s +# Maximum time backlog processing can block during metrictank startup. +backlog-process-timeout = 60s # directory to store partition offsets index. supports relative or absolute paths. empty means working dir. # it will be created (incl parent dirs) if not existing. 
data-dir = /var/lib/metrictank diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini index e2f4aa13d5..fff5eda093 100644 --- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini +++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini @@ -208,6 +208,8 @@ partition-scheme = bySeries offset = last # save interval for offsets offset-commit-interval = 5s +# Maximum time backlog processing can block during metrictank startup. +backlog-process-timeout = 60s # directory to store partition offsets index. supports relative or absolute paths. empty means working dir. # it will be created (incl parent dirs) if not existing. data-dir = /var/lib/metrictank diff --git a/docs/config.md b/docs/config.md index 035b2a9333..02827c7e7d 100644 --- a/docs/config.md +++ b/docs/config.md @@ -245,6 +245,8 @@ partition-scheme = bySeries offset = last # save interval for offsets offset-commit-interval = 5s +# Maximum time backlog processing can block during metrictank startup. +backlog-process-timeout = 60s # directory to store partition offsets index. supports relative or absolute paths. empty means working dir. # it will be created (incl parent dirs) if not existing. data-dir = diff --git a/docs/metrics.md b/docs/metrics.md index 155819aea5..85796cd841 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -199,4 +199,4 @@ The current offset for the partition (%d) that we have consumed. * `input.kafka-mdm.partition.%d.log_size`: The size of the kafka partition, aka the newest available offset. * `input.kafka-mdm.partition.%d.lag`: -How many messages (metrics) Kafaka has that we have not yet consumed. \ No newline at end of file +How many messages (metrics) Kafka has that we have not yet consumed. 
diff --git a/mdata/notifier.go b/mdata/notifier.go index cb7552dd1e..53cb918910 100644 --- a/mdata/notifier.go +++ b/mdata/notifier.go @@ -46,8 +46,9 @@ func InitPersistNotifier(handlers ...NotifierHandler) { } type Notifier struct { - Instance string - Metrics Metrics + Instance string + Metrics Metrics + CreateMissingMetrics bool } func (cl Notifier) Handle(data []byte) { @@ -83,7 +84,10 @@ func (cl Notifier) Handle(data []byte) { } // get metric - if agg, ok := cl.Metrics.Get(ms.Key); ok { + if cl.CreateMissingMetrics { + agg := cl.Metrics.GetOrCreate(ms.Key) + agg.(*AggMetric).SyncChunkSaveState(ms.T0) + } else if agg, ok := cl.Metrics.Get(ms.Key); ok { agg.(*AggMetric).SyncChunkSaveState(ms.T0) } } diff --git a/mdata/notifierKafka/cfg.go b/mdata/notifierKafka/cfg.go index b406463a37..ec24815801 100644 --- a/mdata/notifierKafka/cfg.go +++ b/mdata/notifierKafka/cfg.go @@ -27,6 +27,9 @@ var partitionStr string var partitions []int32 var partitioner *cluster.KafkaPartitioner var partitionScheme string +var bootTimeOffsets map[int32]int64 +var backlogProcessTimeout time.Duration +var backlogProcessTimeoutStr string // metric cluster.notifier.kafka.messages-published is a counter of messages published to the kafka cluster notifier var messagesPublished = stats.NewCounter32("cluster.notifier.kafka.messages-published") @@ -40,10 +43,11 @@ func init() { fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)") fs.StringVar(&topic, "topic", "metricpersist", "kafka topic") fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in") - fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. 
(byOrg|bySeries)") + fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)") fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration") fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index") fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.") + fs.StringVar(&backlogProcessTimeoutStr, "backlog-process-timeout", "60s", "Maximum time backlog processing can block during metrictank startup.") globalconf.Register("kafka-cluster", fs) } @@ -76,6 +80,11 @@ func ConfigProcess(instance string) { log.Fatal(2, "kafka-cluster invalid consumer config: %s", err) } + backlogProcessTimeout, err = time.ParseDuration(backlogProcessTimeoutStr) + if err != nil { + log.Fatal(4, "kafka-cluster: unable to parse backlog-process-timeout. %s", err) + } + partitioner, err = cluster.NewKafkaPartitioner(partitionScheme) if err != nil { log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err) @@ -110,4 +119,16 @@ func ConfigProcess(instance string) { log.Fatal(4, "kafka-cluster: configured partitions not in list of available partitions. missing %v", missing) } } + + // get the "newest" offset for all partitions. + // when booting up, we will delay consuming metrics until we have + // caught up to these offsets. + bootTimeOffsets = make(map[int32]int64) + for _, part := range partitions { + offset, err := client.GetOffset(topic, part, sarama.OffsetNewest) + if err != nil { + log.Fatal(4, "kafka-cluster: failed to get newest offset for %s:%d. 
%s", topic, part, err) + } + bootTimeOffsets[part] = offset + } } diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go index 86e53d0bdb..3c960fc748 100644 --- a/mdata/notifierKafka/notifierKafka.go +++ b/mdata/notifierKafka/notifierKafka.go @@ -62,8 +62,9 @@ func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierK producer: producer, instance: instance, Notifier: mdata.Notifier{ - Instance: instance, - Metrics: metrics, + Instance: instance, + Metrics: metrics, + CreateMissingMetrics: true, }, StopChan: make(chan int), stopConsuming: make(chan struct{}), @@ -78,6 +79,8 @@ func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierK func (c *NotifierKafka) start() { var err error + pre := time.Now() + processBacklog := new(sync.WaitGroup) for _, partition := range partitions { var offset int64 switch offsetStr { @@ -93,11 +96,30 @@ func (c *NotifierKafka) start() { if err != nil { log.Fatal(4, "kafka-cluster: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err) } - go c.consumePartition(topic, partition, offset) + processBacklog.Add(1) + go c.consumePartition(topic, partition, offset, processBacklog) } + // wait for our backlog to be processed before returning. This will block metrictank from consuming metrics until + // we have processed old metricPersist messages. The end result is that we won't overwrite chunks in cassandra that + // have already been previously written. + // We don't wait more than backlogProcessTimeout for the backlog to be processed. 
+ log.Info("kafka-cluster: waiting for metricPersist backlog to be processed.") + backlogProcessed := make(chan struct{}, 1) + go func() { + processBacklog.Wait() + backlogProcessed <- struct{}{} + }() + + select { + case <-time.After(backlogProcessTimeout): + log.Warn("kafka-cluster: Processing metricPersist backlog has taken too long, giving up lock after %s.", backlogProcessTimeout) + case <-backlogProcessed: + log.Info("kafka-cluster: metricPersist backlog processed in %s.", time.Since(pre)) + } + } -func (c *NotifierKafka) consumePartition(topic string, partition int32, partitionOffset int64) { +func (c *NotifierKafka) consumePartition(topic string, partition int32, partitionOffset int64, processBacklog *sync.WaitGroup) { c.wg.Add(1) defer c.wg.Done() @@ -109,6 +131,10 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio currentOffset := partitionOffset messages := pc.Messages() ticker := time.NewTicker(offsetCommitInterval) + startingUp := true + // the bootTimeOffset is the next available offset. There may not be a message with that + // offset yet, so we subtract 1 to get the highest offset that we can fetch. 
+ bootTimeOffset := bootTimeOffsets[partition] - 1 for { select { case msg := <-messages: @@ -121,6 +147,10 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil { log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err) } + if startingUp && currentOffset >= bootTimeOffset { + processBacklog.Done() + startingUp = false + } case <-c.stopConsuming: pc.Close() if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil { diff --git a/metrictank-sample.ini b/metrictank-sample.ini index b45114cbfa..d25d7d8f3f 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -212,6 +212,8 @@ partition-scheme = bySeries offset = last # save interval for offsets offset-commit-interval = 5s +# Maximum time backlog processing can block during metrictank startup. +backlog-process-timeout = 60s # directory to store partition offsets index. supports relative or absolute paths. empty means working dir. # it will be created (incl parent dirs) if not existing. data-dir = diff --git a/metrictank.go b/metrictank.go index b99849c564..132f9d2c95 100644 --- a/metrictank.go +++ b/metrictank.go @@ -395,6 +395,8 @@ func main() { ***********************************/ handlers := make([]mdata.NotifierHandler, 0) if notifierKafka.Enabled { + // The notifierKafka handler will block here until it has processed the backlog of metricPersist messages. 
+ // it will block for at most kafka-cluster.backlog-process-timeout (default 60s) handlers = append(handlers, notifierKafka.New(*instance, metrics, metricIndex)) } diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index e9a793921c..00ac6a229e 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -207,6 +207,8 @@ partition-scheme = bySeries offset = last # save interval for offsets offset-commit-interval = 5s +# Maximum time backlog processing can block during metrictank startup. +backlog-process-timeout = 60s # directory to store partition offsets index. supports relative or absolute paths. empty means working dir. # it will be created (incl parent dirs) if not existing. data-dir = /var/lib/metrictank diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini index c9f8643539..52ffb5bd1b 100644 --- a/scripts/config/metrictank-package.ini +++ b/scripts/config/metrictank-package.ini @@ -195,6 +195,8 @@ partition-scheme = bySeries offset = last # save interval for offsets offset-commit-interval = 5s +# Maximum time backlog processing can block during metrictank startup. +backlog-process-timeout = 60s # directory to store partition offsets index. supports relative or absolute paths. empty means working dir. # it will be created (incl parent dirs) if not existing. data-dir = /var/lib/metrictank