diff --git a/Godeps b/Godeps index 6cbe9efa72c19..56e57693af1c9 100644 --- a/Godeps +++ b/Godeps @@ -50,6 +50,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee +github.com/bsm/sarama-cluster 5d8c11085c875b3155870da9ba6be706429a95dc github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 diff --git a/Makefile b/Makefile index d2bad656d0abc..36160567d4399 100644 --- a/Makefile +++ b/Makefile @@ -46,11 +46,14 @@ prepare-windows: # Run all docker containers necessary for unit tests docker-run: docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 + docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper docker run --name kafka \ - -e ADVERTISED_HOST=localhost \ - -e ADVERTISED_PORT=9092 \ - -p "2181:2181" -p "9092:9092" \ - -d spotify/kafka + --link zookeeper:zookeeper \ + -e KAFKA_ADVERTISED_HOST_NAME=localhost \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e KAFKA_CREATE_TOPICS="test:1:1" \ + -p "9092:9092" \ + -d wurstmeister/kafka docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql docker run --name memcached -p "11211:11211" -d memcached @@ -65,11 +68,14 @@ docker-run: # Run docker containers necessary for CircleCI unit tests docker-run-circle: docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 + docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper docker run --name kafka \ - -e ADVERTISED_HOST=localhost \ - -e ADVERTISED_PORT=9092 \ - -p "2181:2181" -p "9092:9092" \ - -d spotify/kafka + --link zookeeper:zookeeper \ + -e KAFKA_ADVERTISED_HOST_NAME=localhost \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e KAFKA_CREATE_TOPICS="test:1:1" \ + -p "9092:9092" \ + -d wurstmeister/kafka docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index f4176edd38b7e..537afbe31be27 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -6,19 +6,34 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" - "github.com/wvanbergen/kafka/consumergroup" + cluster "github.com/bsm/sarama-cluster" ) type Kafka struct { - ConsumerGroup string - Topics []string - ZookeeperPeers []string - ZookeeperChroot string - Consumer *consumergroup.ConsumerGroup + ConsumerGroup string + Topics []string + Brokers []string + + Cluster *cluster.Consumer + + // Verify Kafka SSL Certificate + InsecureSkipVerify bool + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // SASL Username + SASLUsername string `toml:"sasl_username"` + // SASL Password + SASLPassword string `toml:"sasl_password"` // Legacy metric buffer support MetricBuffer int @@ -45,12 +60,22 @@ type Kafka struct { } var sampleConfig = ` + ## kafka servers + brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] - ## an array of Zookeeper connection strings - zookeeper_peers = ["localhost:2181"] - ## Zookeeper Chroot - zookeeper_chroot = "" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Optional SASL Config + # sasl_username = "kafka" + # sasl_password = "secret" + ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -78,45 +103,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) { func (k *Kafka) Start(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - var consumerErr error + var clusterErr error k.acc = acc - config := consumergroup.NewConfig() - config.Zookeeper.Chroot = k.ZookeeperChroot + config := cluster.NewConfig() + config.Consumer.Return.Errors = true + + tlsConfig, err := internal.GetTLSConfig( + k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify) + if err != nil { + return err + } + + if tlsConfig != nil { + log.Printf("D! TLS Enabled") + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + if k.SASLUsername != "" && k.SASLPassword != "" { + log.Printf("D! Using SASL auth with username '%s',", + k.SASLUsername) + config.Net.SASL.User = k.SASLUsername + config.Net.SASL.Password = k.SASLPassword + config.Net.SASL.Enable = true + } + switch strings.ToLower(k.Offset) { case "oldest", "": - config.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetOldest case "newest": - config.Offsets.Initial = sarama.OffsetNewest + config.Consumer.Offsets.Initial = sarama.OffsetNewest default: log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", k.Offset) - config.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetOldest } - if k.Consumer == nil || k.Consumer.Closed() { - k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + if k.Cluster == nil { + k.Cluster, clusterErr = cluster.NewConsumer( + k.Brokers, k.ConsumerGroup, k.Topics, - k.ZookeeperPeers, config, ) - if consumerErr != nil { - return consumerErr + + if clusterErr != nil { + log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n", + k.Brokers, k.Topics) + return clusterErr } // Setup message and error channels - k.in = k.Consumer.Messages() - k.errs = k.Consumer.Errors() + k.in = k.Cluster.Messages() + k.errs = k.Cluster.Errors() } k.done = make(chan struct{}) - // Start the kafka message reader go k.receiver() - log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n", - k.ZookeeperPeers, k.Topics) + log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n", + k.Brokers, k.Topics) return nil } @@ -146,7 +193,7 @@ func (k *Kafka) receiver() { // TODO(cam) this locking can be removed if this PR gets merged: // https://github.com/wvanbergen/kafka/pull/84 k.Lock() - k.Consumer.CommitUpto(msg) + k.Cluster.MarkOffset(msg, "") k.Unlock() } } @@ -157,7 +204,7 @@ func (k *Kafka) Stop() { k.Lock() defer k.Unlock() close(k.done) - if err := k.Consumer.Close(); err != nil { + if err := k.Cluster.Close(); err != nil { log.Printf("E! Error closing kafka consumer: %s\n", err.Error()) } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index c1c93e7cb4155..8d55ec3de46a0 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -19,7 +19,6 @@ func TestReadsMetricsFromKafka(t *testing.T) { } brokerPeers := []string{testutil.GetLocalHost() + ":9092"} - zkPeers := []string{testutil.GetLocalHost() + ":2181"} testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()) // Send a Kafka message to the kafka host @@ -36,11 +35,11 @@ func TestReadsMetricsFromKafka(t *testing.T) { // Start the Kafka Consumer k := &Kafka{ - ConsumerGroup: "telegraf_test_consumers", - Topics: []string{testTopic}, - ZookeeperPeers: zkPeers, - PointBuffer: 100000, - Offset: "oldest", + ConsumerGroup: "telegraf_test_consumers", + Topics: []string{testTopic}, + Brokers: brokerPeers, + PointBuffer: 100000, + Offset: "oldest", } p, _ := parsers.NewInfluxParser() k.SetParser(p) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index c4936974f239f..f5830225ddd2c 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -23,7 +23,7 @@ func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { k := Kafka{ ConsumerGroup: "test", Topics: []string{"telegraf"}, - ZookeeperPeers: []string{"localhost:2181"}, + Brokers: []string{"localhost:9092"}, Offset: "oldest", in: in, doNotCommitMsgs: true,