diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 2ee4b2181ef..e0cca75718c 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -58,6 +58,13 @@ type kafkaConfig struct { Username string `config:"username"` Password string `config:"password"` Codec codec.Config `config:"codec"` + Sasl saslConfig `config:"sasl"` +} + +type saslConfig struct { + SaslMechanism string `config:"mechanism"` + //SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future? + //SaslPassword string `config:"password"` } type metaConfig struct { @@ -83,6 +90,12 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } +const ( + saslTypePlaintext = sarama.SASLTypePlaintext + saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256 + saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512 +) + func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, @@ -113,6 +126,32 @@ func defaultConfig() kafkaConfig { } } +func (c *saslConfig) configureSarama(config *sarama.Config) error { + switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case + case "": + // SASL is not enabled + return nil + case saslTypePlaintext: + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + case saslTypeSCRAMSHA256: + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA256} + } + case saslTypeSCRAMSHA512: + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA512} + } + default: + return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism) + } + + return nil +} + func readConfig(cfg *common.Config) (*kafkaConfig, error) { c := defaultConfig() if err := cfg.Unpack(&c); err != nil { @@ -144,7 +183,6 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("compression_level must be between 0 and 9") } } - return nil } @@ -169,6 +207,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { if err != nil { return nil, err } + if tls != nil { k.Net.TLS.Enable = true k.Net.TLS.Config = tls.BuildModuleConfig("") @@ -178,6 +217,11 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { k.Net.SASL.Enable = true k.Net.SASL.User = config.Username k.Net.SASL.Password = config.Password + err = config.Sasl.configureSarama(k) + + if err != nil { + return nil, err + } } // configure metadata update properties diff --git a/libbeat/outputs/kafka/scram.go b/libbeat/outputs/kafka/scram.go new file mode 100644 index 00000000000..3f70959ffed --- /dev/null +++ b/libbeat/outputs/kafka/scram.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go +package kafka + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + + "github.com/xdg/scram" +) + +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +}