Skip to content

Commit

Permalink
kafkaexporter: Add support for AWS_MSK_IAM SASL Auth (#5763)
Browse files Browse the repository at this point in the history
This allows developers to use the existing kafka exporter with the newly minted AWS_MSK_IAM SASL auth.

**Link to tracking Issue:** #5009
In a very loose definition of related ticket.

**Testing:**
I have some rather basic testing locally to see what I can get done with this.
I had followed https://github.com/aws/aws-msk-iam-auth#details as close as I could with this.

**Documentation:**
I haven't added any new documentation for this since I haven't had the chance to actually validate this in a production-like setting, so I am hoping to leave it as a dark feature for the time being.
  • Loading branch information
MovieStoreGuy authored Oct 26, 2021
1 parent f812bfd commit 67e165e
Show file tree
Hide file tree
Showing 11 changed files with 378 additions and 2 deletions.
22 changes: 20 additions & 2 deletions exporter/kafkaexporter/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/config/configtls"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/awsmsk"
)

// Authentication defines authentication.
Expand All @@ -43,8 +45,19 @@ type SASLConfig struct {
Username string `mapstructure:"username"`
// Password to be used on authentication
Password string `mapstructure:"password"`
// SASL Mechanism to be used, possible values are: (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512).
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512).
Mechanism string `mapstructure:"mechanism"`

AWSMSK AWSMSKConfig `mapstructure:"aws_msk"`
}

// AWSMSKConfig defines the additional SASL authentication
// measures needed to use AWS_MSK_IAM mechanism
type AWSMSKConfig struct {
	// Region is the AWS region the MSK cluster is based in
	Region string `mapstructure:"region"`
	// BrokerAddr is the broker address that the client is connecting to,
	// used as the signed host when performing the SASL handshake.
	BrokerAddr string `mapstructure:"broker_addr"`
}

// KerberosConfig defines kereros configuration.
Expand Down Expand Up @@ -109,8 +122,13 @@ func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case "PLAIN":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
case "AWS_MSK_IAM":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID)
}
saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism
default:
return fmt.Errorf("invalid SASL Mechanism %q: can be either \"PLAIN\" , \"SCRAM-SHA-256\" or \"SCRAM-SHA-512\"", config.Mechanism)
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.17

