Skip to content

Commit

Permalink
Add topic tag options to kafka output (influxdata#7142)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Mar 10, 2020
1 parent 9734910 commit fbf15a6
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 15 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"log"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -83,6 +85,7 @@ const (
defaultMaxUndeliveredMessages = 1000
defaultMaxMessageLen = 1000000
defaultConsumerGroup = "telegraf_metrics_consumers"
reconnectDelay = 5 * time.Second
)

type empty struct{}
Expand Down Expand Up @@ -259,6 +262,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
err := k.consumer.Consume(ctx, k.Topics, handler)
if err != nil {
acc.AddError(err)
internal.SleepContext(ctx, reconnectDelay)
}
}
err = k.consumer.Close()
Expand Down
7 changes: 7 additions & 0 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## Kafka topic for producer messages
topic = "telegraf"

## The value of this tag will be used as the topic. If not set the 'topic'
## option is used.
# topic_tag = ""

## If true, the 'topic_tag' will be removed from to the metric.
# exclude_topic_tag = false

## Optional Client id
# client_id = "Telegraf"

Expand Down
57 changes: 43 additions & 14 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ var zeroTime = time.Unix(0, 0)

type (
Kafka struct {
Brokers []string
Topic string
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
ClientID string `toml:"client_id"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
CompressionCodec int
RequiredAcks int
MaxRetry int
MaxMessageBytes int `toml:"max_message_bytes"`
CompressionCodec int `toml:"compression_codec"`
RequiredAcks int `toml:"required_acks"`
MaxRetry int `toml:"max_retry"`
MaxMessageBytes int `toml:"max_message_bytes"`

Version string `toml:"version"`

Expand All @@ -57,7 +59,9 @@ type (
Log telegraf.Logger `toml:"-"`

tlsConfig tls.Config
producer sarama.SyncProducer

producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
producer sarama.SyncProducer

serializer serializers.Serializer
}
Expand Down Expand Up @@ -94,6 +98,13 @@ var sampleConfig = `
## Kafka topic for producer messages
topic = "telegraf"
## The value of this tag will be used as the topic. If not set the 'topic'
## option is used.
# topic_tag = ""
## If true, the 'topic_tag' will be removed from to the metric.
# exclude_topic_tag = false
## Optional Client id
# client_id = "Telegraf"
Expand Down Expand Up @@ -212,14 +223,29 @@ func ValidateTopicSuffixMethod(method string) error {
return fmt.Errorf("Unknown topic suffix method provided: %s", method)
}

func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) {
topic := k.Topic
if k.TopicTag != "" {
if t, ok := metric.GetTag(k.TopicTag); ok {
topic = t

// If excluding the topic tag, a copy is required to avoid modifying
// the metric buffer.
if k.ExcludeTopicTag {
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(k.TopicTag)
}
}
}

var topicName string
switch k.TopicSuffix.Method {
case "measurement":
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name()
topicName = topic + k.TopicSuffix.Separator + metric.Name()
case "tags":
var topicNameComponents []string
topicNameComponents = append(topicNameComponents, k.Topic)
topicNameComponents = append(topicNameComponents, topic)
for _, tag := range k.TopicSuffix.Keys {
tagValue := metric.Tags()[tag]
if tagValue != "" {
Expand All @@ -228,9 +254,9 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
}
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
default:
topicName = k.Topic
topicName = topic
}
return topicName
return metric, topicName
}

func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
Expand Down Expand Up @@ -306,7 +332,7 @@ func (k *Kafka) Connect() error {
config.Net.SASL.Version = version
}

producer, err := sarama.NewSyncProducer(k.Brokers, config)
producer, err := k.producerFunc(k.Brokers, config)
if err != nil {
return err
}
Expand Down Expand Up @@ -348,14 +374,16 @@ func (k *Kafka) routingKey(metric telegraf.Metric) (string, error) {
func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for _, metric := range metrics {
metric, topic := k.GetTopicName(metric)

buf, err := k.serializer.Serialize(metric)
if err != nil {
k.Log.Debugf("Could not serialize metric: %v", err)
continue
}

m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric),
Topic: topic,
Value: sarama.ByteEncoder(buf),
}

Expand Down Expand Up @@ -403,6 +431,7 @@ func init() {
return &Kafka{
MaxRetry: 3,
RequiredAcks: -1,
producerFunc: sarama.NewSyncProducer,
}
})
}
146 changes: 145 additions & 1 deletion plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestTopicSuffixes(t *testing.T) {
TopicSuffix: topicSuffix,
}

topic := k.GetTopicName(metric)
_, topic := k.GetTopicName(metric)
require.Equal(t, expectedTopic, topic)
}
}
Expand Down Expand Up @@ -156,3 +157,146 @@ func TestRoutingKey(t *testing.T) {
})
}
}

type MockProducer struct {
sent []*sarama.ProducerMessage
}

func (p *MockProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
p.sent = append(p.sent, msg)
return 0, 0, nil
}

func (p *MockProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
p.sent = append(p.sent, msgs...)
return nil
}

func (p *MockProducer) Close() error {
return nil
}

func NewMockProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) {
return &MockProducer{}, nil
}

func TestTopicTag(t *testing.T) {
tests := []struct {
name string
plugin *Kafka
input []telegraf.Metric
topic string
value string
}{
{
name: "static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "telegraf",
value: "cpu time_idle=42 0\n",
},
{
name: "topic tag overrides static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "xyzzy",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "xyzzy",
value: "cpu,topic=xyzzy time_idle=42 0\n",
},
{
name: "missing topic tag falls back to static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "telegraf",
value: "cpu time_idle=42 0\n",
},
{
name: "exclude topic tag removes tag",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
ExcludeTopicTag: true,
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "xyzzy",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "xyzzy",
value: "cpu time_idle=42 0\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
tt.plugin.SetSerializer(s)

err = tt.plugin.Connect()
require.NoError(t, err)

producer := &MockProducer{}
tt.plugin.producer = producer

err = tt.plugin.Write(tt.input)
require.NoError(t, err)

require.Equal(t, tt.topic, producer.sent[0].Topic)

encoded, err := producer.sent[0].Value.Encode()
require.NoError(t, err)
require.Equal(t, tt.value, string(encoded))
})
}
}

0 comments on commit fbf15a6

Please sign in to comment.