diff --git a/mdata/notifierKafka/cfg.go b/mdata/notifierKafka/cfg.go
index 4c656f5f8f..b000572a64 100644
--- a/mdata/notifierKafka/cfg.go
+++ b/mdata/notifierKafka/cfg.go
@@ -13,13 +13,13 @@ import (
 
 var Enabled bool
 var brokerStr string
-var Brokers []string
-var Topic string
-var OffsetStr string
-var DataDir string
-var Config *sarama.Config
-var OffsetDuration time.Duration
-var OffsetCommitInterval time.Duration
+var brokers []string
+var topic string
+var offsetStr string
+var dataDir string
+var config *sarama.Config
+var offsetDuration time.Duration
+var offsetCommitInterval time.Duration
 
 var messagesPublished met.Count
 var messagesSize met.Meter
@@ -28,10 +28,10 @@ func ConfigSetup() {
 	fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
 	fs.BoolVar(&Enabled, "enabled", false, "")
 	fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
-	fs.StringVar(&Topic, "topic", "metricpersist", "kafka topic")
-	fs.StringVar(&OffsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
-	fs.StringVar(&DataDir, "data-dir", "", "Directory to store partition offsets index")
-	fs.DurationVar(&OffsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
+	fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
+	fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
+	fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
+	fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
 	globalconf.Register("kafka-cluster", fs)
 }
 
@@ -40,25 +40,25 @@ func ConfigProcess(instance string) {
 		return
 	}
 	var err error
-	switch OffsetStr {
+	switch offsetStr {
 	case "last":
 	case "oldest":
 	case "newest":
 	default:
-		OffsetDuration, err = time.ParseDuration(OffsetStr)
+		offsetDuration, err = time.ParseDuration(offsetStr)
 		if err != nil {
 			log.Fatal(4, "kafka-cluster: invalid offest format. %s", err)
 		}
 	}
-	Brokers = strings.Split(brokerStr, ",")
+	brokers = strings.Split(brokerStr, ",")
 
-	Config = sarama.NewConfig()
-	Config.ClientID = instance + "-cluster"
-	Config.Version = sarama.V0_10_0_0
-	Config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
-	Config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
-	Config.Producer.Compression = sarama.CompressionNone
-	err = Config.Validate()
+	config = sarama.NewConfig()
+	config.ClientID = instance + "-cluster"
+	config.Version = sarama.V0_10_0_0
+	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
+	config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
+	config.Producer.Compression = sarama.CompressionNone
+	err = config.Validate()
 	if err != nil {
 		log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
 	}
diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go
index 8562aa8600..9171c59f88 100644
--- a/mdata/notifierKafka/notifierKafka.go
+++ b/mdata/notifierKafka/notifierKafka.go
@@ -33,7 +33,7 @@ func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend)
 	messagesPublished = stats.NewCount("notifier.kafka.messages-published")
 	messagesSize = stats.NewMeter("notifier.kafka.message_size", 0)
 
-	client, err := sarama.NewClient(Brokers, Config)
+	client, err := sarama.NewClient(brokers, config)
 	if err != nil {
 		log.Fatal(2, "kafka-notifier failed to start client: %s", err)
 	}
@@ -48,7 +48,7 @@ func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend)
 		log.Fatal(2, "kafka-notifier failed to initialize producer: %s", err)
 	}
 
-	offsetMgr, err := kafka.NewOffsetMgr(DataDir)
+	offsetMgr, err := kafka.NewOffsetMgr(dataDir)
 	if err != nil {
 		log.Fatal(2, "kafka-notifier couldnt create offsetMgr. %s", err)
 	}
@@ -75,14 +75,13 @@ func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend)
 
 func (c *NotifierKafka) start() {
 	// get partitions.
-	topic := Topic
 	partitions, err := c.consumer.Partitions(topic)
 	if err != nil {
 		log.Fatal(4, "kafka-notifier: Faild to get partitions for topic %s. %s", topic, err)
 	}
 	for _, partition := range partitions {
 		var offset int64
-		switch OffsetStr {
+		switch offsetStr {
 		case "oldest":
 			offset = -2
 		case "newest":
@@ -90,10 +89,10 @@ func (c *NotifierKafka) start() {
 		case "last":
 			offset, err = c.offsetMgr.Last(topic, partition)
 		default:
-			offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*OffsetDuration).UnixNano()/int64(time.Millisecond))
+			offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
 		}
 		if err != nil {
-			log.Fatal(4, "kafka-notifier: failed to get %q offset for %s:%d. %q", OffsetStr, topic, partition, err)
+			log.Fatal(4, "kafka-notifier: failed to get %q offset for %s:%d. %q", offsetStr, topic, partition, err)
 		}
 		go c.consumePartition(topic, partition, offset)
 	}
@@ -110,7 +109,7 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio
 	log.Info("kafka-notifier: consuming from %s:%d from offset %d", topic, partition, partitionOffset)
 	currentOffset := partitionOffset
 	messages := pc.Messages()
-	ticker := time.NewTicker(OffsetCommitInterval)
+	ticker := time.NewTicker(offsetCommitInterval)
 	for {
 		select {
 		case msg := <-messages:
@@ -190,7 +189,7 @@ func (c *NotifierKafka) flush() {
 	buf.Write(data)
 	messagesSize.Value(int64(buf.Len()))
 	payload := &sarama.ProducerMessage{
-		Topic: Topic,
+		Topic: topic,
 		Value: sarama.ByteEncoder(buf.Bytes()),
 	}
 
diff --git a/mdata/notifierNsq/cfg.go b/mdata/notifierNsq/cfg.go
index 3f778c78f3..f52ea27e62 100644
--- a/mdata/notifierNsq/cfg.go
+++ b/mdata/notifierNsq/cfg.go
@@ -19,17 +19,17 @@ import (
 
 var (
 	Enabled bool
-	NsqdTCPAddrs string
-	LookupdHTTPAddrs string
-	NsqdAdds []string
-	LookupdAdds []string
-	Topic string
-	Channel string
-	MaxInFlight int
-	ProducerOpts string
-	ConsumerOpts string
-	PCfg *nsq.Config
-	CCfg *nsq.Config
+	nsqdTCPAddrs string
+	lookupdHTTPAddrs string
+	nsqdAdds []string
+	lookupdAdds []string
+	topic string
+	channel string
+	maxInFlight int
+	producerOpts string
+	consumerOpts string
+	pCfg *nsq.Config
+	cCfg *nsq.Config
 	messagesPublished met.Count
 	messagesSize met.Meter
 )
@@ -37,13 +37,13 @@ var (
 func ConfigSetup() {
 	fs := flag.NewFlagSet("nsq-cluster", flag.ExitOnError)
 	fs.BoolVar(&Enabled, "enabled", false, "")
-	fs.StringVar(&NsqdTCPAddrs, "nsqd-tcp-address", "", "nsqd TCP address (may be given multiple times as comma-separated list)")
-	fs.StringVar(&LookupdHTTPAddrs, "lookupd-http-address", "", "lookupd HTTP address (may be given multiple times as comma-separated list)")
-	fs.StringVar(&Topic, "topic", "metricpersist", "NSQ topic for persist messages")
-	fs.StringVar(&Channel, "channel", "tank", "NSQ channel for persist messages")
-	fs.StringVar(&ProducerOpts, "producer-opt", "", "option to passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
-	fs.StringVar(&ConsumerOpts, "consumer-opt", "", "option to passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)")
-	fs.IntVar(&MaxInFlight, "max-in-flight", 200, "max number of messages to allow in flight for consumer")
+	fs.StringVar(&nsqdTCPAddrs, "nsqd-tcp-address", "", "nsqd TCP address (may be given multiple times as comma-separated list)")
+	fs.StringVar(&lookupdHTTPAddrs, "lookupd-http-address", "", "lookupd HTTP address (may be given multiple times as comma-separated list)")
+	fs.StringVar(&topic, "topic", "metricpersist", "NSQ topic for persist messages")
+	fs.StringVar(&channel, "channel", "tank", "NSQ channel for persist messages")
+	fs.StringVar(&producerOpts, "producer-opt", "", "option to passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
+	fs.StringVar(&consumerOpts, "consumer-opt", "", "option to passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)")
+	fs.IntVar(&maxInFlight, "max-in-flight", 200, "max number of messages to allow in flight for consumer")
 	globalconf.Register("nsq-cluster", fs)
 }
 
@@ -51,34 +51,34 @@ func ConfigProcess() {
 	if !Enabled {
 		return
 	}
-	if Topic == "" {
+	if topic == "" {
 		log.Fatal(4, "topic for nsq-cluster cannot be empty")
 	}
 
-	NsqdAdds = strings.Split(NsqdTCPAddrs, ",")
-	if len(NsqdAdds) == 1 && NsqdAdds[0] == "" {
-		NsqdAdds = []string{}
+	nsqdAdds = strings.Split(nsqdTCPAddrs, ",")
+	if len(nsqdAdds) == 1 && nsqdAdds[0] == "" {
+		nsqdAdds = []string{}
 	}
 
-	LookupdAdds = strings.Split(LookupdHTTPAddrs, ",")
-	if len(LookupdAdds) == 1 && LookupdAdds[0] == "" {
-		LookupdAdds = []string{}
+	lookupdAdds = strings.Split(lookupdHTTPAddrs, ",")
+	if len(lookupdAdds) == 1 && lookupdAdds[0] == "" {
+		lookupdAdds = []string{}
 	}
 
 	// producers
-	PCfg = nsq.NewConfig()
-	PCfg.UserAgent = "metrictank-cluster"
-	err := app.ParseOpts(PCfg, ProducerOpts)
+	pCfg = nsq.NewConfig()
+	pCfg.UserAgent = "metrictank-cluster"
+	err := app.ParseOpts(pCfg, producerOpts)
 	if err != nil {
 		log.Fatal(4, "nsq-cluster: failed to parse nsq producer options. %s", err)
 	}
 
 	// consumer
-	CCfg = nsq.NewConfig()
-	CCfg.UserAgent = "metrictank-cluster"
-	err = app.ParseOpts(CCfg, ConsumerOpts)
+	cCfg = nsq.NewConfig()
+	cCfg.UserAgent = "metrictank-cluster"
+	err = app.ParseOpts(cCfg, consumerOpts)
 	if err != nil {
 		log.Fatal(4, "nsq-cluster: failed to parse nsq consumer options. %s", err)
 	}
-	CCfg.MaxInFlight = MaxInFlight
+	cCfg.MaxInFlight = maxInFlight
 }
diff --git a/mdata/notifierNsq/notifierNsq.go b/mdata/notifierNsq/notifierNsq.go
index 04257c8a29..d1a04ec843 100644
--- a/mdata/notifierNsq/notifierNsq.go
+++ b/mdata/notifierNsq/notifierNsq.go
@@ -30,11 +30,11 @@ func NewNSQ(instance string, metrics mdata.Metrics, stats met.Backend) *Notifier
 	messagesPublished = stats.NewCount("notifier.nsq.messages-published")
 	messagesSize = stats.NewMeter("notifier.nsq.message_size", 0)
 	// producers
-	hostPool = hostpool.NewEpsilonGreedy(NsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
+	hostPool = hostpool.NewEpsilonGreedy(nsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
 	producers = make(map[string]*nsq.Producer)
 
-	for _, addr := range NsqdAdds {
-		producer, err := nsq.NewProducer(addr, PCfg)
+	for _, addr := range nsqdAdds {
+		producer, err := nsq.NewProducer(addr, pCfg)
 		if err != nil {
 			log.Fatal(4, "nsq-cluster failed creating producer %s", err.Error())
 		}
@@ -42,7 +42,7 @@ func NewNSQ(instance string, metrics mdata.Metrics, stats met.Backend) *Notifier
 	}
 
 	// consumers
-	consumer, err := insq.NewConsumer(Topic, Channel, CCfg, "metric_persist.%s", stats)
+	consumer, err := insq.NewConsumer(topic, channel, cCfg, "metric_persist.%s", stats)
 	if err != nil {
 		log.Fatal(4, "nsq-cluster failed to create NSQ consumer. %s", err)
 	}
@@ -56,13 +56,13 @@ func NewNSQ(instance string, metrics mdata.Metrics, stats met.Backend) *Notifier
 	}
 	consumer.AddConcurrentHandlers(c, 2)
 
-	err = consumer.ConnectToNSQDs(NsqdAdds)
+	err = consumer.ConnectToNSQDs(nsqdAdds)
 	if err != nil {
 		log.Fatal(4, "nsq-cluster failed to connect to NSQDs. %s", err)
 	}
 	log.Info("nsq-cluster persist consumer connected to nsqd")
 
-	err = consumer.ConnectToNSQLookupds(LookupdAdds)
+	err = consumer.ConnectToNSQLookupds(lookupdAdds)
 	if err != nil {
 		log.Fatal(4, "nsq-cluster failed to connect to NSQLookupds. %s", err)
 	}
@@ -123,7 +123,7 @@ func (c *NotifierNSQ) flush() {
 	// will result in this loop repeating forever until we successfully publish our msg.
 	hostPoolResponse := hostPool.Get()
 	prod := producers[hostPoolResponse.Host()]
-	err = prod.Publish(Topic, buf.Bytes())
+	err = prod.Publish(topic, buf.Bytes())
 	// Hosts that are marked as dead will be retried after 30seconds. If we published
 	// successfully, then sending a nil error will mark the host as alive again.
 	hostPoolResponse.Mark(err)
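
Note: the sketch below is not part of the diff. It is a minimal, self-contained illustration of the pattern both hunks apply: configuration lives in unexported package-level variables, and other packages interact with it only through the exported Enabled flag and the ConfigSetup/ConfigProcess entry points. The package name "examplecfg", the flag set name "example-cluster", and the use of the standard log package are made up for illustration; the real code wires flags through globalconf and configures sarama/nsq clients, which are omitted here.

// examplecfg is a hypothetical package showing the unexported-config pattern.
package examplecfg

import (
	"flag"
	"log"
	"strings"
	"time"
)

var (
	Enabled   bool   // exported: other packages only need to check this
	brokerStr string // raw flag values stay package-private...
	offsetStr string

	brokers        []string // ...as do the values derived from them
	offsetDuration time.Duration
)

// ConfigSetup registers the flags that populate the unexported variables.
func ConfigSetup() *flag.FlagSet {
	fs := flag.NewFlagSet("example-cluster", flag.ExitOnError)
	fs.BoolVar(&Enabled, "enabled", false, "")
	fs.StringVar(&brokerStr, "brokers", "kafka:9092", "comma separated broker list")
	fs.StringVar(&offsetStr, "offset", "last", "newest, oldest, last or a duration")
	return fs
}

// ConfigProcess validates the raw values and derives the ones the package uses;
// nothing outside the package reads the configuration directly.
func ConfigProcess() {
	if !Enabled {
		return
	}
	switch offsetStr {
	case "last", "oldest", "newest":
		// keyword offsets are resolved later, at consume time
	default:
		var err error
		offsetDuration, err = time.ParseDuration(offsetStr)
		if err != nil {
			log.Fatalf("example-cluster: invalid offset format: %s", err)
		}
	}
	brokers = strings.Split(brokerStr, ",")
}

Keeping only Enabled, ConfigSetup and ConfigProcess exported makes the package's public surface explicit and stops other packages from reaching into notifier state, which is what the renames above (Brokers to brokers, Topic to topic, PCfg to pCfg, and so on) accomplish.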