Skip to content

Commit

Permalink
chore: refactor scaler param getters to use generics
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com>
  • Loading branch information
wozniakjan committed Jan 16, 2024
1 parent 8b8a4f2 commit f03313c
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 167 deletions.
93 changes: 46 additions & 47 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"reflect"
"strings"

	"github.com/go-logr/logr"
Expand Down Expand Up @@ -120,26 +119,26 @@ func NewApacheKafkaScaler(ctx context.Context, config *ScalerConfig) (Scaler, er
func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata) error {
meta.enableTLS = false
var enableTLS bool
tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable", reflect.TypeOf(""))
tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable")
if err != nil {
return fmt.Errorf("error incorrect TLS value given. %w", err)
}
tlsString = strings.TrimSpace(tlsString.(string))
switch tlsString.(string) {
tlsString = strings.TrimSpace(tlsString)
switch tlsString {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", tlsString.(string))
return fmt.Errorf("error incorrect TLS value given, got %s", tlsString)
}

if enableTLS {
certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, true, "", reflect.TypeOf(""))
certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, true, "")
if err != nil {
return err
}
keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, true, "", reflect.TypeOf(""))
keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, true, "")
if err != nil {
return err
}
Expand All @@ -149,47 +148,47 @@ func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata)
if keyGiven == "" && certGiven != "" {
return errors.New("cert must be provided with key")
}
ca, err := getParameterFromConfigV2(config, "ca", false, true, false, true, "", reflect.TypeOf(""))
ca, err := getParameterFromConfigV2(config, "ca", false, true, false, true, "")
if err != nil {
return err
}
meta.ca = ca.(string)
meta.cert = certGiven.(string)
meta.key = keyGiven.(string)
keyPassword, err := getParameterFromConfigV2(config, "keyPassword", false, true, false, true, "", reflect.TypeOf(""))
meta.ca = ca
meta.cert = certGiven
meta.key = keyGiven
keyPassword, err := getParameterFromConfigV2(config, "keyPassword", false, true, false, true, "")
if err != nil {
return err
}
meta.keyPassword = keyPassword.(string)
meta.keyPassword = keyPassword
meta.enableTLS = true
}

meta.saslType = KafkaSASLTypeNone
saslAuthType, err := getParameterFromConfigV2(config, "sasl", true, true, false, true, "", reflect.TypeOf(""))
saslAuthType, err := getParameterFromConfigV2(config, "sasl", true, true, false, true, "")
if err != nil {
return err
}

if saslAuthType != "" {
saslAuthType = strings.TrimSpace(saslAuthType.(string))
switch mode := kafkaSaslType(saslAuthType.(string)); mode {
saslAuthType = strings.TrimSpace(saslAuthType)
switch mode := kafkaSaslType(saslAuthType); mode {
case KafkaSASLTypeMskIam:
meta.saslType = mode
awsEndpoint, err := getParameterFromConfigV2(config, "awsEndpoint", true, false, false, true, "", reflect.TypeOf(""))
awsEndpoint, err := getParameterFromConfigV2(config, "awsEndpoint", true, false, false, true, "")
if err != nil {
return err
}
if awsEndpoint != "" {
meta.awsEndpoint = awsEndpoint.(string)
meta.awsEndpoint = awsEndpoint
}
if !meta.enableTLS {
return errors.New("TLS is required for MSK")
}
awsRegion, err := getParameterFromConfigV2(config, "awsRegion", true, false, false, false, "", reflect.TypeOf(""))
awsRegion, err := getParameterFromConfigV2(config, "awsRegion", true, false, false, false, "")
if err != nil {
return fmt.Errorf("%w. No awsRegion given", err)
}
meta.awsRegion = awsRegion.(string)
meta.awsRegion = awsRegion
auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return err
Expand All @@ -200,17 +199,17 @@ func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata)
case KafkaSASLTypeSCRAMSHA256:
fallthrough
case KafkaSASLTypeSCRAMSHA512:
username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf(""))
username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "")
if err != nil {
return fmt.Errorf("%w. No username given", err)
}
meta.username = strings.TrimSpace(username.(string))
meta.username = strings.TrimSpace(username)

password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "", reflect.TypeOf(""))
password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "")
if err != nil {
return fmt.Errorf("%w. No password given", err)
}
meta.password = strings.TrimSpace(password.(string))
meta.password = strings.TrimSpace(password)
case KafkaSASLTypeOAuthbearer:
return errors.New("SASL/OAUTHBEARER is not implemented yet")
default:
Expand All @@ -223,19 +222,19 @@ func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata)

