Skip to content

Commit

Permalink
Cherry-pick #12867 to 7.x: Add "mechanism" in output.kafka to support…
Browse files Browse the repository at this point in the history
… SCRAM-SHA-512 and SCRAM-SHA-256 (#23110)

Add "mechanism" in output.kafka to support  SCRAM-SHA-512 and  SCRAM-SHA-256.

(cherry picked from commit e935b26)
(cherry picked from commit 87ff5c0)

Co-authored-by: zvictorino <zvictorino@gmail.com>
  • Loading branch information
jsoriano and zvictorino authored Dec 14, 2020
1 parent 42c5abf commit 35f1bd1
Show file tree
Hide file tree
Showing 22 changed files with 822 additions and 629 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow embedding of CAs, Certificate of private keys for anything that support TLS in ouputs and inputs https://github.com/elastic/beats/pull/21179
- API address is a required setting in `add_cloudfoundry_metadata`. {pull}21759[21759]
- Update to ECS 1.7.0. {pull}22571[22571]
- Add support for SCRAM-SHA-512 and SCRAM-SHA-256 in Kafka output. {pull}12867[12867]

*Auditbeat*

Expand Down
1,241 changes: 618 additions & 623 deletions NOTICE.txt

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Auditbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ Kafka rebalance settings:
*`retry_backoff`*:: How long to wait after an unsuccessful rebalance attempt.
Defaults to 2s.

===== `kerberos`

beta[]

Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Filebeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ require (
github.com/ugorji/go/codec v1.1.8
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.8.1-0.20200909061013-2aef45b9cf4b
go.elastic.co/apm/module/apmelasticsearch v1.7.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 h1:NeNpIvfvaFOh0BH7
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/xanzy/go-gitlab v0.22.3 h1:/rNlZ2hquUWNc6rJdntVM03tEOoTmnZ1lcNyJCl0WlU=
github.com/xanzy/go-gitlab v0.22.3/go.mod h1:t4Bmvnxj7k37S4Y17lfLx+nLqkf/oQwT2HagfWKv5Og=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Heartbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Journalbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions libbeat/_meta/config/output-kafka.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version {{.BeatName | title}} is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
54 changes: 50 additions & 4 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
}

type metaConfig struct {
Expand All @@ -90,6 +95,12 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -125,6 +136,36 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -156,7 +197,6 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}

return nil
}

Expand All @@ -181,12 +221,14 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
if err != nil {
return nil, err
}

if tls != nil {
k.Net.TLS.Enable = true
k.Net.TLS.Config = tls.BuildModuleConfig("")
}

if config.Kerberos.IsEnabled() {
switch {
case config.Kerberos.IsEnabled():
cfgwarn.Beta("Kerberos authentication for Kafka is beta.")

k.Net.SASL.Enable = true
Expand All @@ -200,12 +242,16 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
Password: config.Kerberos.Password,
Realm: config.Kerberos.Realm,
}
}

if config.Username != "" {
case config.Username != "":
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)

if err != nil {
return nil, err
}
}

// configure metadata update properties
Expand Down
34 changes: 32 additions & 2 deletions libbeat/outputs/kafka/docs/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
<titleabbrev>Kafka</titleabbrev>
++++

The Kafka output sends the events to Apache Kafka.
The Kafka output sends events to Apache Kafka.

To use this output, edit the {beatname_uc} configuration file to disable the {es}
output by commenting it out, and enable the Kafka output by uncommenting the
Kafka section.

Example configuration:

Expand Down Expand Up @@ -62,12 +66,29 @@ See <<kafka-compatibility>> for information on supported versions.
===== `username`

The username for connecting to Kafka. If username is configured, the password
must be configured as well. Only SASL/PLAIN is supported.
must be configured as well.

===== `password`

The password for connecting to Kafka.

===== `sasl.mechanism`

beta[]

The SASL mechanism to use when connecting to Kafka. It can be one of:

* `PLAIN` for SASL/PLAIN.
* `SCRAM-SHA-256` for SCRAM-SHA-256.
* `SCRAM-SHA-512` for SCRAM-SHA-512.

If `sasl.mechanism` is not set, `PLAIN` is used if `username` and `password`
are provided. Otherwise, SASL authentication is disabled.

To use `GSSAPI` mechanism to authenticate with Kerberos, you must leave this
field empty, and use the <<kerberos-option-kafka>> options.


[[topic-option-kafka]]
===== `topic`

Expand Down Expand Up @@ -277,3 +298,12 @@ Configuration options for SSL parameters like the root CA for Kafka connections.
`-keyalg RSA` argument to ensure it uses a cipher supported by
https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions#why-cant-sarama-connect-to-my-kafka-cluster-using-ssl[Filebeat's Kafka library].
See <<configuration-ssl>> for more information.

[[kerberos-option-kafka]]
===== `kerberos`

beta[]

Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.
54 changes: 54 additions & 0 deletions libbeat/outputs/kafka/scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Metricbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Packetbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Winlogbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Auditbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3405,6 +3405,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Filebeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Heartbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Metricbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
Loading

0 comments on commit 35f1bd1

Please sign in to comment.