Skip to content

Commit

Permalink
Add ability to the Kafka output plugin to send data to different topi…
Browse files Browse the repository at this point in the history
…cs, based on the metric name or tags
  • Loading branch information
trueneu authored and Pavel Gurkov committed Sep 4, 2017
1 parent f43af72 commit 491b55f
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 44 deletions.
32 changes: 30 additions & 2 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,33 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to measurement's name
## tag - suffix equals to specified tag's value
## tags - suffix equals to specified tags' values
## interleaved with key_separator
## Suffix equals to measurement name to topic
# [outputs.kafka.topic_suffix]
# method = "measurement"
## Suffix equals to measurement's "foo" tag value.
## If there's no such tag, the suffix is an empty string
# [outputs.kafka.topic_suffix]
# method = "tag"
# key = "foo"
## Suffix equals the measurement's "foo" and "bar"
## tag values, separated by "_". If such tags are missing,
## their values are treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# key_separator = "_"
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -57,10 +84,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
* `topic`: The `kafka` topic to publish to.


### Optional parameters:

* `routing_tag`: if this tag exists, its value will be used as the routing key
* `routing_tag`: If this tag exists, its value will be used as the routing key
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
* `required_acks`: a setting for how many `acks` are required from the `kafka` broker cluster.
* `max_retry`: Max number of times to retry failed write
Expand All @@ -69,3 +95,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
* `topic_suffix`: Which, if any, method of calculating `kafka` topic suffix to use.
For examples, please refer to sample configuration.
173 changes: 131 additions & 42 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"crypto/tls"
"fmt"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -12,54 +13,107 @@ import (
"github.com/Shopify/sarama"
)

type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer
// Internal identifiers for the supported topic suffix methods. The empty
// method (no suffix) is the zero value, preserving backward compatibility
// for configs that omit the [outputs.kafka.topic_suffix] section.
const (
	TOPIC_SUFFIX_METHOD_EMPTY       uint8 = 0
	TOPIC_SUFFIX_METHOD_MEASUREMENT uint8 = 1
	TOPIC_SUFFIX_METHOD_TAG         uint8 = 2
	TOPIC_SUFFIX_METHOD_TAGS        uint8 = 3
)

// TopicSuffixMethodStringToUID maps the user-facing "method" configuration
// string to its internal identifier.
var TopicSuffixMethodStringToUID = map[string]uint8{
	"tags":        TOPIC_SUFFIX_METHOD_TAGS,
	"tag":         TOPIC_SUFFIX_METHOD_TAG,
	"measurement": TOPIC_SUFFIX_METHOD_MEASUREMENT,
	"":            TOPIC_SUFFIX_METHOD_EMPTY,
}

// Kafka is the output plugin that publishes serialized metrics to a
// cluster of Kafka brokers.
type Kafka struct {
	// Kafka brokers to send metrics to.
	Brokers []string
	// Base Kafka topic for producer messages.
	Topic string
	// Optional topic suffix configuration; the zero value means no suffix.
	TopicSuffix TopicSuffix `toml:"topic_suffix"`
	// Tag whose value, when present on a metric, is used as the routing key.
	RoutingTag string `toml:"routing_tag"`
	// Compression codec (per README: 0 none, 1 gzip, 2 snappy).
	CompressionCodec int
	// How many acks are required from the broker cluster.
	RequiredAcks int
	// Max number of times to retry a failed write.
	MaxRetry int

	// Legacy SSL config options.
	// TLS client certificate.
	Certificate string
	// TLS client key.
	Key string
	// TLS certificate authority.
	CA string

	// Path to CA file.
	SSLCA string `toml:"ssl_ca"`
	// Path to host cert file.
	SSLCert string `toml:"ssl_cert"`
	// Path to cert key file.
	SSLKey string `toml:"ssl_key"`

	// Skip SSL chain & host verification.
	InsecureSkipVerify bool

	// SASL username.
	SASLUsername string `toml:"sasl_username"`
	// SASL password.
	SASLPassword string `toml:"sasl_password"`

	tlsConfig tls.Config
	producer  sarama.SyncProducer

	serializer serializers.Serializer

	// Resolved once from TopicSuffix.Method (see Connect); consulted by
	// GetTopicName on every write.
	topicSuffixMethodUID uint8
}

// TopicSuffix describes how a per-metric suffix for the Kafka topic name
// is derived: from the measurement name, a single tag value, or several
// tag values joined by KeySeparator.
type TopicSuffix struct {
	Method       string   `toml:"method"`
	Key          string   `toml:"key"`
	Keys         []string `toml:"keys"`
	KeySeparator string   `toml:"key_separator"`
}

