From 02a8a92badee44899fd7c0eaecacd8afb5ca8b0c Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Mon, 18 Jul 2016 12:45:43 +0200
Subject: [PATCH] add kafka cluster implementation next to nsq

note that we needed to move the config parameters out of mdata package,
otherwise the names were conflicting
---
 mdata/clkafka.go     | 171 +++++++++++++++++++++++++++++++++++++++++++
 mdata/clkafka/cfg.go |  57 +++++++++++++++
 mdata/clnsq.go       |  49 +++++--------
 mdata/clnsq/cfg.go   |   4 +-
 metrictank.go        |  10 ++-
 5 files changed, 257 insertions(+), 34 deletions(-)
 create mode 100644 mdata/clkafka.go
 create mode 100644 mdata/clkafka/cfg.go

diff --git a/mdata/clkafka.go b/mdata/clkafka.go
new file mode 100644
index 0000000000..49d3d76bc1
--- /dev/null
+++ b/mdata/clkafka.go
@@ -0,0 +1,171 @@
+package mdata
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"sync"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/bsm/sarama-cluster"
+	"github.com/raintank/met"
+	cfg "github.com/raintank/metrictank/mdata/clkafka"
+	"github.com/raintank/worldping-api/pkg/log"
+)
+
+type ClKafka struct {
+	in       chan SavedChunk
+	buf      []SavedChunk
+	wg       sync.WaitGroup
+	instance string
+	consumer *cluster.Consumer
+	producer sarama.SyncProducer
+	StopChan chan int
+	Cl
+}
+
+func NewKafka(instance string, metrics Metrics, stats met.Backend) *ClKafka {
+	consumer, err := cluster.NewConsumer(cfg.Brokers, cfg.Group, cfg.Topics, cfg.CConfig)
+	if err != nil {
+		log.Fatal(2, "kafka-cluster failed to start consumer: %s", err)
+	}
+	log.Info("kafka-cluster consumer started without error")
+
+	producer, err := sarama.NewSyncProducer(cfg.Brokers, cfg.PConfig)
+	if err != nil {
+		log.Fatal(2, "kafka-cluster failed to start producer: %s", err)
+	}
+
+	c := ClKafka{
+		in:       make(chan SavedChunk),
+		consumer: consumer,
+		producer: producer,
+		instance: instance,
+		Cl: Cl{
+			instance: instance,
+			metrics:  metrics,
+		},
+		StopChan: make(chan int),
+	}
+	go c.notifications()
+	go c.consume()
+	go c.produce()
+
+	return &c
+}
+
+func (c *ClKafka) consume() {
+	c.wg.Add(1)
+	messageChan := c.consumer.Messages()
+	for msg := range messageChan {
+		if LogLevel < 2 {
+			log.Debug("CLU kafka-cluster received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
+		}
+		c.Handle(msg.Value)
+		c.consumer.MarkOffset(msg, "")
+	}
+	log.Info("CLU kafka-cluster consumer ended.")
+	c.wg.Done()
+}
+
+func (c *ClKafka) notifications() {
+	c.wg.Add(1)
+	for msg := range c.consumer.Notifications() {
+		if len(msg.Claimed) > 0 {
+			for topic, partitions := range msg.Claimed {
+				log.Info("CLU kafka-cluster consumer claimed %d partitions on topic: %s", len(partitions), topic)
+			}
+		}
+		if len(msg.Released) > 0 {
+			for topic, partitions := range msg.Released {
+				log.Info("CLU kafka-cluster consumer released %d partitions on topic: %s", len(partitions), topic)
+			}
+		}
+
+		if len(msg.Current) == 0 {
+			log.Info("CLU kafka-cluster consumer is no longer consuming from any partitions.")
+		} else {
+			log.Info("CLU kafka-cluster Current partitions:")
+			for topic, partitions := range msg.Current {
+				log.Info("CLU kafka-cluster Current partitions: %s: %v", topic, partitions)
+			}
+		}
+	}
+	log.Info("CLU kafka-cluster notification processing stopped")
+	c.wg.Done()
+}
+
+// Stop will initiate a graceful stop of the Consumer (permanent)
+//
+// NOTE: receive on StopChan to block until this process completes
+func (c *ClKafka) Stop() {
+	// closes notifications and messages channels, amongst others
+	c.consumer.Close()
+	c.producer.Close()
+
+	go func() {
+		c.wg.Wait()
+		close(c.StopChan)
+	}()
+}
+
+func (c *ClKafka) Send(sc SavedChunk) {
+	c.in <- sc
+}
+
+func (c *ClKafka) produce() {
+	ticker := time.NewTicker(time.Second)
+	max := 5000
+	for {
+		select {
+		case chunk := <-c.in:
+			c.buf = append(c.buf, chunk)
+			if len(c.buf) == max {
+				c.flush()
+			}
+		case <-ticker.C:
+			c.flush()
+		}
+	}
+}
+
+// flush makes sure the batch gets sent, asynchronously.
+func (c *ClKafka) flush() {
+	if len(c.buf) == 0 {
+		return
+	}
+
+	msg := PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf}
+	c.buf = nil
+
+	go func() {
+		log.Debug("CLU kafka-cluster sending %d batch metricPersist messages", len(msg.SavedChunks))
+
+		data, err := json.Marshal(&msg)
+		if err != nil {
+			log.Fatal(4, "CLU kafka-cluster failed to marshal persistMessage to json.")
+		}
+		buf := new(bytes.Buffer)
+		binary.Write(buf, binary.LittleEndian, uint8(PersistMessageBatchV1))
+		buf.Write(data)
+		messagesSize.Value(int64(buf.Len()))
+		payload := &sarama.ProducerMessage{
+			Topic: cfg.Topic,
+			Value: sarama.ByteEncoder(buf.Bytes()),
+		}
+
+		sent := false
+		for !sent {
+			// note: currently we don't do partitioning yet for cluster msgs, so no key needed
+			_, _, err := c.producer.SendMessage(payload)
+			if err != nil {
+				log.Warn("CLU kafka-cluster publisher %s", err)
+			} else {
+				sent = true
+			}
+			time.Sleep(time.Second)
+		}
+		messagesPublished.Inc(1)
+	}()
+}
diff --git a/mdata/clkafka/cfg.go b/mdata/clkafka/cfg.go
new file mode 100644
index 0000000000..5058e9a776
--- /dev/null
+++ b/mdata/clkafka/cfg.go
@@ -0,0 +1,57 @@
+package clkafka
+
+import (
+	"flag"
+	"log"
+
+	"github.com/Shopify/sarama"
+	"github.com/bsm/sarama-cluster"
+	"github.com/rakyll/globalconf"
+)
+
+var Enabled bool
+var broker string
+var topic string
+var Brokers []string
+var Topic string
+var Topics []string
+var Group string
+var CConfig *cluster.Config
+var PConfig *sarama.Config
+
+func ConfigSetup() {
+	inKafkaMdam := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
+	inKafkaMdam.BoolVar(&Enabled, "enabled", false, "")
+	inKafkaMdam.StringVar(&broker, "broker", "kafka:9092", "tcp address for kafka")
+	inKafkaMdam.StringVar(&Topic, "topic", "metricpersist", "kafka topic")
+	inKafkaMdam.StringVar(&Group, "group", "group1", "kafka consumer group")
+	globalconf.Register("kafka-cluster", inKafkaMdam)
+}
+
+func ConfigProcess(instance string) {
+	if !Enabled {
+		return
+	}
+	Brokers = []string{broker}
+	Topics = []string{Topic}
+
+	CConfig = cluster.NewConfig()
+	// see https://github.com/raintank/metrictank/issues/236
+	CConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
+	CConfig.ClientID = instance + "-cluster"
+	CConfig.Group.Return.Notifications = true
+	CConfig.Config.Version = sarama.V0_10_0_0
+	err := CConfig.Validate()
+	if err != nil {
+		log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
+	}
+
+	PConfig = sarama.NewConfig()
+	PConfig.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
+	PConfig.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
+	PConfig.Producer.Compression = sarama.CompressionNone
+	err = PConfig.Validate()
+	if err != nil {
+		log.Fatal(2, "kafka-cluster invalid producer config: %s", err)
+	}
+}
diff --git a/mdata/clnsq.go b/mdata/clnsq.go
index e962cfae15..5ee4d3719c 100644
--- a/mdata/clnsq.go
+++ b/mdata/clnsq.go
@@ -9,20 +9,14 @@ import (
 	"github.com/bitly/go-hostpool"
 	"github.com/nsqio/go-nsq"
 	"github.com/raintank/met"
-	clNSQ "github.com/raintank/metrictank/mdata/clnsq"
+	cfg "github.com/raintank/metrictank/mdata/clnsq"
 	"github.com/raintank/misc/instrumented_nsq"
 	"github.com/raintank/worldping-api/pkg/log"
 )
 
 var (
-	hostPool    hostpool.HostPool
-	producers   map[string]*nsq.Producer
-	nsqdAdds    []string
-	lookupdAdds []string
-	topic       string
-	channel     string
-	pCfg        *nsq.Config
-	cCfg        *nsq.Config
+	hostPool  hostpool.HostPool
+	producers map[string]*nsq.Producer
 )
 
 type ClNSQ struct {
@@ -33,29 +27,22 @@
 }
 
 func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ {
-	nsqdAdds = clNSQ.NsqdAdds
-	lookupdAdds = clNSQ.LookupdAdds
-	topic = clNSQ.Topic
-	channel = clNSQ.Channel
-	pCfg = clNSQ.PCfg
-	cCfg = clNSQ.CCfg
-
 	// producers
-	hostPool = hostpool.NewEpsilonGreedy(nsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
+	hostPool = hostpool.NewEpsilonGreedy(cfg.NsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
 	producers = make(map[string]*nsq.Producer)
 
-	for _, addr := range nsqdAdds {
-		producer, err := nsq.NewProducer(addr, pCfg)
+	for _, addr := range cfg.NsqdAdds {
+		producer, err := nsq.NewProducer(addr, cfg.PCfg)
 		if err != nil {
-			log.Fatal(4, "failed creating producer %s", err.Error())
+			log.Fatal(4, "nsq-cluster failed creating producer %s", err.Error())
 		}
 		producers[addr] = producer
 	}
 
 	// consumers
-	consumer, err := insq.NewConsumer(topic, channel, cCfg, "metric_persist.%s", stats)
+	consumer, err := insq.NewConsumer(cfg.Topic, cfg.Channel, cfg.CCfg, "metric_persist.%s", stats)
 	if err != nil {
-		log.Fatal(4, "Failed to create NSQ consumer. %s", err)
+		log.Fatal(4, "nsq-cluster failed to create NSQ consumer. %s", err)
 	}
 	c := &ClNSQ{
 		in: make(chan SavedChunk),
@@ -67,15 +54,15 @@ func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ {
 	}
 
 	consumer.AddConcurrentHandlers(c, 2)
-	err = consumer.ConnectToNSQDs(nsqdAdds)
+	err = consumer.ConnectToNSQDs(cfg.NsqdAdds)
 	if err != nil {
-		log.Fatal(4, "failed to connect to NSQDs. %s", err)
+		log.Fatal(4, "nsq-cluster failed to connect to NSQDs. %s", err)
 	}
-	log.Info("persist consumer connected to nsqd")
+	log.Info("nsq-cluster persist consumer connected to nsqd")
 
-	err = consumer.ConnectToNSQLookupds(lookupdAdds)
+	err = consumer.ConnectToNSQLookupds(cfg.LookupdAdds)
 	if err != nil {
-		log.Fatal(4, "failed to connect to NSQLookupds. %s", err)
+		log.Fatal(4, "nsq-cluster failed to connect to NSQLookupds. %s", err)
 	}
 	go c.run()
 	return c
@@ -116,11 +103,11 @@ func (c *ClNSQ) flush() {
 	c.buf = nil
 
 	go func() {
-		log.Debug("CLU sending %d batch metricPersist messages", len(msg.SavedChunks))
+		log.Debug("CLU nsq-cluster sending %d batch metricPersist messages", len(msg.SavedChunks))
 
 		data, err := json.Marshal(&msg)
 		if err != nil {
-			log.Fatal(4, "failed to marshal persistMessage to json.")
+			log.Fatal(4, "CLU nsq-cluster failed to marshal persistMessage to json.")
 		}
 		buf := new(bytes.Buffer)
 		binary.Write(buf, binary.LittleEndian, uint8(PersistMessageBatchV1))
@@ -134,12 +121,12 @@
 			// will result in this loop repeating forever until we successfully publish our msg.
 			hostPoolResponse := hostPool.Get()
 			prod := producers[hostPoolResponse.Host()]
-			err = prod.Publish(topic, buf.Bytes())
+			err = prod.Publish(cfg.Topic, buf.Bytes())
 			// Hosts that are marked as dead will be retried after 30seconds. If we published
 			// successfully, then sending a nil error will mark the host as alive again.
 			hostPoolResponse.Mark(err)
 			if err != nil {
-				log.Warn("publisher marking host %s as faulty due to %s", hostPoolResponse.Host(), err)
+				log.Warn("CLU nsq-cluster publisher marking host %s as faulty due to %s", hostPoolResponse.Host(), err)
 			} else {
 				sent = true
 			}
diff --git a/mdata/clnsq/cfg.go b/mdata/clnsq/cfg.go
index 52b5f258db..3fc40bc68b 100644
--- a/mdata/clnsq/cfg.go
+++ b/mdata/clnsq/cfg.go
@@ -67,7 +67,7 @@ func ConfigProcess() {
 	PCfg.UserAgent = "metrictank-cluster"
 	err := app.ParseOpts(PCfg, ProducerOpts)
 	if err != nil {
-		log.Fatal(4, "failed to parse nsq producer options. %s", err)
+		log.Fatal(4, "nsq-cluster: failed to parse nsq producer options. %s", err)
 	}
 
 	// consumer
@@ -75,7 +75,7 @@
 	CCfg.UserAgent = "metrictank-cluster"
 	err = app.ParseOpts(CCfg, ConsumerOpts)
 	if err != nil {
-		log.Fatal(4, "failed to parse nsq consumer options. %s", err)
+		log.Fatal(4, "nsq-cluster: failed to parse nsq consumer options. %s", err)
 	}
 	CCfg.MaxInFlight = MaxInFlight
 }
diff --git a/metrictank.go b/metrictank.go
index 13a5831ced..363cb30407 100644
--- a/metrictank.go
+++ b/metrictank.go
@@ -29,6 +29,7 @@ import (
 	inNSQ "github.com/raintank/metrictank/in/nsq"
 	"github.com/raintank/metrictank/mdata"
 	"github.com/raintank/metrictank/mdata/chunk"
+	clKafka "github.com/raintank/metrictank/mdata/clkafka"
 	clNSQ "github.com/raintank/metrictank/mdata/clnsq"
 	"github.com/raintank/metrictank/metricdef"
 	"github.com/raintank/metrictank/usage"
@@ -41,7 +42,8 @@ var (
 	inKafkaMdmInst  *inKafkaMdm.KafkaMdm
 	inKafkaMdamInst *inKafkaMdam.KafkaMdam
 	inNSQInst       *inNSQ.NSQ
-	clNSQInst       *mdata.ClNSQ
+	clKafkaInst     *mdata.ClKafka
+	clNSQInst       *mdata.ClNSQ
 
 	logLevel     int
 	warmupPeriod time.Duration
@@ -148,6 +150,7 @@ func main() {
 		inKafkaMdam.ConfigSetup()
 		inNSQ.ConfigSetup()
 		clNSQ.ConfigSetup()
+		clKafka.ConfigSetup()
 
 		conf.ParseAll()
 	}
@@ -188,6 +191,7 @@ func main() {
 	inKafkaMdam.ConfigProcess(*instance)
 	inNSQ.ConfigProcess()
 	clNSQ.ConfigProcess()
+	clKafka.ConfigProcess(*instance)
 
 	if !inCarbon.Enabled && !inKafkaMdm.Enabled && !inKafkaMdam.Enabled && !inNSQ.Enabled {
 		log.Fatal(4, "you should enable at least 1 input plugin")
@@ -312,6 +316,10 @@ func main() {
 		clNSQInst = mdata.NewNSQ(*instance, metrics, stats)
 		handlers = append(handlers, clNSQInst)
 	}
+	if clKafka.Enabled {
+		clKafkaInst = mdata.NewKafka(*instance, metrics, stats)
+		handlers = append(handlers, clKafkaInst)
+	}
 	mdata.InitCluster(stats, handlers...)
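
The new notifier registers its flags under the "kafka-cluster" flag set in mdata/clkafka/cfg.go, so with globalconf it should be configurable from an ini section of the same name in the metrictank config file. A minimal sketch of such a section, assuming the section-to-flagset mapping and using only the flags and defaults defined in ConfigSetup (enabled, broker, topic, group; enabled is false by default):

    [kafka-cluster]
    enabled = true
    broker = kafka:9092
    topic = metricpersist
    group = group1

With enabled = true, main() runs clKafka.ConfigProcess(*instance) and mdata.NewKafka(*instance, metrics, stats), and the kafka-cluster handler is appended alongside (or instead of) the nsq-cluster one before mdata.InitCluster is called.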