Skip to content

Commit

Permalink
feat: add support for SCRAM authentication for kafka metricbeat module,
Browse files Browse the repository at this point in the history
  • Loading branch information
fholzer committed Mar 29, 2021
1 parent 8184a91 commit 0bc5d21
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 36 deletions.
46 changes: 46 additions & 0 deletions libbeat/common/kafka/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package kafka

import (
"fmt"
"strings"

"github.com/Shopify/sarama"
)

type SaslConfig struct {
SaslMechanism string `config:"mechanism"`
//SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future?
//SaslPassword string `config:"password"`
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func (c *SaslConfig) ConfigureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}
File renamed without changes.
38 changes: 2 additions & 36 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,10 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
}

type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Expand Down Expand Up @@ -140,36 +136,6 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -252,7 +218,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)
err = config.Sasl.ConfigureSarama(k)

if err != nil {
return nil, err
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,15 @@ type BrokerSettings struct {
TLS *tls.Config
Username, Password string
Version kafka.Version
Sasl kafka.SaslConfig
}

//const (
// saslTypePlaintext = sarama.SASLTypePlaintext
// saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
// saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
//)

type GroupDescription struct {
Members map[string]MemberDescription
}
Expand Down Expand Up @@ -91,6 +98,11 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
err := settings.Sasl.ConfigureSarama(cfg)

if err != nil {
return nil
}
}
cfg.Version, _ = settings.Version.Get()

Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/kafka"
)

type metricsetConfig struct {
Expand All @@ -31,6 +32,7 @@ type metricsetConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ClientID string `config:"client_id"`
Sasl kafka.SaslConfig `config:"sasl"`
}

var defaultConfig = metricsetConfig{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet,
Username: config.Username,
Password: config.Password,
Version: Version(options.Version),
Sasl: config.Sasl,
}

return &MetricSet{
Expand Down

0 comments on commit 0bc5d21

Please sign in to comment.