Merge pull request #472 from raintank/partitionedNotifyKafka
Make kafka-cluster chunk save messages partition aware.
Anthony Woods authored Jan 18, 2017
2 parents 5c77d2c + a5e0f31 commit 4c79325
Showing 12 changed files with 211 additions and 106 deletions.
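Note on what "partition aware" means here: before this change the kafka-cluster notifier published its chunk-save ("metricpersist") messages without regard to partitions; with it, each message is routed to a Kafka partition using the same partitioning scheme as the metric data, so an instance only has to consume save notifications for the partitions it ingests. The Go sketch below is illustrative only and is not code from this PR: the PR routes messages through cluster.KafkaPartitioner, whose internals are not shown in this commit view, and the partitionFor helper, hashing choice, and message payload here are assumptions. Only standard sarama producer APIs are used.

package main

import (
    "hash/fnv"

    "github.com/Shopify/sarama"
)

// partitionFor derives a partition the way a bySeries-style scheme might:
// hash the series key and take it modulo the number of partitions.
// Hypothetical helper; the PR's real logic lives in cluster.KafkaPartitioner.
func partitionFor(seriesKey string, numPartitions int32) int32 {
    h := fnv.New32a()
    h.Write([]byte(seriesKey))
    return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
    cfg := sarama.NewConfig()
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Return.Successes = true
    // with a manual partitioner, the Partition field of each message is honored
    cfg.Producer.Partitioner = sarama.NewManualPartitioner

    producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, cfg)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    // a chunk-save notification routed to the partition that owns the series
    msg := &sarama.ProducerMessage{
        Topic:     "metricpersist",
        Partition: partitionFor("some.series.key", 4),
        Value:     sarama.ByteEncoder([]byte(`{"key":"some.series.key","t0":1484700000}`)),
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
        panic(err)
    }
}

The payoff of keeping persist messages on the same partitions as the metric data is that a metrictank instance never sees save notifications for series it does not hold.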
4 changes: 4 additions & 0 deletions docker/docker-cluster/docker-compose.yml
@@ -13,6 +13,7 @@ services:
WAIT_HOSTS: kafka:9092,cassandra:9042
WAIT_TIMEOUT: 30
MT_KAFKA_MDM_IN_PARTITIONS: 0,1,2,3
MT_KAFKA_CLUSTER_PARTITIONS: 0,1,2,3
MT_INSTANCE: metrictank0
MT_LOG_LEVEL: 2
MT_CLUSTER_MODE: multi
@@ -33,6 +34,7 @@ services:
WAIT_HOSTS: kafka:9092,cassandra:9042,metrictank0:6060
WAIT_TIMEOUT: 30
MT_KAFKA_MDM_IN_PARTITIONS: 0,1,2,3
MT_KAFKA_CLUSTER_PARTITIONS: 0,1,2,3
MT_INSTANCE: metrictank1
MT_LOG_LEVEL: 2
MT_CLUSTER_MODE: multi
@@ -54,6 +56,7 @@ services:
WAIT_HOSTS: kafka:9092,cassandra:9042,metrictank0:6060
WAIT_TIMEOUT: 30
MT_KAFKA_MDM_IN_PARTITIONS: 4,5,6,7
MT_KAFKA_CLUSTER_PARTITIONS: 4,5,6,7
MT_INSTANCE: metrictank2
MT_LOG_LEVEL: 2
MT_CLUSTER_MODE: multi
@@ -76,6 +79,7 @@ services:
WAIT_HOSTS: kafka:9092,cassandra:9042,metrictank0:6060
WAIT_TIMEOUT: 30
MT_KAFKA_MDM_IN_PARTITIONS: 4,5,6,7
MT_KAFKA_CLUSTER_PARTITIONS: 4,5,6,7
MT_INSTANCE: metrictank3
MT_LOG_LEVEL: 2
MT_CLUSTER_MODE: multi
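The four MT_KAFKA_CLUSTER_PARTITIONS variables above mirror the new kafka-cluster "partitions" option, and each instance's value matches its MT_KAFKA_MDM_IN_PARTITIONS, so every instance only handles persist messages for the partitions it ingests. Judging purely by the names in this compose file, the environment variables appear to be derived mechanically from the ini section and option names; the helper below is a hypothetical illustration of that pattern, not metrictank's actual config-loading code.

package main

import (
    "fmt"
    "strings"
)

// envKey sketches the apparent mapping from (section, option) to an MT_* variable.
func envKey(section, option string) string {
    s := section + "_" + option
    s = strings.NewReplacer("-", "_", ".", "_").Replace(s)
    return "MT_" + strings.ToUpper(s)
}

func main() {
    fmt.Println(envKey("kafka-cluster", "partitions")) // MT_KAFKA_CLUSTER_PARTITIONS
    fmt.Println(envKey("kafka-mdm-in", "partitions"))  // MT_KAFKA_MDM_IN_PARTITIONS
}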
2 changes: 2 additions & 0 deletions docker/docker-cluster/metrictank.ini
@@ -184,6 +184,8 @@ enabled = true
brokers = kafka:9092
# kafka topic (only one)
topic = metricpersist
# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
offset = last
# save interval for offsets
4 changes: 4 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -200,6 +200,10 @@ enabled = true
brokers = kafka:9092
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
offset = last
# save interval for offsets
4 changes: 4 additions & 0 deletions docs/config.md
@@ -237,6 +237,10 @@ enabled = false
brokers = kafka:9092
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. One of byOrg|bySeries
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
offset = last
# save interval for offsets
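The partition-scheme option added to the two metrictank.ini files and docs/config.md above selects how metrics (and now persist messages) map to Kafka partitions: byOrg keeps everything belonging to one org on the same partition, while bySeries spreads individual series across partitions, and the choice must match what tsdb-gw uses when producing the metric data. The sketch below only illustrates that distinction; the exact key derivation lives in metrictank/tsdb-gw and is not part of this diff, and the metric struct and partitionKey helper are invented for the example. The resulting key would then be hashed to one of the available partitions, as in the earlier sketch.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// minimal stand-in for a metric definition; fields invented for the example
type metric struct {
    OrgId int
    Name  string
    Tags  []string
}

// partitionKey returns the bytes that would get hashed to pick a partition.
func partitionKey(scheme string, m metric) []byte {
    switch scheme {
    case "byOrg":
        // every series of an org ends up on the same partition
        return []byte(strconv.Itoa(m.OrgId))
    case "bySeries":
        // each series is placed independently, spreading load across partitions
        return []byte(m.Name + ";" + strings.Join(m.Tags, ";"))
    default:
        panic("unknown partition-scheme " + scheme)
    }
}

func main() {
    m := metric{OrgId: 1, Name: "some.series.name", Tags: []string{"dc=us-east"}}
    fmt.Printf("byOrg key:    %s\n", partitionKey("byOrg", m))
    fmt.Printf("bySeries key: %s\n", partitionKey("bySeries", m))
}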
65 changes: 17 additions & 48 deletions input/kafkamdm/kafkamdm.go
@@ -107,17 +107,6 @@ func ConfigProcess(instance string) {
 	brokers = strings.Split(brokerStr, ",")
 	topics = strings.Split(topicStr, ",")
 
-	if partitionStr != "*" {
-		parts := strings.Split(partitionStr, ",")
-		for _, part := range parts {
-			i, err := strconv.Atoi(part)
-			if err != nil {
-				log.Fatal(4, "could not parse partition %q. partitions must be '*' or a comma separated list of id's", part)
-			}
-			partitions = append(partitions, int32(i))
-		}
-	}
-
 	config = sarama.NewConfig()
 
 	config.ClientID = instance + "-mdm"
@@ -139,51 +128,31 @@ func ConfigProcess(instance string) {
 	}
 	defer client.Close()
 
-	partitionCount := 0
-	for i, topic := range topics {
-		availParts, err := client.Partitions(topic)
-		if err != nil {
-			log.Fatal(4, "kafka-mdm: Faild to get partitions for topic %s. %s", topic, err)
-		}
-		if len(availParts) == 0 {
-			log.Fatal(4, "kafka-mdm: No partitions returned for topic %s", topic)
-		}
-		log.Info("kafka-mdm: available partitions: %v", availParts)
-		if i > 0 {
-			if len(availParts) != partitionCount {
-				log.Fatal(4, "kafka-mdm: configured topics have different partition counts, this is not supported")
+	availParts, err := kafka.GetPartitions(client, topics)
+	if err != nil {
+		log.Fatal(4, "kafka-mdm: %s", err.Error())
+	}
+	log.Info("kafka-mdm: available partitions %v", availParts)
+	if partitionStr == "*" {
+		partitions = availParts
+	} else {
+		parts := strings.Split(partitionStr, ",")
+		for _, part := range parts {
+			i, err := strconv.Atoi(part)
+			if err != nil {
+				log.Fatal(4, "could not parse partition %q. partitions must be '*' or a comma separated list of id's", part)
 			}
-			continue
+			partitions = append(partitions, int32(i))
 		}
-		partitionCount = len(availParts)
-		if partitionStr == "*" {
-			partitions = availParts
-		} else {
-			missing := diffPartitions(partitions, availParts)
-			if len(missing) > 0 {
-				log.Fatal(4, "kafka-mdm: configured partitions not in list of available partitions. missing %v", missing)
+		missing := kafka.DiffPartitions(partitions, availParts)
+		if len(missing) > 0 {
+			log.Fatal(4, "kafka-mdm: configured partitions not in list of available partitions. missing %v", missing)
		}
 	}
 	// record our partitions so others (MetricIdx) can use the partitioning information.
 	cluster.Manager.SetPartitions(partitions)
 }
 
-// setDiff returns elements that are in a but not in b
-func diffPartitions(a []int32, b []int32) []int32 {
-	var diff []int32
-Iter:
-	for _, eA := range a {
-		for _, eB := range b {
-			if eA == eB {
-				continue Iter
-			}
-		}
-		diff = append(diff, eA)
-	}
-	return diff
-}
-
 func New() *KafkaMdm {
 	client, err := sarama.NewClient(brokers, config)
 	if err != nil {
45 changes: 45 additions & 0 deletions kafka/partitions.go
@@ -0,0 +1,45 @@
package kafka

import (
"fmt"

"github.com/Shopify/sarama"
)

// returns elements that are in a but not in b
func DiffPartitions(a []int32, b []int32) []int32 {
var diff []int32
Iter:
for _, eA := range a {
for _, eB := range b {
if eA == eB {
continue Iter
}
}
diff = append(diff, eA)
}
return diff
}

func GetPartitions(client sarama.Client, topics []string) ([]int32, error) {
partitionCount := 0
partitions := make([]int32, 0)
var err error
for i, topic := range topics {
partitions, err = client.Partitions(topic)
if err != nil {
return nil, fmt.Errorf("Failed to get partitions for topic %s. %s", topic, err)
}
if len(partitions) == 0 {
return nil, fmt.Errorf("No partitions returned for topic %s", topic)
}
if i > 0 {
if len(partitions) != partitionCount {
return nil, fmt.Errorf("Configured topics have different partition counts, this is not supported")
}
continue
}
partitionCount = len(partitions)
}
return partitions, nil
}
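For reference, a minimal usage sketch of the two helpers introduced above, assuming the import path shown elsewhere in this diff; the broker address and topic names are just examples, and error handling is reduced to log.Fatal.

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
    "github.com/raintank/metrictank/kafka"
)

func main() {
    cfg := sarama.NewConfig()
    client, err := sarama.NewClient([]string{"kafka:9092"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // all listed topics must expose the same partition count, or this errors out
    avail, err := kafka.GetPartitions(client, []string{"mdm", "metricpersist"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("available partitions:", avail)

    // verify that an explicitly configured partition list is a subset of what the broker has
    want := []int32{0, 1, 2, 3}
    if missing := kafka.DiffPartitions(want, avail); len(missing) > 0 {
        log.Fatalf("partitions %v not available", missing)
    }
}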
56 changes: 52 additions & 4 deletions mdata/notifierKafka/cfg.go
@@ -3,10 +3,13 @@ package notifierKafka
 import (
 	"flag"
 	"log"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/Shopify/sarama"
 	"github.com/raintank/metrictank/cluster"
+	"github.com/raintank/metrictank/kafka"
 	"github.com/raintank/metrictank/stats"
 	"github.com/rakyll/globalconf"
 )
@@ -20,15 +23,24 @@ var dataDir string
 var config *sarama.Config
 var offsetDuration time.Duration
 var offsetCommitInterval time.Duration
+var partitionStr string
+var partitions []int32
+var partitioner *cluster.KafkaPartitioner
+var partitionScheme string
 
-var messagesPublished *stats.Counter32
-var messagesSize *stats.Meter32
+// metric cluster.notifier.kafka.messages-published is a counter of messages published to the kafka cluster notifier
+var messagesPublished = stats.NewCounter32("cluster.notifier.kafka.messages-published")
 
-func ConfigSetup() {
+// metric cluster.notifier.kafka.message_size is the sizes seen of messages through the kafka cluster notifier
+var messagesSize = stats.NewMeter32("cluster.notifier.kafka.message_size", false)
+
+func init() {
 	fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
 	fs.BoolVar(&Enabled, "enabled", false, "")
 	fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
 	fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
+	fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
+	fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for paritioning metrics. (byOrg|bySeries)")
 	fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
 	fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
 	fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
@@ -57,9 +69,45 @@ func ConfigProcess(instance string) {
 	config.Version = sarama.V0_10_0_0
 	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
 	config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
-	config.Producer.Compression = sarama.CompressionNone
+	config.Producer.Compression = sarama.CompressionSnappy
 	config.Producer.Return.Successes = true
 	err = config.Validate()
 	if err != nil {
 		log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
 	}
+
+	partitioner, err = cluster.NewKafkaPartitioner(partitionScheme)
+	if err != nil {
+		log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err)
+	}
+
+	if partitionStr != "*" {
+		parts := strings.Split(partitionStr, ",")
+		for _, part := range parts {
+			i, err := strconv.Atoi(part)
+			if err != nil {
+				log.Fatal(4, "kafka-cluster: could not parse partition %q. partitions must be '*' or a comma separated list of id's", part)
+			}
+			partitions = append(partitions, int32(i))
+		}
+	}
+	// validate our partitions
+	client, err := sarama.NewClient(brokers, config)
+	if err != nil {
+		log.Fatal(4, "kafka-cluster failed to create client. %s", err)
+	}
+	defer client.Close()
+
+	availParts, err := kafka.GetPartitions(client, []string{topic})
+	if err != nil {
+		log.Fatal(4, "kafka-cluster: %s", err.Error())
+	}
+	if partitionStr == "*" {
+		partitions = availParts
+	} else {
+		missing := kafka.DiffPartitions(partitions, availParts)
+		if len(missing) > 0 {
+			log.Fatal(4, "kafka-cluster: configured partitions not in list of available partitions. missing %v", missing)
+		}
+	}
+}
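The hunks above only add the configuration, validation, and partitioner setup; the notifier's actual producer and consumer changes are in files this page did not render. As orientation, here is a hypothetical sketch (not the PR's code) of what the consuming side can do once it knows its partition list: start one sarama partition consumer per owned partition instead of consuming the whole metricpersist topic. The consumePersist helper, offset choice, and hard-coded partition list are assumptions for the example.

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

// consumePersist starts one partition consumer per owned partition.
func consumePersist(client sarama.Client, topic string, partitions []int32) error {
    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        return err
    }
    for _, p := range partitions {
        pc, err := consumer.ConsumePartition(topic, p, sarama.OffsetNewest)
        if err != nil {
            return err
        }
        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                // decode the persist message and mark the corresponding chunk as saved
                log.Printf("persist notification: partition=%d offset=%d bytes=%d", msg.Partition, msg.Offset, len(msg.Value))
            }
        }(pc)
    }
    return nil
}

func main() {
    client, err := sarama.NewClient([]string{"kafka:9092"}, sarama.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    // e.g. the partitions this instance was configured with (0-3 in the compose file above)
    if err := consumePersist(client, "metricpersist", []int32{0, 1, 2, 3}); err != nil {
        log.Fatal(err)
    }
    select {} // block so the background consumers keep running
}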