Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "mechanism" in output.kafka to support SCRAM-SHA-512 and SCRAM-SHA-256 #12867

Merged
merged 8 commits into from
Mar 2, 2020
60 changes: 60 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"errors"
"fmt"
"hash"
"strings"
"time"

Expand All @@ -34,8 +37,36 @@ import (
"github.com/elastic/beats/libbeat/monitoring/adapter"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var SHA256 should have comment or be unexported

var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var SHA512 should have comment or be unexported


type XDGSCRAMClient struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type XDGSCRAMClient should have comment or be unexported

*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method XDGSCRAMClient.Begin should have comment or be unexported

x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method XDGSCRAMClient.Step should have comment or be unexported

response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method XDGSCRAMClient.Done should have comment or be unexported

return x.ClientConversation.Done()
}

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Expand All @@ -58,6 +89,7 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Mechanism string `config:"mechanism"`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this calls for an enum like type (over string):

type saslMechanism string

const (
  saslTypePlaintext = sarama.SASLTypePlaintext
  ...
)

func (m *saslMechanism) Unpack(in string) error {
  in = strings.ToUpper(in)  // try not to force users to use all upper case
  switch in {
  case saslTypePlaintext, ...:
    *m = saslMechanism(in)
  default:
    return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", in)
  }
  return nil   
}

The unpack method will be called by (*Config).Unpack. If it fails the full config name and the config file the setting comes from will be reported.

There is another 'string' comparison in order to fill in the sarama SASL settings. Having magic strings in different places is a good way to introduce Bugs (future developer might have typo when adding another mechanism). Better use an enum and use contanstants (compiler will complain if there is a typo). By having our own type we can also do this (optional):

func (m saslMechanism) configureSarama(... <other input>?, config *sarama.Config) error {
  switch m {
  case saslScramSHA256:
    ...

  case sasl<type>:
    ...

  default:
    panic(<we shouldn't get here, assuming no developer messed up>)
  }
}

}

type metaConfig struct {
Expand All @@ -80,6 +112,12 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

var mechanismModes = map[string]sarama.SASLMechanism{
"PLAIN": sarama.SASLTypePlaintext,
"SCRAM-SHA-512": sarama.SASLTypeSCRAMSHA512,
"SCRAM-SHA-256": sarama.SASLTypeSCRAMSHA256,
}

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -107,6 +145,7 @@ func defaultConfig() kafkaConfig {
ChanBufferSize: 256,
Username: "",
Password: "",
Mechanism: "PLAIN",
}
}

Expand Down Expand Up @@ -135,6 +174,10 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("password must be set when username is configured")
}

if _, ok := mechanismModes[c.Mechanism]; !ok {
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.Mechanism)
}

if c.Compression == "gzip" {
lvl := c.CompressionLevel
if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {
Expand Down Expand Up @@ -166,6 +209,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
if err != nil {
return nil, err
}

if tls != nil {
k.Net.TLS.Enable = true
k.Net.TLS.Config = tls.BuildModuleConfig("")
Expand All @@ -177,6 +221,22 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Net.SASL.Password = config.Password
}

mechanism := config.Mechanism

// SCRAM-SHA-512 mechanism
if mechanism == "SCRAM-SHA-512" {
k.Net.SASL.Handshake = true
k.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
k.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
}

// SCRAM-SHA-256 mechanism
if mechanism == "SCRAM-SHA-256" {
k.Net.SASL.Handshake = true
k.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
k.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
}

// configure metadata update properties
k.Metadata.Retry.Max = config.Metadata.Retry.Max
k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
Expand Down