diff --git a/docker/docker-cluster/docker-compose.yml b/docker/docker-cluster/docker-compose.yml
index 4c319aef7f..cdbde6060e 100644
--- a/docker/docker-cluster/docker-compose.yml
+++ b/docker/docker-cluster/docker-compose.yml
@@ -13,6 +13,7 @@ services:
       WAIT_HOSTS: kafka:9092,cassandra:9042
       WAIT_TIMEOUT: 30
       MT_KAFKA_MDM_IN_PARTITIONS: 0,1,2,3
+      MT_KAFKA_CLUSTER_PARTITIONS: 0,1,2,3
       MT_INSTANCE: metrictank0
       MT_LOG_LEVEL: 2
       MT_CLUSTER_MODE: multi
@@ -33,6 +34,7 @@ services:
       WAIT_HOSTS: kafka:9092,cassandra:9042,metrictank0:6060
       WAIT_TIMEOUT: 30
       MT_KAFKA_MDM_IN_PARTITIONS: 0,1,2,3
+      MT_KAFKA_CLUSTER_PARTITIONS: 0,1,2,3
       MT_INSTANCE: metrictank1
       MT_LOG_LEVEL: 2
       MT_CLUSTER_MODE: multi
@@ -54,6 +56,7 @@ services:
       WAIT_HOSTS: kafka:9092,cassandra:9042,metrictank0:6060
       WAIT_TIMEOUT: 30
       MT_KAFKA_MDM_IN_PARTITIONS: 4,5,6,7
+      MT_KAFKA_CLUSTER_PARTITIONS: 4,5,6,7
       MT_INSTANCE: metrictank2
       MT_LOG_LEVEL: 2
       MT_CLUSTER_MODE: multi
@@ -76,6 +79,7 @@ services:
       WAIT_HOSTS: kafka:9092,cassandra:9042,metrictank0:6060
       WAIT_TIMEOUT: 30
       MT_KAFKA_MDM_IN_PARTITIONS: 4,5,6,7
+      MT_KAFKA_CLUSTER_PARTITIONS: 4,5,6,7
       MT_INSTANCE: metrictank3
       MT_LOG_LEVEL: 2
       MT_CLUSTER_MODE: multi
diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini
index 026cbf1cfe..3a8e2e814d 100644
--- a/docker/docker-cluster/metrictank.ini
+++ b/docker/docker-cluster/metrictank.ini
@@ -184,6 +184,8 @@ enabled = true
 brokers = kafka:9092
 # kafka topic (only one)
 topic = metricpersist
+# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
+partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 offset = last
 # save interval for offsets
diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini
index e75c57ace5..4eb68b7f48 100644
--- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini
+++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -200,6 +200,10 @@ enabled = true
 brokers = kafka:9092
 # kafka topic (only one)
 topic = metricpersist
+# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
+partitions = *
+# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
+partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 offset = last
 # save interval for offsets
diff --git a/docs/config.md b/docs/config.md
index 92238f677a..bb73dbc15b 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -237,6 +237,10 @@ enabled = false
 brokers = kafka:9092
 # kafka topic (only one)
 topic = metricpersist
+# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
+partitions = *
+# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
+partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 offset = last
 # save interval for offsets
diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go
index cddb490670..271a654e84 100644
--- a/input/kafkamdm/kafkamdm.go
+++ b/input/kafkamdm/kafkamdm.go
@@ -107,17 +107,6 @@ func ConfigProcess(instance string) {
 	brokers = strings.Split(brokerStr, ",")
 	topics = strings.Split(topicStr, ",")
 
-	if partitionStr != "*" {
-		parts := strings.Split(partitionStr, ",")
-		for _, part := range parts {
-			i, err := strconv.Atoi(part)
-			if err != nil {
-				log.Fatal(4, "could not parse partition %q. partitions must be '*' or a comma separated list of id's", part)
-			}
-			partitions = append(partitions, int32(i))
-		}
-	}
-
 	config = sarama.NewConfig()
 
 	config.ClientID = instance + "-mdm"
@@ -139,51 +128,31 @@ func ConfigProcess(instance string) {
 	}
 	defer client.Close()
 
-	partitionCount := 0
-	for i, topic := range topics {
-		availParts, err := client.Partitions(topic)
-		if err != nil {
-			log.Fatal(4, "kafka-mdm: Faild to get partitions for topic %s. %s", topic, err)
-		}
-		if len(availParts) == 0 {
-			log.Fatal(4, "kafka-mdm: No partitions returned for topic %s", topic)
-		}
-		log.Info("kafka-mdm: available partitions: %v", availParts)
-		if i > 0 {
-			if len(availParts) != partitionCount {
-				log.Fatal(4, "kafka-mdm: configured topics have different partition counts, this is not supported")
+	availParts, err := kafka.GetPartitions(client, topics)
+	if err != nil {
+		log.Fatal(4, "kafka-mdm: %s", err.Error())
+	}
+	log.Info("kafka-mdm: available partitions %v", availParts)
+	if partitionStr == "*" {
+		partitions = availParts
+	} else {
+		parts := strings.Split(partitionStr, ",")
+		for _, part := range parts {
+			i, err := strconv.Atoi(part)
+			if err != nil {
+				log.Fatal(4, "could not parse partition %q. partitions must be '*' or a comma separated list of id's", part)
 			}
-			continue
+			partitions = append(partitions, int32(i))
 		}
-		partitionCount = len(availParts)
-		if partitionStr == "*" {
-			partitions = availParts
-		} else {
-			missing := diffPartitions(partitions, availParts)
-			if len(missing) > 0 {
-				log.Fatal(4, "kafka-mdm: configured partitions not in list of available partitions. missing %v", missing)
-			}
+		missing := kafka.DiffPartitions(partitions, availParts)
+		if len(missing) > 0 {
+			log.Fatal(4, "kafka-mdm: configured partitions not in list of available partitions. missing %v", missing)
 		}
 	}
 	// record our partitions so others (MetricIdx) can use the partitioning information.
 	cluster.Manager.SetPartitions(partitions)
 }
 
-// setDiff returns elements that are in a but not in b
-func diffPartitions(a []int32, b []int32) []int32 {
-	var diff []int32
-Iter:
-	for _, eA := range a {
-		for _, eB := range b {
-			if eA == eB {
-				continue Iter
-			}
-		}
-		diff = append(diff, eA)
-	}
-	return diff
-}
-
 func New() *KafkaMdm {
 	client, err := sarama.NewClient(brokers, config)
 	if err != nil {
diff --git a/kafka/partitions.go b/kafka/partitions.go
new file mode 100644
index 0000000000..4d4e59e57e
--- /dev/null
+++ b/kafka/partitions.go
@@ -0,0 +1,45 @@
+package kafka
+
+import (
+	"fmt"
+
+	"github.com/Shopify/sarama"
+)
+
+// returns elements that are in a but not in b
+func DiffPartitions(a []int32, b []int32) []int32 {
+	var diff []int32
+Iter:
+	for _, eA := range a {
+		for _, eB := range b {
+			if eA == eB {
+				continue Iter
+			}
+		}
+		diff = append(diff, eA)
+	}
+	return diff
+}
+
+func GetPartitions(client sarama.Client, topics []string) ([]int32, error) {
+	partitionCount := 0
+	partitions := make([]int32, 0)
+	var err error
+	for i, topic := range topics {
+		partitions, err = client.Partitions(topic)
+		if err != nil {
+			return nil, fmt.Errorf("Failed to get partitions for topic %s. %s", topic, err)
+		}
+		if len(partitions) == 0 {
+			return nil, fmt.Errorf("No partitions returned for topic %s", topic)
+		}
+		if i > 0 {
+			if len(partitions) != partitionCount {
+				return nil, fmt.Errorf("Configured topics have different partition counts, this is not supported")
+			}
+			continue
+		}
+		partitionCount = len(partitions)
+	}
+	return partitions, nil
+}
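The two helpers above are the piece both consumers now share: GetPartitions asks the brokers what actually exists (and enforces equal partition counts across topics), while DiffPartitions reports configured ids the brokers don't have. A minimal standalone sketch of that validation flow — the broker address, topic name and configured id list below are placeholders, not values taken from this change:

package main

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/raintank/metrictank/kafka"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_0_0

	client, err := sarama.NewClient([]string{"kafka:9092"}, cfg) // placeholder broker
	if err != nil {
		log.Fatalf("failed to create kafka client: %s", err)
	}
	defer client.Close()

	// partitions the brokers actually expose for the given topics
	availParts, err := kafka.GetPartitions(client, []string{"mdm"})
	if err != nil {
		log.Fatalf("%s", err)
	}

	// partitions the operator configured, e.g. partitions = 0,1,2,3
	want := []int32{0, 1, 2, 3}

	// anything configured but not available is a fatal misconfiguration
	if missing := kafka.DiffPartitions(want, availParts); len(missing) > 0 {
		log.Fatalf("configured partitions not available: %v", missing)
	}
	log.Printf("partitions ok: %v", want)
}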
diff --git a/mdata/notifierKafka/cfg.go b/mdata/notifierKafka/cfg.go
index c2ea0b1be8..723c816e4c 100644
--- a/mdata/notifierKafka/cfg.go
+++ b/mdata/notifierKafka/cfg.go
@@ -3,10 +3,13 @@ package notifierKafka
 import (
 	"flag"
 	"log"
+	"strconv"
 	"strings"
 	"time"
 
 	"github.com/Shopify/sarama"
+	"github.com/raintank/metrictank/cluster"
+	"github.com/raintank/metrictank/kafka"
 	"github.com/raintank/metrictank/stats"
 	"github.com/rakyll/globalconf"
 )
@@ -20,15 +23,24 @@ var dataDir string
 var config *sarama.Config
 var offsetDuration time.Duration
 var offsetCommitInterval time.Duration
+var partitionStr string
+var partitions []int32
+var partitioner *cluster.KafkaPartitioner
+var partitionScheme string
 
-var messagesPublished *stats.Counter32
-var messagesSize *stats.Meter32
+// metric cluster.notifier.kafka.messages-published is a counter of messages published to the kafka cluster notifier
+var messagesPublished = stats.NewCounter32("cluster.notifier.kafka.messages-published")
 
-func ConfigSetup() {
+// metric cluster.notifier.kafka.message_size is the sizes seen of messages through the kafka cluster notifier
+var messagesSize = stats.NewMeter32("cluster.notifier.kafka.message_size", false)
+
+func init() {
 	fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
 	fs.BoolVar(&Enabled, "enabled", false, "")
 	fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
 	fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
+	fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
+	fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. (byOrg|bySeries)")
 	fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
 	fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
 	fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
@@ -57,9 +69,45 @@ func ConfigProcess(instance string) {
 	config.Version = sarama.V0_10_0_0
 	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
 	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
-	config.Producer.Compression = sarama.CompressionNone
+	config.Producer.Compression = sarama.CompressionSnappy
+	config.Producer.Return.Successes = true
 	err = config.Validate()
 	if err != nil {
 		log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
 	}
+
+	partitioner, err = cluster.NewKafkaPartitioner(partitionScheme)
+	if err != nil {
+		log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err)
+	}
+
+	if partitionStr != "*" {
+		parts := strings.Split(partitionStr, ",")
+		for _, part := range parts {
+			i, err := strconv.Atoi(part)
+			if err != nil {
+				log.Fatal(4, "kafka-cluster: could not parse partition %q. partitions must be '*' or a comma separated list of id's", part)
+			}
+			partitions = append(partitions, int32(i))
+		}
+	}
+	// validate our partitions
+	client, err := sarama.NewClient(brokers, config)
+	if err != nil {
+		log.Fatal(4, "kafka-cluster failed to create client. %s", err)
+	}
+	defer client.Close()
+
+	availParts, err := kafka.GetPartitions(client, []string{topic})
+	if err != nil {
+		log.Fatal(4, "kafka-cluster: %s", err.Error())
+	}
+	if partitionStr == "*" {
+		partitions = availParts
+	} else {
+		missing := kafka.DiffPartitions(partitions, availParts)
+		if len(missing) > 0 {
+			log.Fatal(4, "kafka-cluster: configured partitions not in list of available partitions. missing %v", missing)
+		}
+	}
 }
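A side note on the producer settings changed above: switching to SendMessages on a SyncProducer (in notifierKafka.go below) is what forces Producer.Return.Successes = true — sarama refuses to construct a sync producer without it. A minimal sketch of just the sarama config this section ends up with (the ClientID here is a placeholder):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.ClientID = "metrictank-example" // placeholder
	config.Version = sarama.V0_10_0_0
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
	config.Producer.Retry.Max = 10
	config.Producer.Compression = sarama.CompressionSnappy
	// required by sarama.NewSyncProducerFromClient: a sync producer must consume
	// its own success channel, so the config has to enable it
	config.Producer.Return.Successes = true

	if err := config.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("kafka-cluster producer config is valid")
}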
missing %v", missing) + } + } } diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go index e36b82b6da..86e53d0bdb 100644 --- a/mdata/notifierKafka/notifierKafka.go +++ b/mdata/notifierKafka/notifierKafka.go @@ -4,13 +4,15 @@ import ( "bytes" "encoding/binary" "encoding/json" + "strings" "sync" "time" "github.com/Shopify/sarama" + "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/kafka" "github.com/raintank/metrictank/mdata" - "github.com/raintank/metrictank/stats" + "github.com/raintank/metrictank/util" "github.com/raintank/worldping-api/pkg/log" ) @@ -27,32 +29,29 @@ type NotifierKafka struct { // signal to PartitionConsumers to shutdown stopConsuming chan struct{} mdata.Notifier + idx idx.MetricIndex + bPool *util.BufferPool } -func New(instance string, metrics mdata.Metrics) *NotifierKafka { - // metric cluster.notifier.kafka.messages-published is a counter of messages published to the kafka cluster notifier - messagesPublished = stats.NewCounter32("cluster.notifier.kafka.messages-published") - // metric cluster.notifier.kafka.message_size is the sizes seen of messages through the kafka cluster notifier - messagesSize = stats.NewMeter32("cluster.notifier.kafka.message_size", false) - +func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierKafka { client, err := sarama.NewClient(brokers, config) if err != nil { - log.Fatal(2, "kafka-notifier failed to start client: %s", err) + log.Fatal(2, "kafka-cluster failed to start client: %s", err) } consumer, err := sarama.NewConsumerFromClient(client) if err != nil { - log.Fatal(2, "kafka-notifier failed to initialize consumer: %s", err) + log.Fatal(2, "kafka-cluster failed to initialize consumer: %s", err) } - log.Info("kafka-notifier consumer initialized without error") + log.Info("kafka-cluster consumer initialized without error") producer, err := sarama.NewSyncProducerFromClient(client) if err != nil { - log.Fatal(2, "kafka-notifier failed to initialize producer: %s", err) + log.Fatal(2, "kafka-cluster failed to initialize producer: %s", err) } offsetMgr, err := kafka.NewOffsetMgr(dataDir) if err != nil { - log.Fatal(2, "kafka-notifier couldnt create offsetMgr. %s", err) + log.Fatal(2, "kafka-cluster couldnt create offsetMgr. %s", err) } c := NotifierKafka{ @@ -68,6 +67,8 @@ func New(instance string, metrics mdata.Metrics) *NotifierKafka { }, StopChan: make(chan int), stopConsuming: make(chan struct{}), + idx: idx, + bPool: util.NewBufferPool(), } c.start() go c.produce() @@ -76,11 +77,7 @@ func New(instance string, metrics mdata.Metrics) *NotifierKafka { } func (c *NotifierKafka) start() { - // get partitions. - partitions, err := c.consumer.Partitions(topic) - if err != nil { - log.Fatal(4, "kafka-notifier: Faild to get partitions for topic %s. %s", topic, err) - } + var err error for _, partition := range partitions { var offset int64 switch offsetStr { @@ -94,7 +91,7 @@ func (c *NotifierKafka) start() { offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond)) } if err != nil { - log.Fatal(4, "kafka-notifier: failed to get %q offset for %s:%d. %q", offsetStr, topic, partition, err) + log.Fatal(4, "kafka-cluster: Failed to get %q duration offset for %s:%d. 
%q", offsetStr, topic, partition, err) } go c.consumePartition(topic, partition, offset) } @@ -106,9 +103,9 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio pc, err := c.consumer.ConsumePartition(topic, partition, partitionOffset) if err != nil { - log.Fatal(4, "kafka-notifier: failed to start partitionConsumer for %s:%d. %s", topic, partition, err) + log.Fatal(4, "kafka-cluster: failed to start partitionConsumer for %s:%d. %s", topic, partition, err) } - log.Info("kafka-notifier: consuming from %s:%d from offset %d", topic, partition, partitionOffset) + log.Info("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, partitionOffset) currentOffset := partitionOffset messages := pc.Messages() ticker := time.NewTicker(offsetCommitInterval) @@ -116,20 +113,20 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio select { case msg := <-messages: if mdata.LogLevel < 2 { - log.Debug("kafka-notifier received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key) + log.Debug("kafka-cluster received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key) } c.Handle(msg.Value) currentOffset = msg.Offset case <-ticker.C: if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil { - log.Error(3, "kafka-notifier failed to commit offset for %s:%d, %s", topic, partition, err) + log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err) } case <-c.stopConsuming: pc.Close() if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil { - log.Error(3, "kafka-notifier failed to commit offset for %s:%d, %s", topic, partition, err) + log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err) } - log.Info("kafka-notifier consumer for %s:%d ended.", topic, partition) + log.Info("kafka-cluster consumer for %s:%d ended.", topic, partition) return } } @@ -176,36 +173,57 @@ func (c *NotifierKafka) flush() { return } - msg := mdata.PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf} - c.buf = nil - - go func() { - log.Debug("kafka-notifier sending %d batch metricPersist messages", len(msg.SavedChunks)) - - data, err := json.Marshal(&msg) + // In order to correctly route the saveMessages to the correct partition, + // we cant send them in batches anymore. + payload := make([]*sarama.ProducerMessage, 0, len(c.buf)) + var pMsg mdata.PersistMessageBatch + for i, msg := range c.buf { + def, err := c.idx.Get(strings.SplitN(msg.Key, "_", 2)[0]) if err != nil { - log.Fatal(4, "kafka-notifier failed to marshal persistMessage to json.") + log.Error(3, "kafka-cluster: failed to lookup metricDef with id %s", msg.Key) + continue } - buf := new(bytes.Buffer) + buf := bytes.NewBuffer(c.bPool.Get()) binary.Write(buf, binary.LittleEndian, uint8(mdata.PersistMessageBatchV1)) - buf.Write(data) + encoder := json.NewEncoder(buf) + pMsg = mdata.PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf[i : i+1]} + err = encoder.Encode(&pMsg) + if err != nil { + log.Fatal(4, "kafka-cluster failed to marshal persistMessage to json.") + } messagesSize.Value(buf.Len()) - payload := &sarama.ProducerMessage{ + key := c.bPool.Get() + key, err = partitioner.GetPartitionKey(&def, key) + if err != nil { + log.Fatal(4, "Unable to get partitionKey for metricDef with id %s. 
%s", def.Id, err) + } + kafkaMsg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(buf.Bytes()), + Key: sarama.ByteEncoder(key), } + payload = append(payload, kafkaMsg) + } + c.buf = nil + + go func() { + log.Debug("kafka-cluster sending %d batch metricPersist messages", len(payload)) sent := false for !sent { - // note: currently we don't do partitioning yet for cluster msgs, so no key needed - _, _, err := c.producer.SendMessage(payload) + err := c.producer.SendMessages(payload) if err != nil { - log.Warn("kafka-notifier publisher %s", err) + log.Warn("kafka-cluster publisher %s", err) } else { sent = true } time.Sleep(time.Second) } - messagesPublished.Inc() + messagesPublished.Add(len(payload)) + // put our buffers back in the bufferPool + for _, msg := range payload { + c.bPool.Put([]byte(msg.Key.(sarama.ByteEncoder))) + c.bPool.Put([]byte(msg.Value.(sarama.ByteEncoder))) + } }() } diff --git a/metrictank-sample.ini b/metrictank-sample.ini index b28661d022..39b68d499d 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -204,6 +204,10 @@ enabled = false brokers = kafka:9092 # kafka topic (only one) topic = metricpersist +# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. +partitions = * +# method used for paritioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries +partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration offset = last # save interval for offsets diff --git a/metrictank.go b/metrictank.go index 82e92f9f04..9a9e89b843 100644 --- a/metrictank.go +++ b/metrictank.go @@ -137,7 +137,6 @@ func main() { // load config for cluster handlers notifierNsq.ConfigSetup() - notifierKafka.ConfigSetup() // load config for metricIndexers memory.ConfigSetup() @@ -330,20 +329,6 @@ func main() { log.Warn("It is not recommended to run a mulitnode cluster with more then 1 input plugin.") } - /*********************************** - Initialize MetricPerrist notifiers - ***********************************/ - handlers := make([]mdata.NotifierHandler, 0) - if notifierKafka.Enabled { - handlers = append(handlers, notifierKafka.New(*instance, metrics)) - } - - if notifierNsq.Enabled { - handlers = append(handlers, notifierNsq.New(*instance, metrics)) - } - - mdata.InitPersistNotifier(handlers...) - /*********************************** Start the ClusterManager ***********************************/ @@ -401,6 +386,20 @@ func main() { } log.Info("metricIndex initialized in %s. starting data consumption", time.Now().Sub(pre)) + /*********************************** + Initialize MetricPerrist notifiers + ***********************************/ + handlers := make([]mdata.NotifierHandler, 0) + if notifierKafka.Enabled { + handlers = append(handlers, notifierKafka.New(*instance, metrics, metricIndex)) + } + + if notifierNsq.Enabled { + handlers = append(handlers, notifierNsq.New(*instance, metrics)) + } + + mdata.InitPersistNotifier(handlers...) + /*********************************** Initialize usage Reporting ***********************************/ diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index 0d283c41f0..878617e9df 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -199,6 +199,10 @@ enabled = false brokers = kafka:9092 # kafka topic (only one) topic = metricpersist +# kafka partitions to consume. use '*' or a comma separated list of id's. 
diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini
index 0d283c41f0..878617e9df 100644
--- a/scripts/config/metrictank-docker.ini
+++ b/scripts/config/metrictank-docker.ini
@@ -199,6 +199,10 @@ enabled = false
 brokers = kafka:9092
 # kafka topic (only one)
 topic = metricpersist
+# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
+partitions = *
+# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
+partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 offset = last
 # save interval for offsets
diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini
index e7a01360cc..1ab8989b4e 100644
--- a/scripts/config/metrictank-package.ini
+++ b/scripts/config/metrictank-package.ini
@@ -187,6 +187,10 @@ enabled = false
 brokers = localhost:9092
 # kafka topic (only one)
 topic = metricpersist
+# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
+partitions = *
+# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
+partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 offset = last
 # save interval for offsets