require (
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.41.2
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger v1.27.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.37.1
Expand Down Expand Up @@ -32,6 +33,7 @@ require (
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/knadh/koanf v1.2.4 // indirect
github.com/magiconair/properties v1.8.5 // indirect
Expand Down
4 changes: 4 additions & 0 deletions exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions exporter/kafkaexporter/internal/awsmsk/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package awsmsk implements the required IAM auth used by AWS' managed Kafka platform
// to be used with the Sarama kafka producer.
//
// Further details on how the SASL connector works can be viewed here:
// https://github.com/aws/aws-msk-iam-auth#details
package awsmsk
192 changes: 192 additions & 0 deletions exporter/kafkaexporter/internal/awsmsk/iam_scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package awsmsk

import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/aws/aws-sdk-go/aws/credentials"
sign "github.com/aws/aws-sdk-go/aws/signer/v4"
"go.uber.org/multierr"
)

const (
	// Mechanism is the SASL mechanism name advertised to the Kafka broker
	// when using AWS MSK IAM authentication.
	Mechanism = "AWS_MSK_IAM"

	// service is the AWS service name used when signing the auth payload.
	service = "kafka-cluster"
	// supportedVersion is the only MSK IAM handshake version this client accepts.
	supportedVersion = "2020_10_22"
	// scopeFormat builds the AWS credential scope: accessKey/date/region/service/aws4_request.
	scopeFormat = `%s/%s/%s/kafka-cluster/aws4_request`
)

// Handshake state machine values used by Step/Done.
const (
	_ int32 = iota // Ignoring the zero value to ensure we start up correctly
	initMessage
	serverResponse
	complete
	failed
)

var (
	// ErrFailedServerChallenge is returned when the broker's challenge
	// response cannot be decoded or reports an unsupported version.
	ErrFailedServerChallenge = errors.New("failed server challenge")
	// ErrBadChallenge is returned when the challenge data is present
	// (or absent) at the wrong point in the handshake.
	ErrBadChallenge = errors.New("invalid challenge data provided")
	// ErrInvalidStateReached is returned when Step is invoked while the
	// state machine is not expecting further input.
	ErrInvalidStateReached = errors.New("invalid state reached")
)

// IAMSASLClient is a sarama.SCRAMClient implementation that performs
// the AWS MSK IAM SASL handshake instead of a true SCRAM exchange.
type IAMSASLClient struct {
	// MSKHostname is the broker host the signed payload is scoped to.
	MSKHostname string
	// Region is the AWS region the MSK cluster runs in.
	Region string
	// UserAgent identifies this client in the auth payload.
	UserAgent string

	// signer produces the AWS SigV4 signature; initialized in Begin.
	signer *sign.StreamSigner

	// state tracks handshake progress (initMessage -> serverResponse -> complete/failed).
	state int32
	// accessKey/secretKey are the credentials passed to Begin; accessKey
	// is also embedded in the credential scope of the payload.
	accessKey string
	secretKey string
}

// payload is the JSON document sent to the broker as the initial SASL
// message; field names follow the AWS MSK IAM wire format
// (https://github.com/aws/aws-msk-iam-auth#details).
type payload struct {
	Version       string `json:"version"`
	BrokerHost    string `json:"host"`
	UserAgent     string `json:"user-agent"`
	Action        string `json:"action"`
	Algorithm     string `json:"x-amz-algorithm"`
	Credentials   string `json:"x-amz-credential"`
	Date          string `json:"x-amz-date"`
	Expires       string `json:"x-amz-expires"`
	SignedHeaders string `json:"x-amz-signedheaders"`
	Signature     string `json:"x-amz-signature"`
}

// response is the JSON document returned by the broker after the initial
// payload is accepted; only the version is validated by this client.
type response struct {
	Version   string `json:"version"`
	RequestID string `json:"request-id"`
}

var _ sarama.SCRAMClient = (*IAMSASLClient)(nil)

// NewIAMSASLClient returns a sarama.SCRAMClient that authenticates against
// an AWS MSK cluster using the AWS_MSK_IAM mechanism. The hostname, region,
// and user agent are validated later, when Begin is called.
func NewIAMSASLClient(MSKHostname, region, useragent string) sarama.SCRAMClient {
	client := &IAMSASLClient{
		MSKHostname: MSKHostname,
		Region:      region,
		UserAgent:   useragent,
	}
	return client
}

// Begin validates the client configuration and prepares the SigV4 stream
// signer used to sign the auth payload. The username/password pair is kept
// both as a static credential provider (after the environment provider) and
// for embedding the access key into the credential scope.
func (sc *IAMSASLClient) Begin(username, password, _ string) error {
	switch {
	case sc.MSKHostname == "":
		return errors.New("missing required MSK Broker hostname")
	case sc.Region == "":
		return errors.New("missing MSK cluster region")
	case sc.UserAgent == "":
		return errors.New("missing value for MSK user agent")
	}

	// Environment credentials take priority; the supplied username/password
	// act as a static fallback.
	providers := []credentials.Provider{
		&credentials.EnvProvider{},
		&credentials.StaticProvider{
			Value: credentials.Value{
				AccessKeyID:     username,
				SecretAccessKey: password,
			},
		},
	}

	sc.signer = sign.NewStreamSigner(
		sc.Region,
		service,
		nil,
		credentials.NewChainCredentials(providers),
	)
	sc.accessKey = username
	sc.secretKey = password
	sc.state = initMessage
	return nil
}

// Step advances the SASL handshake state machine.
//
// On the first invocation (initMessage) the challenge must be empty and the
// signed auth payload is returned for transmission to the broker. On the
// second invocation (serverResponse) the broker's challenge is decoded and
// its version validated; the returned string is empty, which completes the
// exchange. Any error moves the client into the failed state (except for an
// out-of-order invocation, which leaves the state untouched).
func (sc *IAMSASLClient) Step(challenge string) (string, error) {
	var out string

	switch sc.state {
	case initMessage:
		if challenge != "" {
			sc.state = failed
			return "", fmt.Errorf("challenge must be empty for initial request: %w", ErrBadChallenge)
		}
		payload, err := sc.getAuthPayload()
		if err != nil {
			sc.state = failed
			return "", err
		}
		out = string(payload)
		sc.state = serverResponse
	case serverResponse:
		if challenge == "" {
			sc.state = failed
			// Fixed typo in message: "resposne" -> "response".
			return "", fmt.Errorf("challenge must not be empty for server response: %w", ErrBadChallenge)
		}

		// Renamed from `resp` to avoid shadowing the outer return accumulator.
		var srvResp response
		if err := json.NewDecoder(strings.NewReader(challenge)).Decode(&srvResp); err != nil {
			sc.state = failed
			return "", fmt.Errorf("unable to process msk challenge response: %w", multierr.Combine(err, ErrFailedServerChallenge))
		}

		if srvResp.Version != supportedVersion {
			sc.state = failed
			return "", fmt.Errorf("unknown version found in response: %w", ErrFailedServerChallenge)
		}

		sc.state = complete
	default:
		return "", fmt.Errorf("invalid invocation: %w", ErrInvalidStateReached)
	}

	return out, nil
}

// Done reports whether the handshake reached the complete state.
func (sc *IAMSASLClient) Done() bool {
	return sc.state == complete
}

// getAuthPayload signs the broker host header with AWS SigV4 and marshals
// the MSK IAM auth document that is sent as the initial SASL message.
func (sc *IAMSASLClient) getAuthPayload() ([]byte, error) {
	now := time.Now().UTC()

	signedHeaders := []byte("host:" + sc.MSKHostname)
	signature, err := sc.signer.GetSignature(signedHeaders, nil, now)
	if err != nil {
		return nil, err
	}

	// Timestamp in the form yyyyMMdd'T'HHmmss'Z'; the first eight
	// characters (yyyyMMdd) feed the credential scope below.
	stamp := now.Format("20060102T150405Z")

	doc := payload{
		Version:       supportedVersion,
		BrokerHost:    sc.MSKHostname,
		UserAgent:     sc.UserAgent,
		Action:        "kafka-cluster:Connect",
		Algorithm:     "AWS4-HMAC-SHA256",
		Credentials:   fmt.Sprintf(scopeFormat, sc.accessKey, stamp[:8], sc.Region),
		Date:          stamp,
		SignedHeaders: "host",
		Expires:       "300", // Seconds => 5 Minutes
		Signature:     string(signature),
	}
	return json.Marshal(&doc)
}
Loading

0 comments on commit 67e165e

Please sign in to comment.