Add Kafka 0.9+ consumer support #2487

Merged · 1 commit · Jun 8, 2017
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
### Release Notes
### Features

- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support
- [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin
- [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted
- [#2817](https://github.com/influxdata/telegraf/pull/2817): Added timeout option to IPMI sensor plugin
3 changes: 2 additions & 1 deletion Godeps
@@ -1,5 +1,5 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3
github.com/Shopify/sarama c01858abb625b73a3af51d0798e4ad42c8147093
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
@@ -52,6 +52,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
github.com/bsm/sarama-cluster ccdc0803695fbce22f1706d04ded46cd518fd832
github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
28 changes: 18 additions & 10 deletions Makefile
@@ -46,11 +46,15 @@ prepare-windows:
# Run all docker containers necessary for unit tests
docker-run:
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
docker run --name kafka \
-e ADVERTISED_HOST=localhost \
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
--link zookeeper:zookeeper \
-e KAFKA_ADVERTISED_HOST_NAME=localhost \
-e KAFKA_ADVERTISED_PORT=9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
-p "9092:9092" \
-d wurstmeister/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
docker run --name memcached -p "11211:11211" -d memcached
@@ -65,11 +69,15 @@ docker-run:
# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
docker run --name kafka \
-e ADVERTISED_HOST=localhost \
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
--link zookeeper:zookeeper \
-e KAFKA_ADVERTISED_HOST_NAME=localhost \
-e KAFKA_ADVERTISED_PORT=9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
-p "9092:9092" \
-d wurstmeister/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
@@ -78,8 +86,8 @@ docker-run-circle:

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch

# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run
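
As a quick sanity check that the containers provisioned above are ready before `make test` runs, something like the following throwaway Go program can be used. It relies on the vendored Shopify/sarama client; the broker address matches the port mapping above, and the `test` topic is the one `KAFKA_CREATE_TOPICS` asks for. This is an illustration, not part of the Makefile:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Connect to the broker started by `make docker-run`.
	// A nil config makes sarama fall back to its defaults.
	client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalf("broker not reachable: %v", err)
	}
	defer client.Close()

	// KAFKA_CREATE_TOPICS="test:1:1" should have created the "test" topic.
	topics, err := client.Topics()
	if err != nil {
		log.Fatalf("listing topics failed: %v", err)
	}
	fmt.Println("available topics:", topics)
}
```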
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
@@ -10,6 +10,7 @@ works:
- github.com/aws/aws-sdk-go [APACHE](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt)
- github.com/beorn7/perks [MIT](https://github.com/beorn7/perks/blob/master/LICENSE)
- github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE)
- github.com/bsm/sarama-cluster [MIT](https://github.com/bsm/sarama-cluster/blob/master/LICENSE)
- github.com/cenkalti/backoff [MIT](https://github.com/cenkalti/backoff/blob/master/LICENSE)
- github.com/couchbase/go-couchbase [MIT](https://github.com/couchbase/go-couchbase/blob/master/LICENSE)
- github.com/couchbase/gomemcached [MIT](https://github.com/couchbase/gomemcached/blob/master/LICENSE)
35 changes: 33 additions & 2 deletions etc/telegraf.conf
@@ -2198,11 +2198,42 @@
# ## 0 means to use the default of 65536 bytes (64 kibibytes)
# max_line_size = 0


# # Read metrics from Kafka topic(s)
# # Read metrics from Kafka 0.9+ topic(s)
# [[inputs.kafka_consumer]]
# ## topic(s) to consume
# topics = ["telegraf"]
# ## kafka servers
# brokers = ["localhost:9092"]
# ## the name of the consumer group
# consumer_group = "telegraf_metrics_consumers"
# ## Offset (must be either "oldest" or "newest")
# offset = "oldest"
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Optional SASL Config
# # sasl_username = "kafka"
# # sasl_password = "secret"
#
# ## Data format to consume.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
#
# ## Maximum length of a message to consume, in bytes (default 0/unlimited);
# ## larger messages are dropped
# max_message_len = 65536

# # Read metrics from Kafka (0.8 or less) topic(s)
# [[inputs.kafka_consumer_legacy]]
# ## topic(s) to consume
# topics = ["telegraf"]
# ## an array of Zookeeper connection strings
# zookeeper_peers = ["localhost:2181"]
# ## Zookeeper Chroot
20 changes: 16 additions & 4 deletions internal/config/testdata/telegraf-agent.toml
@@ -143,19 +143,31 @@
[[inputs.diskio]]
# no configuration

# read metrics from a Kafka topic
# read metrics from a Kafka 0.9+ topic
[[inputs.kafka_consumer]]
# topic(s) to consume
## kafka brokers
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"

# read metrics from a Kafka legacy topic
[[inputs.kafka_consumer_legacy]]
## topic(s) to consume
topics = ["telegraf"]
# an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
# the name of the consumer group
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
# Maximum number of points to buffer between collection intervals
point_buffer = 100000
# Offset (must be either "oldest" or "newest")
## Offset (must be either "oldest" or "newest")
offset = "oldest"


# Read metrics from a LeoFS Server via SNMP
[[inputs.leofs]]
# An array of URI to gather stats about LeoFS.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
@@ -35,6 +35,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
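
These blank imports work because each input package registers a constructor with the plugin registry in its `init` function. A sketch of that idiom, as the new `kafka_consumer_legacy` package presumably implements it (names inferred from the existing plugins, not shown in this diff):

```go
package kafka_consumer_legacy

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
)

// A blank import of this package (as in all.go above) runs init(),
// which makes the plugin constructible by its config name.
func init() {
	inputs.Add("kafka_consumer_legacy", func() telegraf.Input {
		return &Kafka{} // the legacy consumer type defined in this package
	})
}
```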
19 changes: 15 additions & 4 deletions plugins/inputs/kafka_consumer/README.md
@@ -6,22 +6,33 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.

For older Kafka versions (0.8 and earlier), use the kafka_consumer_legacy input
plugin, which still connects through Zookeeper.
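
As a rough illustration of the consumer-group behaviour described above, here is a minimal standalone sketch built on the same `bsm/sarama-cluster` API the plugin uses internally. The broker address, group name, and topic are placeholder values taken from the sample configuration; this is not part of the plugin itself:

```go
package main

import (
	"fmt"
	"log"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true

	// Every consumer that joins the same group splits the topic's
	// partitions with its peers, so multiple telegraf instances can
	// read one topic in parallel without duplicating messages.
	consumer, err := cluster.NewConsumer(
		[]string{"localhost:9092"},   // brokers
		"telegraf_metrics_consumers", // consumer group
		[]string{"telegraf"},         // topics
		config,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Drain the error channel, since Return.Errors is enabled.
	go func() {
		for err := range consumer.Errors() {
			log.Println("consumer error:", err)
		}
	}()

	for msg := range consumer.Messages() {
		fmt.Printf("%s/%d@%d: %s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		consumer.MarkOffset(msg, "") // commit so a restart resumes here
	}
}
```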

## Configuration

```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""
brokers = ["localhost:9092"]
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
105 changes: 76 additions & 29 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -7,20 +7,35 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
cluster "github.com/bsm/sarama-cluster"
)

type Kafka struct {
ConsumerGroup string
Topics []string
MaxMessageLen int
ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup
ConsumerGroup string
Topics []string
Brokers []string
MaxMessageLen int

Cluster *cluster.Consumer

// Verify Kafka SSL Certificate
InsecureSkipVerify bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

// Legacy metric buffer support
MetricBuffer int
@@ -47,12 +62,22 @@ type Kafka struct {
}

var sampleConfig = `
## kafka servers
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"

## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
@@ -84,45 +109,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
var clusterErr error

k.acc = acc

config := consumergroup.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot
config := cluster.NewConfig()
config.Consumer.Return.Errors = true

tlsConfig, err := internal.GetTLSConfig(
k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
if err != nil {
return err
}

if tlsConfig != nil {
log.Printf("D! TLS Enabled")
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
if k.SASLUsername != "" && k.SASLPassword != "" {
log.Printf("D! Using SASL auth with username '%s',",
k.SASLUsername)
config.Net.SASL.User = k.SASLUsername
config.Net.SASL.Password = k.SASLPassword
config.Net.SASL.Enable = true
}

switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
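// Note: Consumer.Offsets.Initial only takes effect when the group has no
// committed offset for a partition yet; afterwards consumption resumes
// from the offset last committed via MarkOffset in receiver().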

if k.Consumer == nil || k.Consumer.Closed() {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
if k.Cluster == nil {
k.Cluster, clusterErr = cluster.NewConsumer(
k.Brokers,
k.ConsumerGroup,
k.Topics,
k.ZookeeperPeers,
config,
)
if consumerErr != nil {
return consumerErr

if clusterErr != nil {
log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n",
k.Brokers, k.Topics)
return clusterErr
}

// Setup message and error channels
k.in = k.Consumer.Messages()
k.errs = k.Consumer.Errors()
k.in = k.Cluster.Messages()
k.errs = k.Cluster.Errors()
}

k.done = make(chan struct{})

// Start the kafka message reader
go k.receiver()
log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n",
k.Brokers, k.Topics)
return nil
}

@@ -156,7 +203,7 @@ func (k *Kafka) receiver() {
// TODO(cam) this locking can be removed if this PR gets merged:
// https://github.com/wvanbergen/kafka/pull/84
k.Lock()
k.Consumer.CommitUpto(msg)
k.Cluster.MarkOffset(msg, "")
k.Unlock()
}
}
@@ -167,7 +214,7 @@ func (k *Kafka) Stop() {
k.Lock()
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
if err := k.Cluster.Close(); err != nil {
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
}
}
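
For completeness, a hypothetical end-to-end wiring of the rewritten plugin, roughly what an integration test would do. `testutil.Accumulator` and `parsers.NewInfluxParser` are existing helpers in the telegraf tree; the broker address is an assumption:

```go
package kafka_consumer

import (
	"log"

	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/testutil"
)

func ExampleKafka() {
	k := &Kafka{
		Brokers:       []string{"localhost:9092"},
		ConsumerGroup: "telegraf_metrics_consumers",
		Topics:        []string{"telegraf"},
		Offset:        "oldest",
	}

	// Messages are decoded with the same parser machinery as every
	// other telegraf input; influx line protocol is the default.
	parser, err := parsers.NewInfluxParser()
	if err != nil {
		log.Fatal(err)
	}
	k.SetParser(parser)

	var acc testutil.Accumulator
	if err := k.Start(&acc); err != nil {
		log.Fatal(err)
	}
	defer k.Stop() // closes the cluster consumer and commits offsets
}
```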