From 491b55f57be2e23450a5d558d6475ed850cbda4c Mon Sep 17 00:00:00 2001 From: Pavel Gurkov Date: Fri, 21 Jul 2017 11:36:15 +0200 Subject: [PATCH 1/5] Add ability to the Kafka output plugin to send data to different topics, based on the metric name or tags --- plugins/outputs/kafka/README.md | 32 ++++- plugins/outputs/kafka/kafka.go | 173 +++++++++++++++++++++------- plugins/outputs/kafka/kafka_test.go | 52 +++++++++ 3 files changed, 213 insertions(+), 44 deletions(-) diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index abd9c4921aec0..0d6e3a5142fd3 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -8,6 +8,33 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm brokers = ["localhost:9092"] ## Kafka topic for producer messages topic = "telegraf" + + ## Optional topic suffix configuration. + ## If the section is omitted, no suffix is used. + ## Following topic suffix methods are supported: + ## measurement - suffix equals to measurement's name + ## tag - suffix equals to specified tag's value + ## tags - suffix equals to specified tags' values + ## interleaved with key_separator + + ## Suffix equals to measurement name to topic + # [outputs.kafka.topic_suffix] + # method = "measurement" + + ## Suffix equals to measurement's "foo" tag value. + ## If there's no such a tag, suffix equals to an empty string + # [outputs.kafka.topic_suffix] + # method = "tag" + # key = "foo" + + ## Suffix equals to measurement's "foo" and "bar" + ## tag values, separated by "_". If there is no such tags, + ## their values treated as empty strings. + # [outputs.kafka.topic_suffix] + # method = "tags" + # keys = ["foo", "bar"] + # key_separator = "_" + ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" @@ -57,10 +84,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm * `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]` * `topic`: The `kafka` topic to publish to. - ### Optional parameters: -* `routing_tag`: if this tag exists, its value will be used as the routing key +* `routing_tag`: If this tag exists, its value will be used as the routing key * `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression * `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster. * `max_retry`: Max number of times to retry failed write @@ -69,3 +95,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm * `ssl_key`: SSL key * `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) * `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md) +* `topic_suffix`: Which, if any, method of calculating `kafka` topic suffix to use. +For examples, please refer to sample configuration. \ No newline at end of file diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 421b5c2a1529d..120aebd472130 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "crypto/tls" "fmt" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -12,54 +13,107 @@ import ( "github.com/Shopify/sarama" ) -type Kafka struct { - // Kafka brokers to send metrics to - Brokers []string - // Kafka topic - Topic string - // Routing Key Tag - RoutingTag string `toml:"routing_tag"` - // Compression Codec Tag - CompressionCodec int - // RequiredAcks Tag - RequiredAcks int - // MaxRetry Tag - MaxRetry int - - // Legacy SSL config options - // TLS client certificate - Certificate string - // TLS client key - Key string - // TLS certificate authority - CA string - - // Path to CA file - SSLCA string `toml:"ssl_ca"` - // Path to host cert file - SSLCert string `toml:"ssl_cert"` - // Path to cert key file - SSLKey string `toml:"ssl_key"` - - // Skip SSL verification - InsecureSkipVerify bool - - // SASL Username - SASLUsername string `toml:"sasl_username"` - // SASL Password - SASLPassword string `toml:"sasl_password"` - - tlsConfig tls.Config - producer sarama.SyncProducer - - serializer serializers.Serializer +const ( + TOPIC_SUFFIX_METHOD_EMPTY uint8 = iota + TOPIC_SUFFIX_METHOD_MEASUREMENT + TOPIC_SUFFIX_METHOD_TAG + TOPIC_SUFFIX_METHOD_TAGS +) + +var TopicSuffixMethodStringToUID = map[string]uint8{ + "": TOPIC_SUFFIX_METHOD_EMPTY, + "measurement": TOPIC_SUFFIX_METHOD_MEASUREMENT, + "tag": TOPIC_SUFFIX_METHOD_TAG, + "tags": TOPIC_SUFFIX_METHOD_TAGS, } +type ( + Kafka struct { + // Kafka brokers to send metrics to + Brokers []string + // Kafka topic + Topic string + // Kafka topic suffix option + TopicSuffix TopicSuffix `toml:"topic_suffix"` + // Routing Key Tag + RoutingTag string `toml:"routing_tag"` + // Compression Codec Tag + CompressionCodec int + // RequiredAcks Tag + RequiredAcks int + // MaxRetry Tag + MaxRetry int + + // Legacy SSL config options + // TLS client certificate + Certificate string + // TLS client key + Key string + // TLS certificate authority + CA string + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // Skip SSL verification + InsecureSkipVerify bool + + // SASL Username + SASLUsername string `toml:"sasl_username"` + // SASL Password + SASLPassword string `toml:"sasl_password"` + + tlsConfig tls.Config + producer sarama.SyncProducer + + serializer serializers.Serializer + + topicSuffixMethodUID uint8 + } + TopicSuffix struct { + Method string `toml:"method"` + Key string `toml:"key"` + Keys []string `toml:"keys"` + KeySeparator string `toml:"key_separator"` + } +) + var sampleConfig = ` ## URLs of kafka brokers brokers = ["localhost:9092"] ## Kafka topic for producer messages topic = "telegraf" + + ## Optional topic suffix configuration. + ## If the section is omitted, no suffix is used. + ## Following topic suffix methods are supported: + ## measurement - suffix equals to measurement's name + ## tag - suffix equals to specified tag's value + ## tags - suffix equals to specified tags' values + ## interleaved with key_separator + + ## Suffix equals to measurement name to topic + # [outputs.kafka.topic_suffix] + # method = "measurement" + + ## Suffix equals to measurement's "foo" tag value. + ## If there's no such a tag, suffix equals to an empty string + # [outputs.kafka.topic_suffix] + # method = "tag" + # key = "foo" + + ## Suffix equals to measurement's "foo" and "bar" + ## tag values, separated by "_". If there is no such tags, + ## their values treated as empty strings. + # [outputs.kafka.topic_suffix] + # method = "tags" + # keys = ["foo", "bar"] + # key_separator = "_" + ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" @@ -108,11 +162,44 @@ var sampleConfig = ` data_format = "influx" ` +func GetTopicSuffixMethodUID(method string) (uint8, error) { + methodUID, ok := TopicSuffixMethodStringToUID[method] + if !ok { + return 0, fmt.Errorf("Unkown topic suffix method provided: %s", method) + } + return methodUID, nil +} + +func (k *Kafka) GetTopicName(metric telegraf.Metric) string { + var topicName string + switch k.topicSuffixMethodUID { + case TOPIC_SUFFIX_METHOD_MEASUREMENT: + topicName = k.Topic + metric.Name() + case TOPIC_SUFFIX_METHOD_TAG: + topicName = k.Topic + metric.Tags()[k.TopicSuffix.Key] + case TOPIC_SUFFIX_METHOD_TAGS: + var tags_values []string + for _, tag := range k.TopicSuffix.Keys { + tags_values = append(tags_values, metric.Tags()[tag]) + } + topicName = k.Topic + strings.Join(tags_values, k.TopicSuffix.KeySeparator) + default: + topicName = k.Topic + } + return topicName +} + func (k *Kafka) SetSerializer(serializer serializers.Serializer) { k.serializer = serializer } func (k *Kafka) Connect() error { + topicSuffixMethod, err := GetTopicSuffixMethodUID(k.TopicSuffix.Method) + if err != nil { + return err + } + k.topicSuffixMethodUID = topicSuffixMethod + config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks) @@ -175,8 +262,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { return err } + topicName := k.GetTopicName(metric) + m := &sarama.ProducerMessage{ - Topic: k.Topic, + Topic: topicName, Value: sarama.ByteEncoder(buf), } if h, ok := metric.Tags()[k.RoutingTag]; ok { diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index f99e0ecea99fb..6be1a0fa87042 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -8,6 +8,11 @@ import ( "github.com/stretchr/testify/require" ) +type topicSuffixTestpair struct { + topicSuffix TopicSuffix + expectedTopic string +} + func TestConnectAndWrite(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -28,4 +33,51 @@ func TestConnectAndWrite(t *testing.T) { // Verify that we can successfully write data to the kafka broker err = k.Write(testutil.MockMetrics()) require.NoError(t, err) + k.Close() +} + +func TestTopicSuffixes(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + brokers := []string{testutil.GetLocalHost() + ":9092"} + s, _ := serializers.NewInfluxSerializer() + + topic := "Test_" + + metric := testutil.TestMetric(1) + metricTagName := "tag1" + metricTagValue := metric.Tags()[metricTagName] + metricName := metric.Name() + + var testcases = []topicSuffixTestpair{ + {TopicSuffix{Method: "measurement"}, + topic + metricName}, + {TopicSuffix{Method: "tag", Key: metricTagName}, + topic + metricTagValue}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, KeySeparator: "___"}, + topic + metricTagValue + "___" + metricTagValue + "___" + metricTagValue}, + // This ensures backward compatibility + {TopicSuffix{}, + topic}, + } + + for _, testcase := range testcases { + topicSuffix := testcase.topicSuffix + expectedTopic := testcase.expectedTopic + k := &Kafka{ + Brokers: brokers, + Topic: topic, + serializer: s, + TopicSuffix: topicSuffix, + } + + err := k.Connect() + require.NoError(t, err) + + topic := k.GetTopicName(metric) + require.Equal(t, expectedTopic, topic) + k.Close() + } } From 96c6bdf271868295079705f0a96cfc33b39b0717 Mon Sep 17 00:00:00 2001 From: Pavel Gurkov Date: Mon, 4 Sep 2017 17:14:44 +0200 Subject: [PATCH 2/5] Format kafka.go with go fmt --- plugins/outputs/kafka/kafka.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 120aebd472130..dc0238d836b94 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -14,7 +14,7 @@ import ( ) const ( - TOPIC_SUFFIX_METHOD_EMPTY uint8 = iota + TOPIC_SUFFIX_METHOD_EMPTY uint8 = iota TOPIC_SUFFIX_METHOD_MEASUREMENT TOPIC_SUFFIX_METHOD_TAG TOPIC_SUFFIX_METHOD_TAGS @@ -75,10 +75,10 @@ type ( topicSuffixMethodUID uint8 } TopicSuffix struct { - Method string `toml:"method"` - Key string `toml:"key"` + Method string `toml:"method"` + Key string `toml:"key"` Keys []string `toml:"keys"` - KeySeparator string `toml:"key_separator"` + KeySeparator string `toml:"key_separator"` } ) From db156e206dd462cb5404a1de0090e2015da091ef Mon Sep 17 00:00:00 2001 From: Pavel Gurkov Date: Wed, 6 Sep 2017 11:08:41 +0200 Subject: [PATCH 3/5] Remove tag topic suffix method Add test for ValidateTopicSuffixMethod Remove k.Connect() from topic suffixes test --- plugins/outputs/kafka/README.md | 9 ++--- plugins/outputs/kafka/kafka.go | 59 +++++++++++------------------ plugins/outputs/kafka/kafka_test.go | 27 +++++++------ 3 files changed, 42 insertions(+), 53 deletions(-) diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 0d6e3a5142fd3..dbd7f7efd9aab 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -13,9 +13,8 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: ## measurement - suffix equals to measurement's name - ## tag - suffix equals to specified tag's value ## tags - suffix equals to specified tags' values - ## interleaved with key_separator + ## interleaved with separator ## Suffix equals to measurement name to topic # [outputs.kafka.topic_suffix] @@ -24,8 +23,8 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Suffix equals to measurement's "foo" tag value. ## If there's no such a tag, suffix equals to an empty string # [outputs.kafka.topic_suffix] - # method = "tag" - # key = "foo" + # method = "tags" + # keys = ["foo"] ## Suffix equals to measurement's "foo" and "bar" ## tag values, separated by "_". If there is no such tags, @@ -33,7 +32,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm # [outputs.kafka.topic_suffix] # method = "tags" # keys = ["foo", "bar"] - # key_separator = "_" + # separator = "_" ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index dc0238d836b94..2165821e07933 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -13,18 +13,10 @@ import ( "github.com/Shopify/sarama" ) -const ( - TOPIC_SUFFIX_METHOD_EMPTY uint8 = iota - TOPIC_SUFFIX_METHOD_MEASUREMENT - TOPIC_SUFFIX_METHOD_TAG - TOPIC_SUFFIX_METHOD_TAGS -) - -var TopicSuffixMethodStringToUID = map[string]uint8{ - "": TOPIC_SUFFIX_METHOD_EMPTY, - "measurement": TOPIC_SUFFIX_METHOD_MEASUREMENT, - "tag": TOPIC_SUFFIX_METHOD_TAG, - "tags": TOPIC_SUFFIX_METHOD_TAGS, +var ValidTopicSuffixMethods = []string{ + "", + "measurement", + "tags", } type ( @@ -71,14 +63,11 @@ type ( producer sarama.SyncProducer serializer serializers.Serializer - - topicSuffixMethodUID uint8 } TopicSuffix struct { - Method string `toml:"method"` - Key string `toml:"key"` - Keys []string `toml:"keys"` - KeySeparator string `toml:"key_separator"` + Method string `toml:"method"` + Keys []string `toml:"keys"` + Separator string `toml:"separator"` } ) @@ -92,9 +81,8 @@ var sampleConfig = ` ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: ## measurement - suffix equals to measurement's name - ## tag - suffix equals to specified tag's value ## tags - suffix equals to specified tags' values - ## interleaved with key_separator + ## interleaved with separator ## Suffix equals to measurement name to topic # [outputs.kafka.topic_suffix] @@ -103,8 +91,8 @@ var sampleConfig = ` ## Suffix equals to measurement's "foo" tag value. ## If there's no such a tag, suffix equals to an empty string # [outputs.kafka.topic_suffix] - # method = "tag" - # key = "foo" + # method = "tags" + # keys = ["foo"] ## Suffix equals to measurement's "foo" and "bar" ## tag values, separated by "_". If there is no such tags, @@ -112,7 +100,7 @@ var sampleConfig = ` # [outputs.kafka.topic_suffix] # method = "tags" # keys = ["foo", "bar"] - # key_separator = "_" + # separator = "_" ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key @@ -162,27 +150,26 @@ var sampleConfig = ` data_format = "influx" ` -func GetTopicSuffixMethodUID(method string) (uint8, error) { - methodUID, ok := TopicSuffixMethodStringToUID[method] - if !ok { - return 0, fmt.Errorf("Unkown topic suffix method provided: %s", method) +func ValidateTopicSuffixMethod(method string) error { + for _, validMethod := range ValidTopicSuffixMethods { + if method == validMethod { + return nil + } } - return methodUID, nil + return fmt.Errorf("Unkown topic suffix method provided: %s", method) } func (k *Kafka) GetTopicName(metric telegraf.Metric) string { var topicName string - switch k.topicSuffixMethodUID { - case TOPIC_SUFFIX_METHOD_MEASUREMENT: + switch k.TopicSuffix.Method { + case "measurement": topicName = k.Topic + metric.Name() - case TOPIC_SUFFIX_METHOD_TAG: - topicName = k.Topic + metric.Tags()[k.TopicSuffix.Key] - case TOPIC_SUFFIX_METHOD_TAGS: + case "tags": var tags_values []string for _, tag := range k.TopicSuffix.Keys { tags_values = append(tags_values, metric.Tags()[tag]) } - topicName = k.Topic + strings.Join(tags_values, k.TopicSuffix.KeySeparator) + topicName = k.Topic + strings.Join(tags_values, k.TopicSuffix.Separator) default: topicName = k.Topic } @@ -194,12 +181,10 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) { } func (k *Kafka) Connect() error { - topicSuffixMethod, err := GetTopicSuffixMethodUID(k.TopicSuffix.Method) + err := ValidateTopicSuffixMethod(k.TopicSuffix.Method) if err != nil { return err } - k.topicSuffixMethodUID = topicSuffixMethod - config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks) diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 6be1a0fa87042..1764cb8835bc2 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -41,9 +41,6 @@ func TestTopicSuffixes(t *testing.T) { t.Skip("Skipping integration test in short mode") } - brokers := []string{testutil.GetLocalHost() + ":9092"} - s, _ := serializers.NewInfluxSerializer() - topic := "Test_" metric := testutil.TestMetric(1) @@ -54,9 +51,9 @@ func TestTopicSuffixes(t *testing.T) { var testcases = []topicSuffixTestpair{ {TopicSuffix{Method: "measurement"}, topic + metricName}, - {TopicSuffix{Method: "tag", Key: metricTagName}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName}}, topic + metricTagValue}, - {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, KeySeparator: "___"}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, Separator: "___"}, topic + metricTagValue + "___" + metricTagValue + "___" + metricTagValue}, // This ensures backward compatibility {TopicSuffix{}, @@ -67,17 +64,25 @@ func TestTopicSuffixes(t *testing.T) { topicSuffix := testcase.topicSuffix expectedTopic := testcase.expectedTopic k := &Kafka{ - Brokers: brokers, Topic: topic, - serializer: s, TopicSuffix: topicSuffix, } - err := k.Connect() - require.NoError(t, err) - topic := k.GetTopicName(metric) require.Equal(t, expectedTopic, topic) - k.Close() + } +} + +func TestValidateTopicSuffixMethod(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + err := ValidateTopicSuffixMethod("invalid_topic_suffix_method") + require.Error(t, err, "Topic suffix method used should be invalid.") + + for _, method := range ValidTopicSuffixMethods { + err := ValidateTopicSuffixMethod(method) + require.NoError(t, err, "Topic suffix method used should be valid.") } } From 0223a17c9287d2f47b7a95b123565586862f3b4c Mon Sep 17 00:00:00 2001 From: Pavel Gurkov Date: Wed, 6 Sep 2017 11:33:11 +0200 Subject: [PATCH 4/5] Add separator between topic and suffix Modify existing testcases accordingly Add more testcases for topic names Modify README.md accordingly --- plugins/outputs/kafka/README.md | 12 +++++++----- plugins/outputs/kafka/kafka.go | 24 +++++++++++++++--------- plugins/outputs/kafka/kafka_test.go | 16 ++++++++++++---- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index dbd7f7efd9aab..b112c09cd8621 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -12,21 +12,23 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: - ## measurement - suffix equals to measurement's name - ## tags - suffix equals to specified tags' values + ## measurement - suffix equals to separator + measurement's name + ## tags - suffix equals to separator + specified tags' values ## interleaved with separator - ## Suffix equals to measurement name to topic + ## Suffix equals to "_" + measurement's name # [outputs.kafka.topic_suffix] # method = "measurement" + # separator = "_" - ## Suffix equals to measurement's "foo" tag value. + ## Suffix equals to "__" + measurement's "foo" tag value. ## If there's no such a tag, suffix equals to an empty string # [outputs.kafka.topic_suffix] # method = "tags" # keys = ["foo"] + # separator = "__" - ## Suffix equals to measurement's "foo" and "bar" + ## Suffix equals to "_" + measurement's "foo" and "bar" ## tag values, separated by "_". If there is no such tags, ## their values treated as empty strings. # [outputs.kafka.topic_suffix] diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 2165821e07933..c2f2e4460acb7 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -80,21 +80,23 @@ var sampleConfig = ` ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: - ## measurement - suffix equals to measurement's name - ## tags - suffix equals to specified tags' values + ## measurement - suffix equals to separator + measurement's name + ## tags - suffix equals to separator + specified tags' values ## interleaved with separator - ## Suffix equals to measurement name to topic + ## Suffix equals to "_" + measurement name # [outputs.kafka.topic_suffix] # method = "measurement" + # separator = "_" - ## Suffix equals to measurement's "foo" tag value. + ## Suffix equals to "__" + measurement's "foo" tag value. ## If there's no such a tag, suffix equals to an empty string # [outputs.kafka.topic_suffix] # method = "tags" # keys = ["foo"] + # separator = "__" - ## Suffix equals to measurement's "foo" and "bar" + ## Suffix equals to "_" + measurement's "foo" and "bar" ## tag values, separated by "_". If there is no such tags, ## their values treated as empty strings. # [outputs.kafka.topic_suffix] @@ -163,13 +165,17 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string { var topicName string switch k.TopicSuffix.Method { case "measurement": - topicName = k.Topic + metric.Name() + topicName = k.Topic + k.TopicSuffix.Separator + metric.Name() case "tags": - var tags_values []string + var topicNameComponents []string + topicNameComponents = append(topicNameComponents, k.Topic) for _, tag := range k.TopicSuffix.Keys { - tags_values = append(tags_values, metric.Tags()[tag]) + tagValue := metric.Tags()[tag] + if tagValue != "" { + topicNameComponents = append(topicNameComponents, tagValue) + } } - topicName = k.Topic + strings.Join(tags_values, k.TopicSuffix.Separator) + topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator) default: topicName = k.Topic } diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 1764cb8835bc2..aff91e91d761c 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -41,7 +41,7 @@ func TestTopicSuffixes(t *testing.T) { t.Skip("Skipping integration test in short mode") } - topic := "Test_" + topic := "Test" metric := testutil.TestMetric(1) metricTagName := "tag1" @@ -49,12 +49,20 @@ func TestTopicSuffixes(t *testing.T) { metricName := metric.Name() var testcases = []topicSuffixTestpair{ + // This ensures empty separator is okay {TopicSuffix{Method: "measurement"}, topic + metricName}, - {TopicSuffix{Method: "tags", Keys: []string{metricTagName}}, - topic + metricTagValue}, + {TopicSuffix{Method: "measurement", Separator: "sep"}, + topic + "sep" + metricName}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName}, Separator: "_"}, + topic + "_" + metricTagValue}, {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, Separator: "___"}, - topic + metricTagValue + "___" + metricTagValue + "___" + metricTagValue}, + topic + "___" + metricTagValue + "___" + metricTagValue + "___" + metricTagValue}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}}, + topic + metricTagValue + metricTagValue + metricTagValue}, + // This ensures non-existing tags are ignored + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, "non_existing_tag"}, Separator: "___"}, + topic + "___" + metricTagValue}, // This ensures backward compatibility {TopicSuffix{}, topic}, From 0c7ed74ef1f97211ee3bc91c7c34b50b7349001d Mon Sep 17 00:00:00 2001 From: Pavel Gurkov Date: Wed, 6 Sep 2017 12:30:05 +0200 Subject: [PATCH 5/5] Add another testcase for non-existing tag --- plugins/outputs/kafka/kafka_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index aff91e91d761c..b18d9f15d9d4a 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -61,6 +61,8 @@ func TestTopicSuffixes(t *testing.T) { {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}}, topic + metricTagValue + metricTagValue + metricTagValue}, // This ensures non-existing tags are ignored + {TopicSuffix{Method: "tags", Keys: []string{"non_existing_tag", "non_existing_tag"}, Separator: "___"}, + topic}, {TopicSuffix{Method: "tags", Keys: []string{metricTagName, "non_existing_tag"}, Separator: "___"}, topic + "___" + metricTagValue}, // This ensures backward compatibility