Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #12867 to 7.x: Add "mechanism" in output.kafka to support SCRAM-SHA-512 and SCRAM-SHA-256 #23110

Merged
merged 5 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow embedding of CAs, Certificate of private keys for anything that support TLS in ouputs and inputs https://github.com/elastic/beats/pull/21179
- API address is a required setting in `add_cloudfoundry_metadata`. {pull}21759[21759]
- Update to ECS 1.7.0. {pull}22571[22571]
- Add support for SCRAM-SHA-512 and SCRAM-SHA-256 in Kafka output. {pull}12867[12867]

*Auditbeat*

Expand Down
1,241 changes: 618 additions & 623 deletions NOTICE.txt

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Auditbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ Kafka rebalance settings:
*`retry_backoff`*:: How long to wait after an unsuccessful rebalance attempt.
Defaults to 2s.

===== `kerberos`

beta[]

Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Filebeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ require (
github.com/ugorji/go/codec v1.1.8
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.8.1-0.20200909061013-2aef45b9cf4b
go.elastic.co/apm/module/apmelasticsearch v1.7.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 h1:NeNpIvfvaFOh0BH7
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/xanzy/go-gitlab v0.22.3 h1:/rNlZ2hquUWNc6rJdntVM03tEOoTmnZ1lcNyJCl0WlU=
github.com/xanzy/go-gitlab v0.22.3/go.mod h1:t4Bmvnxj7k37S4Y17lfLx+nLqkf/oQwT2HagfWKv5Og=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Heartbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Journalbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions libbeat/_meta/config/output-kafka.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version {{.BeatName | title}} is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
54 changes: 50 additions & 4 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
}

type metaConfig struct {
Expand All @@ -90,6 +95,12 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -125,6 +136,36 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -156,7 +197,6 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}

return nil
}

Expand All @@ -181,12 +221,14 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
if err != nil {
return nil, err
}

if tls != nil {
k.Net.TLS.Enable = true
k.Net.TLS.Config = tls.BuildModuleConfig("")
}

if config.Kerberos.IsEnabled() {
switch {
case config.Kerberos.IsEnabled():
cfgwarn.Beta("Kerberos authentication for Kafka is beta.")

k.Net.SASL.Enable = true
Expand All @@ -200,12 +242,16 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
Password: config.Kerberos.Password,
Realm: config.Kerberos.Realm,
}
}

if config.Username != "" {
case config.Username != "":
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)

if err != nil {
return nil, err
}
}

// configure metadata update properties
Expand Down
34 changes: 32 additions & 2 deletions libbeat/outputs/kafka/docs/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
<titleabbrev>Kafka</titleabbrev>
++++

The Kafka output sends the events to Apache Kafka.
The Kafka output sends events to Apache Kafka.

To use this output, edit the {beatname_uc} configuration file to disable the {es}
output by commenting it out, and enable the Kafka output by uncommenting the
Kafka section.

Example configuration:

Expand Down Expand Up @@ -62,12 +66,29 @@ See <<kafka-compatibility>> for information on supported versions.
===== `username`

The username for connecting to Kafka. If username is configured, the password
must be configured as well. Only SASL/PLAIN is supported.
must be configured as well.

===== `password`

The password for connecting to Kafka.

===== `sasl.mechanism`

beta[]

The SASL mechanism to use when connecting to Kafka. It can be one of:

* `PLAIN` for SASL/PLAIN.
* `SCRAM-SHA-256` for SCRAM-SHA-256.
* `SCRAM-SHA-512` for SCRAM-SHA-512.

If `sasl.mechanism` is not set, `PLAIN` is used if `username` and `password`
are provided. Otherwise, SASL authentication is disabled.

To use `GSSAPI` mechanism to authenticate with Kerberos, you must leave this
field empty, and use the <<kerberos-option-kafka>> options.


[[topic-option-kafka]]
===== `topic`

Expand Down Expand Up @@ -277,3 +298,12 @@ Configuration options for SSL parameters like the root CA for Kafka connections.
`-keyalg RSA` argument to ensure it uses a cipher supported by
https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions#why-cant-sarama-connect-to-my-kafka-cluster-using-ssl[Filebeat's Kafka library].
See <<configuration-ssl>> for more information.

[[kerberos-option-kafka]]
===== `kerberos`

beta[]

Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.
54 changes: 54 additions & 0 deletions libbeat/outputs/kafka/scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Metricbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Packetbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Winlogbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Auditbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3403,6 +3403,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Filebeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Heartbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
4 changes: 4 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,10 @@ output.elasticsearch:
#username: ''
#password: ''

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Kafka version Metricbeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'

Expand Down
Loading