Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for SCRAM authentication for kafka metricbeat module, fixes #19648 #24810

Merged
merged 13 commits into from
Apr 1, 2021
Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Move IIS module to GA and map fields. {issue}22609[22609] {pull}23024[23024]
- Apache: convert status.total_kbytes to status.total_bytes in fleet mode. {pull}23022[23022]
- Release MSSQL as GA {pull}23146[23146]
- Add support for SASL/SCRAM authentication to the Kafka module. {pull}24810[24810]

*Packetbeat*

Expand Down
69 changes: 69 additions & 0 deletions libbeat/common/kafka/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka
fholzer marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"strings"

"github.com/Shopify/sarama"
)

type SaslConfig struct {
SaslMechanism string `config:"mechanism"`
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func (c *SaslConfig) ConfigureSarama(config *sarama.Config) {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
// This should never happen because `SaslMechanism` is checked on `Validate()`, keeping a panic to detect it earlier if it happens.
panic(fmt.Sprintf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism))
fholzer marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (c *SaslConfig) Validate() error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "", saslTypePlaintext, saslTypeSCRAMSHA256, saslTypeSCRAMSHA512:
default:
return fmt.Errorf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}
return nil
}
File renamed without changes.
42 changes: 2 additions & 40 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,10 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
}

type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Expand Down Expand Up @@ -140,36 +136,6 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -252,11 +218,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)

if err != nil {
return nil, err
}
config.Sasl.ConfigureSarama(k)
}

// configure metadata update properties
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ metricbeat.modules:
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ metricbeat.modules:
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BrokerSettings struct {
TLS *tls.Config
Username, Password string
Version kafka.Version
Sasl kafka.SaslConfig
}

type GroupDescription struct {
Expand Down Expand Up @@ -91,6 +92,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
settings.Sasl.ConfigureSarama(cfg)
}
cfg.Version, _ = settings.Version.Get()

Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

Expand All @@ -31,6 +32,7 @@ type metricsetConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ClientID string `config:"client_id"`
Sasl kafka.SaslConfig `config:"sasl"`
}

var defaultConfig = metricsetConfig{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet,
Username: config.Username,
Password: config.Password,
Version: Version(options.Version),
Sasl: config.Sasl,
}

return &MetricSet{
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/modules.d/kafka.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
4 changes: 4 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,10 @@ metricbeat.modules:
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down