Skip to content

Commit

Permalink
Add "mechanism" in output.kafka to support SCRAM-SHA-512 and SCRAM-SH…
Browse files Browse the repository at this point in the history
…A-256 (elastic#12867)
  • Loading branch information
zvictorino authored Mar 2, 2020
1 parent 755227d commit e935b26
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
46 changes: 45 additions & 1 deletion libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
//SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future?
//SaslPassword string `config:"password"`
}

type metaConfig struct {
Expand All @@ -83,6 +90,12 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -113,6 +126,32 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -144,7 +183,6 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}

return nil
}

Expand All @@ -169,6 +207,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
if err != nil {
return nil, err
}

if tls != nil {
k.Net.TLS.Enable = true
k.Net.TLS.Config = tls.BuildModuleConfig("")
Expand All @@ -178,6 +217,11 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)

if err != nil {
return nil, err
}
}

// configure metadata update properties
Expand Down
54 changes: 54 additions & 0 deletions libbeat/outputs/kafka/scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

0 comments on commit e935b26

Please sign in to comment.