Skip to content

Commit

Permalink
Add Kafka output plugin topic_suffix option (#3196)
Browse files Browse the repository at this point in the history
  • Loading branch information
trueneu authored and danielnelson committed Sep 6, 2017
1 parent ab1c11b commit 5d4eec6
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 44 deletions.
33 changes: 31 additions & 2 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,34 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to separator + measurement's name
## tags - suffix equals to separator + specified tags' values
## interleaved with separator
## Suffix equals to "_" + measurement's name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"
## Suffix equals to "__" + measurement's "foo" tag value.
## If there's no such a tag, suffix equals to an empty string
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo"]
# separator = "__"
## Suffix equals to "_" + measurement's "foo" and "bar"
## tag values, separated by "_". If there is no such tags,
## their values treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# separator = "_"
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -57,10 +85,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
* `topic`: The `kafka` topic to publish to.


### Optional parameters:

* `routing_tag`: if this tag exists, its value will be used as the routing key
* `routing_tag`: If this tag exists, its value will be used as the routing key
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
* `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster.
* `max_retry`: Max number of times to retry failed write
Expand All @@ -69,3 +96,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
* `topic_suffix`: Which, if any, method of calculating `kafka` topic suffix to use.
For examples, please refer to sample configuration.
164 changes: 122 additions & 42 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"crypto/tls"
"fmt"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -12,54 +13,97 @@ import (
"github.com/Shopify/sarama"
)

type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer
var ValidTopicSuffixMethods = []string{
"",
"measurement",
"tags",
}

type (
Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer
}
TopicSuffix struct {
Method string `toml:"method"`
Keys []string `toml:"keys"`
Separator string `toml:"separator"`
}
)

var sampleConfig = `
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to separator + measurement's name
## tags - suffix equals to separator + specified tags' values
## interleaved with separator
## Suffix equals to "_" + measurement name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"
## Suffix equals to "__" + measurement's "foo" tag value.
## If there's no such a tag, suffix equals to an empty string
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo"]
# separator = "__"
## Suffix equals to "_" + measurement's "foo" and "bar"
## tag values, separated by "_". If there is no such tags,
## their values treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# separator = "_"
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -108,11 +152,45 @@ var sampleConfig = `
data_format = "influx"
`

func ValidateTopicSuffixMethod(method string) error {
for _, validMethod := range ValidTopicSuffixMethods {
if method == validMethod {
return nil
}
}
return fmt.Errorf("Unkown topic suffix method provided: %s", method)
}

func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
var topicName string
switch k.TopicSuffix.Method {
case "measurement":
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name()
case "tags":
var topicNameComponents []string
topicNameComponents = append(topicNameComponents, k.Topic)
for _, tag := range k.TopicSuffix.Keys {
tagValue := metric.Tags()[tag]
if tagValue != "" {
topicNameComponents = append(topicNameComponents, tagValue)
}
}
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
default:
topicName = k.Topic
}
return topicName
}

func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}

func (k *Kafka) Connect() error {
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
if err != nil {
return err
}
config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
Expand Down Expand Up @@ -175,8 +253,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
return err
}

topicName := k.GetTopicName(metric)

m := &sarama.ProducerMessage{
Topic: k.Topic,
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {
Expand Down
67 changes: 67 additions & 0 deletions plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/stretchr/testify/require"
)

type topicSuffixTestpair struct {
topicSuffix TopicSuffix
expectedTopic string
}

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
Expand All @@ -28,4 +33,66 @@ func TestConnectAndWrite(t *testing.T) {
// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockMetrics())
require.NoError(t, err)
k.Close()
}

func TestTopicSuffixes(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

topic := "Test"

metric := testutil.TestMetric(1)
metricTagName := "tag1"
metricTagValue := metric.Tags()[metricTagName]
metricName := metric.Name()

var testcases = []topicSuffixTestpair{
// This ensures empty separator is okay
{TopicSuffix{Method: "measurement"},
topic + metricName},
{TopicSuffix{Method: "measurement", Separator: "sep"},
topic + "sep" + metricName},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName}, Separator: "_"},
topic + "_" + metricTagValue},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, Separator: "___"},
topic + "___" + metricTagValue + "___" + metricTagValue + "___" + metricTagValue},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}},
topic + metricTagValue + metricTagValue + metricTagValue},
// This ensures non-existing tags are ignored
{TopicSuffix{Method: "tags", Keys: []string{"non_existing_tag", "non_existing_tag"}, Separator: "___"},
topic},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, "non_existing_tag"}, Separator: "___"},
topic + "___" + metricTagValue},
// This ensures backward compatibility
{TopicSuffix{},
topic},
}

for _, testcase := range testcases {
topicSuffix := testcase.topicSuffix
expectedTopic := testcase.expectedTopic
k := &Kafka{
Topic: topic,
TopicSuffix: topicSuffix,
}

topic := k.GetTopicName(metric)
require.Equal(t, expectedTopic, topic)
}
}

func TestValidateTopicSuffixMethod(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

err := ValidateTopicSuffixMethod("invalid_topic_suffix_method")
require.Error(t, err, "Topic suffix method used should be invalid.")

for _, method := range ValidTopicSuffixMethods {
err := ValidateTopicSuffixMethod(method)
require.NoError(t, err, "Topic suffix method used should be valid.")
}
}

0 comments on commit 5d4eec6

Please sign in to comment.