From e824fd0f26baeb988c12b45cff36812f603e9a54 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 30 May 2023 22:10:42 +0800 Subject: [PATCH] pkg/sink(ticdc): add oauth support for sarama Kafka sink (#8938) (#9095) close pingcap/tiflow#8865 Signed-off-by: hi-rustin Fix Signed-off-by: hi-rustin Fix tidy Signed-off-by: hi-rustin Update docs Signed-off-by: hi-rustin --- cdc/sink/mq/mq.go | 2 +- cdc/sink/mq/producer/kafka/config.go | 90 ++++++++- cdc/sink/mq/producer/kafka/config_test.go | 179 +++++++++++++++--- .../producer/kafka/oauth2_token_provider.go | 92 +++++++++ .../kafka/oauth2_token_provider_test.go | 74 ++++++++ docs/swagger/docs.go | 32 ++++ docs/swagger/swagger.json | 32 ++++ docs/swagger/swagger.yaml | 21 ++ go.mod | 14 +- go.sum | 27 +-- pkg/config/sink.go | 13 ++ pkg/security/sasl.go | 49 ++++- 12 files changed, 568 insertions(+), 57 deletions(-) create mode 100644 cdc/sink/mq/producer/kafka/oauth2_token_provider.go create mode 100644 cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index 1826b29c6a5..b58f2233545 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -408,7 +408,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, } baseConfig := kafka.NewConfig() - if err := baseConfig.Apply(sinkURI); err != nil { + if err := baseConfig.Apply(sinkURI, replicaConfig); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } diff --git a/cdc/sink/mq/producer/kafka/config.go b/cdc/sink/mq/producer/kafka/config.go index 6a0c2dcc6a3..423866f07d1 100644 --- a/cdc/sink/mq/producer/kafka/config.go +++ b/cdc/sink/mq/producer/kafka/config.go @@ -16,6 +16,7 @@ package kafka import ( "context" "crypto/tls" + "encoding/base64" "net/url" "strconv" "strings" @@ -101,8 +102,8 @@ func (c *Config) setPartitionNum(realPartitionCount int32) error { return nil } -// Apply the sinkURI to update Config -func (c *Config) Apply(sinkURI *url.URL) error { +// Apply the configuration to the sarama producer. +func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) error { c.BrokerEndpoints = strings.Split(sinkURI.Host, ",") params := sinkURI.Query() s := params.Get("partition-num") @@ -183,7 +184,7 @@ func (c *Config) Apply(sinkURI *url.URL) error { c.ReadTimeout = a } - err := c.applySASL(params) + err := c.applySASL(params, replicaConfig) if err != nil { return err } @@ -245,7 +246,7 @@ func (c *Config) applyTLS(params url.Values) error { return nil } -func (c *Config) applySASL(params url.Values) error { +func (c *Config) applySASL(params url.Values, replicaConfig *config.ReplicaConfig) error { s := params.Get("sasl-user") if s != "" { c.SASL.SASLUser = s @@ -263,6 +264,12 @@ func (c *Config) applySASL(params url.Values) error { return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } c.SASL.SASLMechanism = mechanism + } else if replicaConfig != nil && replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.SASLMechanism != nil { + mechanism, err := security.SASLMechanismFromString(*replicaConfig.Sink.KafkaConfig.SASLMechanism) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + c.SASL.SASLMechanism = mechanism } s = params.Get("sasl-gssapi-auth-type") @@ -313,6 +320,67 @@ func (c *Config) applySASL(params url.Values) error { c.SASL.GSSAPI.DisablePAFXFAST = disablePAFXFAST } + if replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil { + if replicaConfig.Sink.KafkaConfig.SASLOAuthClientID != nil { + clientID := *replicaConfig.Sink.KafkaConfig.SASLOAuthClientID + if clientID == "" { + return cerror.ErrKafkaInvalidConfig.GenWithStack("OAuth2 client ID cannot be empty") + } + c.SASL.OAuth2.ClientID = clientID + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthClientSecret != nil { + clientSecret := *replicaConfig.Sink.KafkaConfig.SASLOAuthClientSecret + if clientSecret == "" { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 client secret cannot be empty") + } + + // BASE64 decode the client secret + decodedClientSecret, err := base64.StdEncoding.DecodeString(clientSecret) + if err != nil { + log.Error("OAuth2 client secret is not base64 encoded", zap.Error(err)) + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 client secret is not base64 encoded") + } + c.SASL.OAuth2.ClientSecret = string(decodedClientSecret) + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthTokenURL != nil { + tokenURL := *replicaConfig.Sink.KafkaConfig.SASLOAuthTokenURL + if tokenURL == "" { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 token URL cannot be empty") + } + c.SASL.OAuth2.TokenURL = tokenURL + } + + if c.SASL.OAuth2.IsEnable() { + if c.SASL.SASLMechanism != security.OAuthMechanism { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "OAuth2 is only supported with SASL mechanism type OAUTHBEARER, but got %s", + c.SASL.SASLMechanism) + } + + if err := c.SASL.OAuth2.Validate(); err != nil { + return cerror.ErrKafkaInvalidConfig.Wrap(err) + } + c.SASL.OAuth2.SetDefault() + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthScopes != nil { + c.SASL.OAuth2.Scopes = replicaConfig.Sink.KafkaConfig.SASLOAuthScopes + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthGrantType != nil { + c.SASL.OAuth2.GrantType = *replicaConfig.Sink.KafkaConfig.SASLOAuthGrantType + } + + if replicaConfig.Sink.KafkaConfig.SASLOAuthAudience != nil { + c.SASL.OAuth2.Audience = *replicaConfig.Sink.KafkaConfig.SASLOAuthAudience + } + } + return nil } @@ -432,12 +500,14 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { } } - completeSaramaSASLConfig(config, c) + if err := completeSaramaSASLConfig(ctx, config, c); err != nil { + return nil, errors.Trace(err) + } return config, err } -func completeSaramaSASLConfig(config *sarama.Config, c *Config) { +func completeSaramaSASLConfig(ctx context.Context, config *sarama.Config, c *Config) error { if c.SASL != nil && c.SASL.SASLMechanism != "" { config.Net.SASL.Enable = true config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SASL.SASLMechanism) @@ -467,6 +537,14 @@ func completeSaramaSASLConfig(config *sarama.Config, c *Config) { case security.KeyTabAuth: config.Net.SASL.GSSAPI.KeyTabPath = c.SASL.GSSAPI.KeyTabPath } + case sarama.SASLTypeOAuth: + p, err := newTokenProvider(ctx, c) + if err != nil { + return errors.Trace(err) + } + config.Net.SASL.TokenProvider = p } + } + return nil } diff --git a/cdc/sink/mq/producer/kafka/config_test.go b/cdc/sink/mq/producer/kafka/config_test.go index b725c46f61b..967a8ea2f5b 100644 --- a/cdc/sink/mq/producer/kafka/config_test.go +++ b/cdc/sink/mq/producer/kafka/config_test.go @@ -106,7 +106,7 @@ func TestConfigTimeouts(t *testing.T) { sinkURI, err := url.Parse(uri) require.Nil(t, err) - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, config.GetDefaultReplicaConfig()) require.Nil(t, err) require.Equal(t, 5*time.Second, cfg.DialTimeout) @@ -121,6 +121,8 @@ func TestConfigTimeouts(t *testing.T) { } func TestCompleteConfigByOpts(t *testing.T) { + replicaCfg := config.GetDefaultReplicaConfig() + cfg := NewConfig() // Normal config. @@ -132,7 +134,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err := url.Parse(uri) require.Nil(t, err) - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Nil(t, err) require.Equal(t, int32(1), cfg.PartitionNum) require.Equal(t, int16(3), cfg.ReplicationFactor) @@ -144,7 +146,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Nil(t, err) require.Len(t, cfg.BrokerEndpoints, 3) @@ -153,7 +155,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) // Illegal max-message-bytes. @@ -161,7 +163,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) // Illegal partition-num. @@ -169,7 +171,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) // Out of range partition-num. @@ -177,7 +179,7 @@ func TestCompleteConfigByOpts(t *testing.T) { sinkURI, err = url.Parse(uri) require.Nil(t, err) cfg = NewConfig() - err = cfg.Apply(sinkURI) + err = cfg.Apply(sinkURI, replicaCfg) require.Regexp(t, ".*invalid partition num.*", errors.Cause(err)) } @@ -390,7 +392,7 @@ func TestConfigurationCombinations(t *testing.T) { require.Nil(t, err) baseConfig := NewConfig() - err = baseConfig.Apply(sinkURI) + err = baseConfig.Apply(sinkURI, config.GetDefaultReplicaConfig()) require.Nil(t, err) saramaConfig, err := NewSaramaConfig(context.Background(), baseConfig) @@ -428,26 +430,30 @@ func TestApplySASL(t *testing.T) { t.Parallel() tests := []struct { - name string - URI string - exceptErr string + name string + URI string + replicaConfig func() *config.ReplicaConfig + exceptErr string }{ { - name: "no params", - URI: "kafka://127.0.0.1:9092/abc", - exceptErr: "", + name: "no params", + URI: "kafka://127.0.0.1:9092/abc", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid PLAIN SASL", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-user=user&sasl-password=password&sasl-mechanism=plain", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid SCRAM SASL", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-user=user&sasl-password=password&sasl-mechanism=SCRAM-SHA-512", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid GSSAPI user auth SASL", @@ -457,7 +463,8 @@ func TestApplySASL(t *testing.T) { "&sasl-gssapi-service-name=a&sasl-gssapi-user=user" + "&sasl-gssapi-password=pwd" + "&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "valid GSSAPI keytab auth SASL", @@ -467,19 +474,136 @@ func TestApplySASL(t *testing.T) { "&sasl-gssapi-service-name=a&sasl-gssapi-user=user" + "&sasl-gssapi-keytab-path=/root/keytab" + "&sasl-gssapi-realm=realm&sasl-gssapi-disable-pafxfast=false", - exceptErr: "", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "", }, { name: "invalid mechanism", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-mechanism=a", - exceptErr: "unknown a SASL mechanism", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "unknown a SASL mechanism", }, { name: "invalid GSSAPI auth type", URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + "&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keyta1b", - exceptErr: "unknown keyta1b auth type", + replicaConfig: config.GetDefaultReplicaConfig, + exceptErr: "unknown keyta1b auth type", + }, + { + name: "valid OAUTHBEARER SASL", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "", + }, + { + name: "invalid OAUTHBEARER SASL: missing client id", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 client id is empty", + }, + { + name: "invalid OAUTHBEARER SASL: missing client secret", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 client secret is empty", + }, + { + name: "invalid OAUTHBEARER SASL: missing token url", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + } + return cfg + }, + exceptErr: "OAuth2 token url is empty", + }, + { + name: "invalid OAUTHBEARER SASL: non base64 client secret", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=OAUTHBEARER", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "client_secret" + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 client secret is not base64 encoded", + }, + { + name: "invalid OAUTHBEARER SASL: wrong mechanism", + URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&partition-num=0&sasl-mechanism=GSSAPI", + replicaConfig: func() *config.ReplicaConfig { + cfg := config.GetDefaultReplicaConfig() + oauthMechanism := string(security.OAuthMechanism) + clientID := "client_id" + clientSecret := "Y2xpZW50X3NlY3JldA==" // base64(client_secret) + tokenURL := "127.0.0.1:9093/token" + cfg.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: &oauthMechanism, + SASLOAuthClientID: &clientID, + SASLOAuthClientSecret: &clientSecret, + SASLOAuthTokenURL: &tokenURL, + } + return cfg + }, + exceptErr: "OAuth2 is only supported with SASL mechanism type OAUTHBEARER", }, } @@ -490,9 +614,11 @@ func TestApplySASL(t *testing.T) { sinkURI, err := url.Parse(test.URI) require.Nil(t, err) if test.exceptErr == "" { - require.Nil(t, cfg.applySASL(sinkURI.Query())) + require.Nil(t, cfg.applySASL( + sinkURI.Query(), test.replicaConfig())) } else { - require.Regexp(t, test.exceptErr, cfg.applySASL(sinkURI.Query()).Error()) + require.Regexp(t, test.exceptErr, cfg.applySASL( + sinkURI.Query(), test.replicaConfig()).Error()) } }) } @@ -567,6 +693,7 @@ func TestApplyTLS(t *testing.T) { func TestCompleteSaramaSASLConfig(t *testing.T) { t.Parallel() + ctx := context.Background() // Test that SASL is turned on correctly. cfg := NewConfig() cfg.SASL = &security.SASL{ @@ -576,10 +703,10 @@ func TestCompleteSaramaSASLConfig(t *testing.T) { GSSAPI: security.GSSAPI{}, } saramaConfig := sarama.NewConfig() - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.False(t, saramaConfig.Net.SASL.Enable) cfg.SASL.SASLMechanism = "plain" - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.True(t, saramaConfig.Net.SASL.Enable) // Test that the SCRAMClientGeneratorFunc is set up correctly. cfg = NewConfig() @@ -590,9 +717,9 @@ func TestCompleteSaramaSASLConfig(t *testing.T) { GSSAPI: security.GSSAPI{}, } saramaConfig = sarama.NewConfig() - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.Nil(t, saramaConfig.Net.SASL.SCRAMClientGeneratorFunc) cfg.SASL.SASLMechanism = "SCRAM-SHA-512" - completeSaramaSASLConfig(saramaConfig, cfg) + completeSaramaSASLConfig(ctx, saramaConfig, cfg) require.NotNil(t, saramaConfig.Net.SASL.SCRAMClientGeneratorFunc) } diff --git a/cdc/sink/mq/producer/kafka/oauth2_token_provider.go b/cdc/sink/mq/producer/kafka/oauth2_token_provider.go new file mode 100644 index 00000000000..268d33bc3ba --- /dev/null +++ b/cdc/sink/mq/producer/kafka/oauth2_token_provider.go @@ -0,0 +1,92 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "net/url" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" +) + +// tsokenProvider is a user-defined callback for generating +// access tokens for SASL/OAUTHBEARER auth. +type tokenProvider struct { + tokenSource oauth2.TokenSource +} + +var _ sarama.AccessTokenProvider = (*tokenProvider)(nil) + +// Token implements the sarama.AccessTokenProvider interface. +// Token returns an access token. The implementation should ensure token +// reuse so that multiple calls at connect time do not create multiple +// tokens. The implementation should also periodically refresh the token in +// order to guarantee that each call returns an unexpired token. This +// method should not block indefinitely--a timeout error should be returned +// after a short period of inactivity so that the broker connection logic +// can log debugging information and retry. +func (t *tokenProvider) Token() (*sarama.AccessToken, error) { + token, err := t.tokenSource.Token() + if err != nil { + // Errors will result in Sarama retrying the broker connection and logging + // the transient error, with a Broker connection error surfacing after retry + // attempts have been exhausted. + return nil, err + } + + return &sarama.AccessToken{Token: token.AccessToken}, nil +} + +func newTokenProvider(ctx context.Context, + kafkaConfig *Config, +) (sarama.AccessTokenProvider, error) { + // grant_type is by default going to be set to 'client_credentials' by the + // clientcredentials library as defined by the spec, however non-compliant + // auth server implementations may want a custom type + var endpointParams url.Values + if kafkaConfig.SASL.OAuth2.GrantType != "" { + if endpointParams == nil { + endpointParams = url.Values{} + } + endpointParams.Set("grant_type", kafkaConfig.SASL.OAuth2.GrantType) + } + + // audience is an optional parameter that can be used to specify the + // intended audience of the token. + if kafkaConfig.SASL.OAuth2.Audience != "" { + if endpointParams == nil { + endpointParams = url.Values{} + } + endpointParams.Set("audience", kafkaConfig.SASL.OAuth2.Audience) + } + + tokenURL, err := url.Parse(kafkaConfig.SASL.OAuth2.TokenURL) + if err != nil { + return nil, errors.Trace(err) + } + + cfg := clientcredentials.Config{ + ClientID: kafkaConfig.SASL.OAuth2.ClientID, + ClientSecret: kafkaConfig.SASL.OAuth2.ClientSecret, + TokenURL: tokenURL.String(), + EndpointParams: endpointParams, + Scopes: kafkaConfig.SASL.OAuth2.Scopes, + } + return &tokenProvider{ + tokenSource: cfg.TokenSource(ctx), + }, nil +} diff --git a/cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go b/cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go new file mode 100644 index 00000000000..f986c22ba07 --- /dev/null +++ b/cdc/sink/mq/producer/kafka/oauth2_token_provider_test.go @@ -0,0 +1,74 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "testing" + + "github.com/pingcap/tiflow/pkg/security" + "github.com/stretchr/testify/require" +) + +func TestNewTokenProvider(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + config *Config + expectedErr string + }{ + { + name: "valid", + config: &Config{ + SASL: &security.SASL{ + OAuth2: security.OAuth2{ + ClientID: "client-id", + ClientSecret: "client-secret", + TokenURL: "http://localhost:8080/oauth2/token", + Scopes: []string{"scope1", "scope2"}, + GrantType: "client_credentials", + }, + }, + }, + }, + { + name: "invalid token URL", + config: &Config{ + SASL: &security.SASL{ + OAuth2: security.OAuth2{ + ClientID: "client-id", + ClientSecret: "client-secret", + TokenURL: "http://test.com/Segment%%2815197306101420000%29", + Scopes: []string{"scope1", "scope2"}, + GrantType: "client_credentials", + }, + }, + }, + expectedErr: "invalid URL escape", + }, + } { + ts := test + t.Run(ts.name, func(t *testing.T) { + t.Parallel() + _, err := newTokenProvider(context.TODO(), ts.config) + if ts.expectedErr == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), ts.expectedErr) + } + }) + } +} diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 8093ce171f1..add1b698ff1 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -741,6 +741,35 @@ var doc = `{ } } }, + "config.KafkaConfig": { + "type": "object", + "properties": { + "sasl-mechanism": { + "type": "string" + }, + "sasl-oauth-audience": { + "type": "string" + }, + "sasl-oauth-client-id": { + "type": "string" + }, + "sasl-oauth-client-secret": { + "type": "string" + }, + "sasl-oauth-grant-type": { + "type": "string" + }, + "sasl-oauth-scopes": { + "type": "array", + "items": { + "type": "string" + } + }, + "sasl-oauth-token-url": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -759,6 +788,9 @@ var doc = `{ "encoder-concurrency": { "type": "integer" }, + "kafka-config": { + "$ref": "#/definitions/config.KafkaConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index bbf8545643b..6643d9597db 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -722,6 +722,35 @@ } } }, + "config.KafkaConfig": { + "type": "object", + "properties": { + "sasl-mechanism": { + "type": "string" + }, + "sasl-oauth-audience": { + "type": "string" + }, + "sasl-oauth-client-id": { + "type": "string" + }, + "sasl-oauth-client-secret": { + "type": "string" + }, + "sasl-oauth-grant-type": { + "type": "string" + }, + "sasl-oauth-scopes": { + "type": "array", + "items": { + "type": "string" + } + }, + "sasl-oauth-token-url": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -740,6 +769,9 @@ "encoder-concurrency": { "type": "integer" }, + "kafka-config": { + "$ref": "#/definitions/config.KafkaConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 9dd4b9a4ebb..75425ae16af 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -27,6 +27,25 @@ definitions: topic: type: string type: object + config.KafkaConfig: + properties: + sasl-mechanism: + type: string + sasl-oauth-audience: + type: string + sasl-oauth-client-id: + type: string + sasl-oauth-client-secret: + type: string + sasl-oauth-grant-type: + type: string + sasl-oauth-scopes: + items: + type: string + type: array + sasl-oauth-token-url: + type: string + type: object config.SinkConfig: properties: column-selectors: @@ -39,6 +58,8 @@ definitions: type: array encoder-concurrency: type: integer + kafka-config: + $ref: '#/definitions/config.KafkaConfig' protocol: type: string schema-registry: diff --git a/go.mod b/go.mod index d2cd42873ae..fef5c38743c 100644 --- a/go.mod +++ b/go.mod @@ -88,12 +88,13 @@ require ( go.uber.org/goleak v1.1.12 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 - golang.org/x/net v0.0.0-20221014081412-f15817d10f9b + golang.org/x/net v0.2.0 + golang.org/x/oauth2 v0.2.0 golang.org/x/sync v0.1.0 - golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab + golang.org/x/sys v0.2.0 golang.org/x/text v0.4.0 golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 - golang.org/x/tools v0.1.12 // indirect + golang.org/x/tools v0.2.0 // indirect google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c google.golang.org/grpc v1.50.1 gopkg.in/yaml.v2 v2.4.0 @@ -265,10 +266,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect - golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect - golang.org/x/exp v0.0.0-20220516143420-24438e51023a // indirect - golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect - golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 // indirect + golang.org/x/crypto v0.1.0 // indirect + golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 // indirect + golang.org/x/term v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/api v0.103.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index b09506e50bf..8e497d6baed 100644 --- a/go.sum +++ b/go.sum @@ -1390,8 +1390,9 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= +golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1407,9 +1408,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 h1:rxKZ2gOnYxjfmakvUUqh9Gyb6KXfrj7JWTxORTYqb0E= golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= -golang.org/x/exp v0.0.0-20220516143420-24438e51023a h1:tiLLxEjKNE6Hrah/Dp/cyHvsyjDLcMFSocOHO5XDmOM= -golang.org/x/exp v0.0.0-20220516143420-24438e51023a/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1441,7 +1441,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1509,8 +1509,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= -golang.org/x/net v0.0.0-20221014081412-f15817d10f9b h1:tvrvnPFcdzp294diPnrdZZZ8XUt2Tyj7svb7X52iDuU= -golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1527,8 +1527,8 @@ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= +golang.org/x/oauth2 v0.2.0 h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU= +golang.org/x/oauth2 v0.2.0/go.mod h1:Cwn6afJ8jrQwYMxQDTpISoXmXW9I6qF6vDeuuoX3Ibs= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1645,13 +1645,14 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 h1:EH1Deb8WZJ0xc0WK//leUHXcX9aLE5SymusoTmMZye8= -golang.org/x/term v0.0.0-20220411215600-e5f449aeb171/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.2.0 h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM= +golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1756,8 +1757,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 7275077049d..25af977ffb8 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -88,6 +88,19 @@ type SinkConfig struct { ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"` SchemaRegistry string `toml:"schema-registry" json:"schema-registry"` EncoderConcurrency int `toml:"encoder-concurrency" json:"encoder-concurrency"` + + KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` +} + +// KafkaConfig represents a kafka sink configuration +type KafkaConfig struct { + SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` + SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` + SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` } // DispatchRule represents partition rule for a table. diff --git a/pkg/security/sasl.go b/pkg/security/sasl.go index 8077589d547..3be79701b54 100644 --- a/pkg/security/sasl.go +++ b/pkg/security/sasl.go @@ -35,6 +35,8 @@ const ( SCRAM512Mechanism SASLMechanism = sarama.SASLTypeSCRAMSHA512 // GSSAPIMechanism means the SASL mechanism is GSSAPI. GSSAPIMechanism SASLMechanism = sarama.SASLTypeGSSAPI + // OAuthMechanism means the SASL mechanism is OAuth2. + OAuthMechanism SASLMechanism = sarama.SASLTypeOAuth ) // SASLMechanismFromString converts the string to SASL mechanism. @@ -48,6 +50,8 @@ func SASLMechanismFromString(s string) (SASLMechanism, error) { return SCRAM512Mechanism, nil case "gssapi": return GSSAPIMechanism, nil + case "oauthbearer": + return OAuthMechanism, nil default: return UnknownMechanism, errors.Errorf("unknown %s SASL mechanism", s) } @@ -55,10 +59,47 @@ func SASLMechanismFromString(s string) (SASLMechanism, error) { // SASL holds necessary path parameter to support sasl-scram type SASL struct { - SASLUser string `toml:"sasl-user" json:"sasl-user"` - SASLPassword string `toml:"sasl-password" json:"sasl-password"` - SASLMechanism SASLMechanism `toml:"sasl-mechanism" json:"sasl-mechanism"` - GSSAPI GSSAPI `toml:"sasl-gssapi" json:"sasl-gssapi"` + SASLUser string + SASLPassword string + SASLMechanism SASLMechanism + GSSAPI GSSAPI + OAuth2 OAuth2 +} + +// OAuth2 holds necessary parameters to support sasl-oauth2. +type OAuth2 struct { + ClientID string + ClientSecret string + TokenURL string + Scopes []string + GrantType string + Audience string +} + +// Validate validates the parameters of OAuth2. +// Some parameters are required, some are optional. +func (o *OAuth2) Validate() error { + if len(o.ClientID) == 0 { + return errors.New("OAuth2 client id is empty") + } + if len(o.ClientSecret) == 0 { + return errors.New("OAuth2 client secret is empty") + } + if len(o.TokenURL) == 0 { + return errors.New("OAuth2 token url is empty") + } + return nil +} + +// SetDefault sets the default value of OAuth2. +func (o *OAuth2) SetDefault() { + o.GrantType = "client_credentials" +} + +// IsEnable checks whether the OAuth2 is enabled. +// One of values of ClientID, ClientSecret and TokenURL is not empty means enabled. +func (o *OAuth2) IsEnable() bool { + return len(o.ClientID) > 0 || len(o.ClientSecret) > 0 || len(o.TokenURL) > 0 } // GSSAPIAuthType defines the type of GSSAPI authentication.