Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable kafka zstd compression and idempotent writes #8435

Merged
merged 12 commits into from
Nov 23, 2020
94 changes: 94 additions & 0 deletions plugins/common/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package kafka

import (
"log"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf/plugins/common/tls"
)

// ReadConfig for kafka clients meaning to read from Kafka.
type ReadConfig struct {
Config
}

// SetConfig on the sarama.Config object from the ReadConfig struct.
func (k *ReadConfig) SetConfig(config *sarama.Config) error {
config.Consumer.Return.Errors = true

return k.Config.SetConfig(config)
}

// WriteConfig for kafka clients meaning to write to kafka
type WriteConfig struct {
Config

RequiredAcks int `toml:"required_acks"`
MaxRetry int `toml:"max_retry"`
MaxMessageBytes int `toml:"max_message_bytes"`
IdempotentWrites bool `toml:"idempotent_writes"`
}

// SetConfig on the sarama.Config object from the WriteConfig struct.
func (k *WriteConfig) SetConfig(config *sarama.Config) error {
config.Producer.Return.Successes = true
config.Producer.Idempotent = k.IdempotentWrites
config.Producer.Retry.Max = k.MaxRetry
if k.MaxMessageBytes > 0 {
config.Producer.MaxMessageBytes = k.MaxMessageBytes
}
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
return k.Config.SetConfig(config)
}

// Config common to all Kafka clients.
type Config struct {
SASLAuth
tls.ClientConfig

Version string `toml:"version"`
ClientID string `toml:"client_id"`
CompressionCodec int `toml:"compression_codec"`

// EnableTLS deprecated
EnableTLS *bool `toml:"enable_tls"`
}

// SetConfig on the sarama.Config object from the Config struct.
func (k *Config) SetConfig(config *sarama.Config) error {
if k.EnableTLS != nil {
log.Printf("W! [kafka] enable_tls is deprecated, and the setting does nothing, you can safely remove it from the config")
}
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}

config.Version = version
}

if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}

config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)

tlsConfig, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}

if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

if err := k.SetSASLConfig(config); err != nil {
return err
}

return nil
}
10 changes: 9 additions & 1 deletion plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ and use the old zookeeper connection method.
# insecure_skip_verify = false

## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"

Expand All @@ -62,6 +62,14 @@ and use the old zookeeper connection method.
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"

## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"

Expand Down
58 changes: 12 additions & 46 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
Expand All @@ -36,15 +35,14 @@ const sampleConfig = `
# version = ""

## Optional TLS Config
# enable_tls = true
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"

Expand All @@ -71,6 +69,15 @@ const sampleConfig = `
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"

## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0

## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"

Expand Down Expand Up @@ -110,20 +117,15 @@ type semaphore chan empty

type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
ClientID string `toml:"client_id"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
Version string `toml:"version"`

kafka.SASLAuth

EnableTLS *bool `toml:"enable_tls"`
tls.ClientConfig
kafka.ReadConfig

Log telegraf.Logger `toml:"-"`

Expand Down Expand Up @@ -173,50 +175,14 @@ func (k *KafkaConsumer) Init() error {
}

config := sarama.NewConfig()
config.Consumer.Return.Errors = true

// Kafka version 0.10.2.0 is required for consumer groups.
config.Version = sarama.V0_10_2_0

if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}

config.Version = version
}

if k.EnableTLS != nil && *k.EnableTLS {
config.Net.TLS.Enable = true
}

tlsConfig, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}

if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig

// To maintain backwards compatibility, if the enable_tls option is not
// set TLS is enabled if a non-default TLS config is used.
if k.EnableTLS == nil {
k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS")
ssoroka marked this conversation as resolved.
Show resolved Hide resolved
config.Net.TLS.Enable = true
}
}

if err := k.SetSASLConfig(config); err != nil {
if err := k.SetConfig(config); err != nil {
return err
}

if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}

switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Consumer.Offsets.Initial = sarama.OffsetOldest
Expand Down
54 changes: 32 additions & 22 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -68,8 +69,12 @@ func TestInit(t *testing.T) {
{
name: "parses valid version string",
plugin: &KafkaConsumer{
Version: "1.0.0",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
Version: "1.0.0",
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.Version, sarama.V1_0_0_0)
Expand All @@ -78,16 +83,24 @@ func TestInit(t *testing.T) {
{
name: "invalid version string",
plugin: &KafkaConsumer{
Version: "100",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
Version: "100",
},
},
Log: testutil.Logger{},
},
initError: true,
},
{
name: "custom client_id",
plugin: &KafkaConsumer{
ClientID: "custom",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientID: "custom",
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.ClientID, "custom")
Expand Down Expand Up @@ -123,8 +136,12 @@ func TestInit(t *testing.T) {
{
name: "default tls with a tls config",
plugin: &KafkaConsumer{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
},
},
},
Log: testutil.Logger{},
},
Expand All @@ -133,24 +150,17 @@ func TestInit(t *testing.T) {
},
},
{
name: "disable tls",
name: "Insecure tls",
plugin: &KafkaConsumer{
EnableTLS: func() *bool { v := false; return &v }(),
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
},
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.False(t, plugin.config.Net.TLS.Enable)
},
},
{
name: "enable tls",
plugin: &KafkaConsumer{
EnableTLS: func() *bool { v := true; return &v }(),
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.True(t, plugin.config.Net.TLS.Enable)
},
Expand Down
17 changes: 11 additions & 6 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,18 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## routing_key = "telegraf"
# routing_key = ""

## CompressionCodec represents the various compression codecs recognized by
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
# compression_codec = 0
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0

## Idempotent Writes
## If enabled, exactly one copy of each message is written.
# idempotent_writes = false

## RequiredAcks is used in Produce Requests to tell the broker how many
## replica acknowledgements it must see before responding
Expand Down
Loading