func parseApacheKafkaMetadata(config *ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) {
meta := apacheKafkaMetadata{}
bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "", reflect.TypeOf(""))
bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "")
if err != nil {
return meta, fmt.Errorf("no bootstrapServers given. %w", err)
}
meta.bootstrapServers = strings.Split(bootstrapServers.(string), ",")
meta.bootstrapServers = strings.Split(bootstrapServers, ",")

consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "", reflect.TypeOf(""))
consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "")
if err != nil {
return meta, fmt.Errorf("no consumer group given. %w", err)
}
meta.group = consumerGroup.(string)
meta.group = consumerGroup

topic, err := getParameterFromConfigV2(config, "topic", true, false, true, true, "", reflect.TypeOf(""))
topic, err := getParameterFromConfigV2(config, "topic", true, false, true, true, "")
if err != nil {
return meta, err
}
Expand All @@ -244,19 +243,19 @@ func parseApacheKafkaMetadata(config *ScalerConfig, logger logr.Logger) (apacheK
logger.V(1).Info(fmt.Sprintf("consumer group %q has no topics specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
} else {
meta.topic = strings.Split(topic.(string), ",")
meta.topic = strings.Split(topic, ",")
}

meta.partitionLimitation = nil
partitionLimitationMetadata, err := getParameterFromConfigV2(config, "partitionLimitation", true, false, false, true, "", reflect.TypeOf(""))
partitionLimitationMetadata, err := getParameterFromConfigV2(config, "partitionLimitation", true, false, false, true, "")
if err != nil {
return meta, err
}
if partitionLimitationMetadata != "" {
if meta.topic == nil || len(meta.topic) == 0 {
logger.V(1).Info("no specific topics set, ignoring partitionLimitation setting")
} else {
pattern := strings.TrimSpace(partitionLimitationMetadata.(string))
pattern := strings.TrimSpace(partitionLimitationMetadata)
parsed, err := kedautil.ParseInt32List(pattern)
if err != nil {
return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err)
Expand All @@ -267,64 +266,64 @@ func parseApacheKafkaMetadata(config *ScalerConfig, logger logr.Logger) (apacheK
}

meta.offsetResetPolicy = defaultOffsetResetPolicy
offsetResetPolicyRaw, err := getParameterFromConfigV2(config, "offsetResetPolicy", true, false, false, true, "", reflect.TypeOf(""))
offsetResetPolicyRaw, err := getParameterFromConfigV2(config, "offsetResetPolicy", true, false, false, true, "")
if err != nil {
return meta, err
}
if offsetResetPolicyRaw != "" {
policy := offsetResetPolicy(offsetResetPolicyRaw.(string))
policy := offsetResetPolicy(offsetResetPolicyRaw)
if policy != earliest && policy != latest {
return meta, fmt.Errorf("err offsetResetPolicy policy %q given", offsetResetPolicyRaw)
}
meta.offsetResetPolicy = policy
}

meta.lagThreshold = defaultKafkaLagThreshold
lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64))
lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold)
if err != nil {
return meta, err
}
if lagThreshold.(int) <= 0 {
if lagThreshold <= 0 {
return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName)
}
meta.lagThreshold = int64(lagThreshold.(int))
meta.lagThreshold = lagThreshold

meta.activationLagThreshold = defaultKafkaActivationLagThreshold
activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, int64(defaultKafkaActivationLagThreshold), reflect.TypeOf(int64(64)))
activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, defaultKafkaActivationLagThreshold)
if err != nil {
return meta, err
}
if activationLagThreshold.(int64) < 0 {
if activationLagThreshold < 0 {
return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName)
}

if err := parseApacheKafkaAuthParams(config, &meta); err != nil {
return meta, err
}

allowIDConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true))
allowIDConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false)
if err != nil {
return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err)
}
meta.allowIdleConsumers = allowIDConsumers.(bool)
meta.allowIdleConsumers = allowIDConsumers

excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false, reflect.TypeOf(true))
excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false)
if err != nil {
return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err)
}
meta.excludePersistentLag = excludePersistentLag.(bool)
meta.excludePersistentLag = excludePersistentLag

scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false, reflect.TypeOf(true))
scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false)
if err != nil {
return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err)
}
meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset.(bool)
meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset

limitToPartitionsWithLag, err := getParameterFromConfigV2(config, "limitToPartitionsWithLag", true, false, false, true, false, reflect.TypeOf(true))
limitToPartitionsWithLag, err := getParameterFromConfigV2(config, "limitToPartitionsWithLag", true, false, false, true, false)
if err != nil {
return meta, err
}
meta.limitToPartitionsWithLag = limitToPartitionsWithLag.(bool)
meta.limitToPartitionsWithLag = limitToPartitionsWithLag
if meta.allowIdleConsumers && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
}
Expand Down
Loading

0 comments on commit f03313c

Please sign in to comment.