var sampleConfig = `
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to measurement's name
## tag - suffix equals to specified tag's value
## tags - suffix equals to specified tags' values
## interleaved with key_separator
## Suffix equals to measurement name to topic
# [outputs.kafka.topic_suffix]
# method = "measurement"
## Suffix equals to measurement's "foo" tag value.
## If there's no such a tag, suffix equals to an empty string
# [outputs.kafka.topic_suffix]
# method = "tag"
# key = "foo"
## Suffix equals to measurement's "foo" and "bar"
## tag values, separated by "_". If there is no such tags,
## their values treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# key_separator = "_"
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -108,11 +162,44 @@ var sampleConfig = `
data_format = "influx"
`

// GetTopicSuffixMethodUID resolves a configured topic suffix method name
// into its internal identifier. The empty string is a valid method and
// means "no suffix". An unrecognized method yields an error so that
// Connect can fail fast on a bad configuration.
func GetTopicSuffixMethodUID(method string) (uint8, error) {
	methodUID, ok := TopicSuffixMethodStringToUID[method]
	if !ok {
		// Fixed typo ("Unkown") and lowercased per Go error-string convention.
		return 0, fmt.Errorf("unknown topic suffix method provided: %s", method)
	}
	return methodUID, nil
}

// GetTopicName computes the destination topic for a metric by appending
// the configured suffix to the base topic: nothing, the measurement name,
// a single tag's value, or several tag values joined by KeySeparator.
// Missing tags contribute empty strings, matching the documented behavior.
func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
	switch k.topicSuffixMethodUID {
	case TOPIC_SUFFIX_METHOD_MEASUREMENT:
		return k.Topic + metric.Name()
	case TOPIC_SUFFIX_METHOD_TAG:
		return k.Topic + metric.Tags()[k.TopicSuffix.Key]
	case TOPIC_SUFFIX_METHOD_TAGS:
		// Fetch the tag map once instead of calling metric.Tags() for every
		// key — NOTE(review): assumes Tags() rebuilds the map per call.
		tags := metric.Tags()
		tagValues := make([]string, 0, len(k.TopicSuffix.Keys))
		for _, key := range k.TopicSuffix.Keys {
			tagValues = append(tagValues, tags[key])
		}
		return k.Topic + strings.Join(tagValues, k.TopicSuffix.KeySeparator)
	default:
		// TOPIC_SUFFIX_METHOD_EMPTY and any unresolved value: no suffix.
		return k.Topic
	}
}

// SetSerializer sets the serializer used to encode each metric into the
// payload of the produced Kafka message (see Write).
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
	k.serializer = serializer
}

func (k *Kafka) Connect() error {
topicSuffixMethod, err := GetTopicSuffixMethodUID(k.TopicSuffix.Method)
if err != nil {
return err
}
k.topicSuffixMethodUID = topicSuffixMethod

config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
Expand Down Expand Up @@ -175,8 +262,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
return err
}

topicName := k.GetTopicName(metric)

m := &sarama.ProducerMessage{
Topic: k.Topic,
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {
Expand Down
52 changes: 52 additions & 0 deletions plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/stretchr/testify/require"
)

// topicSuffixTestpair couples a TopicSuffix configuration with the topic
// name GetTopicName is expected to produce for the shared test metric.
type topicSuffixTestpair struct {
	topicSuffix   TopicSuffix
	expectedTopic string
}

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
Expand All @@ -28,4 +33,51 @@ func TestConnectAndWrite(t *testing.T) {
// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockMetrics())
require.NoError(t, err)
k.Close()
}

// TestTopicSuffixes verifies that GetTopicName applies each supported
// suffix method ("measurement", "tag", "tags") and that an empty
// TopicSuffix leaves the topic unchanged (backward compatibility).
func TestTopicSuffixes(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	brokers := []string{testutil.GetLocalHost() + ":9092"}
	// Fail fast instead of silently discarding a serializer setup error.
	s, err := serializers.NewInfluxSerializer()
	require.NoError(t, err)

	topic := "Test_"

	metric := testutil.TestMetric(1)
	metricTagName := "tag1"
	metricTagValue := metric.Tags()[metricTagName]
	metricName := metric.Name()

	var testcases = []topicSuffixTestpair{
		{TopicSuffix{Method: "measurement"},
			topic + metricName},
		{TopicSuffix{Method: "tag", Key: metricTagName},
			topic + metricTagValue},
		{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, KeySeparator: "___"},
			topic + metricTagValue + "___" + metricTagValue + "___" + metricTagValue},
		// This ensures backward compatibility
		{TopicSuffix{},
			topic},
	}

	for _, testcase := range testcases {
		k := &Kafka{
			Brokers:     brokers,
			Topic:       topic,
			serializer:  s,
			TopicSuffix: testcase.topicSuffix,
		}

		require.NoError(t, k.Connect())

		// Renamed from "topic" to avoid shadowing the outer base-topic var.
		actualTopic := k.GetTopicName(metric)
		require.Equal(t, testcase.expectedTopic, actualTopic)
		k.Close()
	}
}

0 comments on commit 491b55f

Please sign in to comment.