Skip to content

Commit

Permalink
[kafka/internal, kafkaexporter, kafkareceiver] Add SASL mechanism "AW…
Browse files Browse the repository at this point in the history
…S_MSK_IAM_OAUTHBEARER" to kafkaexporter (open-telemetry#32500)

**Description:** 
This PR added the SASL mechanism "AWS_MSK_IAM_OAUTHBEARER" to
kafkaexporter and kafkareceiver. This mechanism use the AWS MSK IAM SASL
Signer for Go https://github.com/aws/aws-msk-iam-sasl-signer-go. This
mechanism is added because the "AWS_MSK_IAM" is not working in our
cluster and also in this
[issue](open-telemetry#19747).
We added an new mechanism instead of replace the existing one because we
want to keep the backward compatibility just in case someone is using
"AWS_MSK_IAM".

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:** 

[19747](open-telemetry#19747)

**Testing:** 
We built the images and tested the SASL mechanism in our team.
We added related unit tests.

**Documentation:** 
We updated the kafkaexporter and kafakreciever README on the SASL
mechanism.

---------

Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com>
  • Loading branch information
2 people authored and ZenoCC-Peng committed Dec 6, 2024
1 parent 4e91998 commit cd0e067
Show file tree
Hide file tree
Showing 18 changed files with 251 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-aws-iam-oauth-bearer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter, kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a new mechanism "AWS_MSK_IAM_OAUTHBEARER" for kafka exporter and kafka receiver. This mechanism use the AWS MSK IAM SASL Signer for Go https://github.com/aws/aws-msk-iam-sasl-signer-go.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19747]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ The following settings can be optionally configured:
- `sasl`
- `username`: The username to use.
- `password`: The password to use
- `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)
- `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN)
- `version` (default = 0): The SASL protocol version to use (0 or 1)
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism
- `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
Expand Down
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ func validateSASLConfig(c *kafka.SASLConfig) error {
}

switch c.Mechanism {
case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512":
// Do nothing, valid mechanism
default:
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
}

if c.Version < 0 || c.Version > 1 {
Expand Down
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestValidate_sasl_mechanism(t *testing.T) {
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
}

func TestValidate_sasl_version(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,20 @@ require (

require (
github.com/apache/thrift v0.21.0 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *kafkaTracesProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error {
func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[ptrace.Marshaler](
host,
Expand All @@ -83,7 +83,7 @@ func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) erro
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
producer, err := newSaramaProducer(ctx, e.cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (e *kafkaMetricsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error {
func (e *kafkaMetricsProducer) start(ctx context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[pmetric.Marshaler](
host,
Expand All @@ -141,7 +141,7 @@ func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) err
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
producer, err := newSaramaProducer(ctx, e.cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (e *kafkaLogsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error {
func (e *kafkaLogsProducer) start(ctx context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[plog.Marshaler](
host,
Expand All @@ -199,15 +199,15 @@ func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
producer, err := newSaramaProducer(ctx, e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
func newSaramaProducer(ctx context.Context, config Config) (sarama.SyncProducer, error) {
c := sarama.NewConfig()

c.ClientID = config.ClientID
Expand Down Expand Up @@ -236,7 +236,7 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
c.Version = version
}

if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil {
if err := kafka.ConfigureAuthentication(ctx, config.Authentication, c); err != nil {
return nil, err
}

Expand Down
34 changes: 26 additions & 8 deletions internal/kafka/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"context"
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"fmt"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
"go.opentelemetry.io/collector/config/configtls"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk"
Expand All @@ -35,7 +37,7 @@ type SASLConfig struct {
Username string `mapstructure:"username"`
// Password to be used on authentication
Password string `mapstructure:"password"`
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512).
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512).
Mechanism string `mapstructure:"mechanism"`
// SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0.
Version int `mapstructure:"version"`
Expand All @@ -44,12 +46,21 @@ type SASLConfig struct {
}

// AWSMSKConfig defines the additional SASL authentication
// measures needed to use AWS_MSK_IAM mechanism
// measures needed to use AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanism
type AWSMSKConfig struct {
// Region is the AWS region the MSK cluster is based in
Region string `mapstructure:"region"`
// BrokerAddr is the client is connecting to in order to perform the auth required
BrokerAddr string `mapstructure:"broker_addr"`
// Context
ctx context.Context
}

// Token return the AWS session token for the AWS_MSK_IAM_OAUTHBEARER mechanism
func (c *AWSMSKConfig) Token() (*sarama.AccessToken, error) {
token, _, err := signer.GenerateAuthToken(c.ctx, c.Region)

return &sarama.AccessToken{Token: token}, err
}

// KerberosConfig defines kerberos configuration.
Expand All @@ -65,7 +76,7 @@ type KerberosConfig struct {
}

// ConfigureAuthentication configures authentication in sarama.Config.
func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error {
func ConfigureAuthentication(ctx context.Context, config Authentication, saramaConfig *sarama.Config) error {
if config.PlainText != nil {
configurePlaintext(*config.PlainText, saramaConfig)
}
Expand All @@ -75,7 +86,7 @@ func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config)
}
}
if config.SASL != nil {
if err := configureSASL(*config.SASL, saramaConfig); err != nil {
if err := configureSASL(ctx, *config.SASL, saramaConfig); err != nil {
return err
}
}
Expand All @@ -92,12 +103,12 @@ func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Password = config.Password
}

func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
if config.Username == "" {
func configureSASL(ctx context.Context, config SASLConfig, saramaConfig *sarama.Config) error {
if config.Username == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
return fmt.Errorf("username have to be provided")
}

if config.Password == "" {
if config.Password == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
return fmt.Errorf("password have to be provided")
}

Expand All @@ -119,8 +130,15 @@ func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID)
}
saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism
case "AWS_MSK_IAM_OAUTHBEARER":
config.AWSMSK.ctx = ctx
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.TokenProvider = &config.AWSMSK
tlsConfig := tls.Config{}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = &tlsConfig
default:
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
}

switch config.Version {
Expand Down
17 changes: 16 additions & 1 deletion internal/kafka/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kafka

import (
"context"
"crypto/tls"
"testing"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -51,6 +52,16 @@ func TestAuthentication(t *testing.T) {
require.NoError(t, err)
saramaTLSCfg.Net.TLS.Config = tlscfg

ctx := context.Background()
saramaSASLAWSIAMOATUHConfig := &sarama.Config{}
saramaSASLAWSIAMOATUHConfig.Net.SASL.Enable = true
saramaSASLAWSIAMOATUHConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaSASLAWSIAMOATUHConfig.Net.SASL.TokenProvider = &AWSMSKConfig{Region: "region", ctx: ctx}

tlsConfig := tls.Config{}
saramaSASLAWSIAMOATUHConfig.Net.TLS.Enable = true
saramaSASLAWSIAMOATUHConfig.Net.TLS.Config = &tlsConfig

saramaKerberosCfg := &sarama.Config{}
saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaKerberosCfg.Net.SASL.Enable = true
Expand Down Expand Up @@ -129,6 +140,10 @@ func TestAuthentication(t *testing.T) {
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}},
saramaConfig: saramaSASLPLAINConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "", Password: "", Mechanism: "AWS_MSK_IAM_OAUTHBEARER", AWSMSK: AWSMSKConfig{Region: "region"}}},
saramaConfig: saramaSASLAWSIAMOATUHConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-222"}},
saramaConfig: saramaSASLSCRAM512Config,
Expand All @@ -153,7 +168,7 @@ func TestAuthentication(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
config := &sarama.Config{}
err := ConfigureAuthentication(test.auth, config)
err := ConfigureAuthentication(context.Background(), test.auth, config)
if test.err != "" {
assert.ErrorContains(t, err, test.err)
} else {
Expand Down
13 changes: 13 additions & 0 deletions internal/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/IBM/sarama v1.43.3
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0
github.com/aws/aws-sdk-go v1.55.5
github.com/stretchr/testify v1.10.0
github.com/xdg-go/scram v1.1.2
Expand All @@ -13,6 +14,18 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
Expand Down
Loading

0 comments on commit cd0e067

Please sign in to comment.