diff --git a/apis/keda/v1alpha1/triggerauthentication_types.go b/apis/keda/v1alpha1/triggerauthentication_types.go index 9483abedb6b..8d4bab2464d 100644 --- a/apis/keda/v1alpha1/triggerauthentication_types.go +++ b/apis/keda/v1alpha1/triggerauthentication_types.go @@ -118,9 +118,9 @@ const ( PodIdentityProviderAzure PodIdentityProvider = "azure" PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload" PodIdentityProviderGCP PodIdentityProvider = "gcp" - PodIdentityProviderSpiffe PodIdentityProvider = "spiffe" PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks" PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam" + PodIdentityProviderAws PodIdentityProvider = "aws" ) // PodIdentityAnnotationEKS specifies aws role arn for aws-eks Identity Provider @@ -133,9 +133,15 @@ const ( // AuthPodIdentity allows users to select the platform native identity // mechanism type AuthPodIdentity struct { + // +kubebuilder:validation:Enum=azure;azure-workload;gcp;aws;aws-eks;aws-kiam Provider PodIdentityProvider `json:"provider"` // +optional IdentityID *string `json:"identityId"` + // +optional + RoleArn string `json:"roleArn"` + // +kubebuilder:validation:Enum=keda;workload + // +optional + IdentityOwner *string `json:"identityOwner"` } func (a *AuthPodIdentity) GetIdentityID() string { @@ -145,6 +151,13 @@ func (a *AuthPodIdentity) GetIdentityID() string { return *a.IdentityID } +func (a *AuthPodIdentity) IsKedaIdentityOwner() bool { + if a.IdentityOwner == nil { + return true + } + return *a.IdentityOwner == "keda" +} + // AuthConfigMapTargetRef is used to authenticate using a reference to a config map type AuthConfigMapTargetRef AuthTargetRef diff --git a/apis/keda/v1alpha1/triggerauthentication_webhook.go b/apis/keda/v1alpha1/triggerauthentication_webhook.go index 72b14e1b388..3bd1944839e 100644 --- a/apis/keda/v1alpha1/triggerauthentication_webhook.go +++ b/apis/keda/v1alpha1/triggerauthentication_webhook.go @@ -113,6 +113,10 @@ func validateSpec(spec *TriggerAuthenticationSpec) (admission.Warnings, error) { if spec.PodIdentity.IdentityID != nil && *spec.PodIdentity.IdentityID == "" { return nil, fmt.Errorf("identityid of PodIdentity should not be empty. If it's set, identityId has to be different than \"\"") } + case PodIdentityProviderAws: + if spec.PodIdentity.RoleArn != "" && !spec.PodIdentity.IsKedaIdentityOwner() { + return nil, fmt.Errorf("identityid of PodIdentity should not be empty. 
If it's set, identityId has to be different than \"\"") + } default: return nil, nil } diff --git a/apis/keda/v1alpha1/triggerauthentication_webhook_test.go b/apis/keda/v1alpha1/triggerauthentication_webhook_test.go index b18585ff97b..f625afc7830 100644 --- a/apis/keda/v1alpha1/triggerauthentication_webhook_test.go +++ b/apis/keda/v1alpha1/triggerauthentication_webhook_test.go @@ -24,13 +24,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var _ = It("validate triggerauthentication when IdentityID is nil", func() { +var _ = It("validate triggerauthentication when IdentityID is nil, roleArn is empty and identityOwner is nil", func() { namespaceName := "nilidentityid" namespace := createNamespace(namespaceName) err := k8sClient.Create(context.Background(), namespace) Expect(err).ToNot(HaveOccurred()) - spec := createTriggerAuthenticationSpecWithPodIdentity(nil) + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", nil, nil) ta := createTriggerAuthentication("nilidentityidta", namespaceName, "TriggerAuthentication", spec) Eventually(func() error { return k8sClient.Create(context.Background(), ta) @@ -44,7 +44,7 @@ var _ = It("validate triggerauthentication when IdentityID is empty", func() { Expect(err).ToNot(HaveOccurred()) identityID := "" - spec := createTriggerAuthenticationSpecWithPodIdentity(&identityID) + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", &identityID, nil) ta := createTriggerAuthentication("emptyidentityidta", namespaceName, "TriggerAuthentication", spec) Eventually(func() error { return k8sClient.Create(context.Background(), ta) @@ -58,7 +58,76 @@ var _ = It("validate triggerauthentication when IdentityID is not empty", func() Expect(err).ToNot(HaveOccurred()) identityID := "12345" - spec := createTriggerAuthenticationSpecWithPodIdentity(&identityID) + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", &identityID, nil) + ta := createTriggerAuthentication("identityidta", namespaceName, "TriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate triggerauthentication when RoleArn is not empty and IdentityOwner is nil", func() { + namespaceName := "rolearn" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "Helo", nil, nil) + ta := createTriggerAuthentication("identityidta", namespaceName, "TriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate triggerauthentication when RoleArn is not empty and IdentityOwner is keda", func() { + namespaceName := "rolearnandkedaowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "keda" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "Helo", nil, &identityOwner) + ta := createTriggerAuthentication("identityidta", namespaceName, "TriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate triggerauthentication when RoleArn is not empty and IdentityOwner is workload", func() { + 
namespaceName := "rolearnandworkloadowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "workload" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "Helo", nil, &identityOwner) + ta := createTriggerAuthentication("identityidta", namespaceName, "TriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).Should(HaveOccurred()) +}) + +var _ = It("validate triggerauthentication when RoleArn is empty and IdentityOwner is keda", func() { + namespaceName := "kedaowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "keda" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", nil, &identityOwner) + ta := createTriggerAuthentication("identityidta", namespaceName, "TriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate triggerauthentication when RoleArn is not empty and IdentityOwner is workload", func() { + namespaceName := "workloadowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "workload" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", nil, &identityOwner) ta := createTriggerAuthentication("identityidta", namespaceName, "TriggerAuthentication", spec) Eventually(func() error { return k8sClient.Create(context.Background(), ta) @@ -71,7 +140,7 @@ var _ = It("validate clustertriggerauthentication when IdentityID is nil", func( err := k8sClient.Create(context.Background(), namespace) Expect(err).ToNot(HaveOccurred()) - spec := createTriggerAuthenticationSpecWithPodIdentity(nil) + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", nil, nil) ta := createTriggerAuthentication("clusternilidentityidta", namespaceName, "ClusterTriggerAuthentication", spec) Eventually(func() error { return k8sClient.Create(context.Background(), ta) @@ -85,7 +154,7 @@ var _ = It("validate clustertriggerauthentication when IdentityID is empty", fun Expect(err).ToNot(HaveOccurred()) identityID := "" - spec := createTriggerAuthenticationSpecWithPodIdentity(&identityID) + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", &identityID, nil) ta := createTriggerAuthentication("clusteremptyidentityidta", namespaceName, "ClusterTriggerAuthentication", spec) Eventually(func() error { return k8sClient.Create(context.Background(), ta) @@ -99,18 +168,89 @@ var _ = It("validate clustertriggerauthentication when IdentityID is not empty", Expect(err).ToNot(HaveOccurred()) identityID := "12345" - spec := createTriggerAuthenticationSpecWithPodIdentity(&identityID) + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", &identityID, nil) ta := createTriggerAuthentication("clusteridentityidta", namespaceName, "ClusterTriggerAuthentication", spec) Eventually(func() error { return k8sClient.Create(context.Background(), ta) }).ShouldNot(HaveOccurred()) }) -func createTriggerAuthenticationSpecWithPodIdentity(identityID *string) TriggerAuthenticationSpec { +var _ = It("validate clustertriggerauthentication when RoleArn is not empty and 
IdentityOwner is nil", func() { + namespaceName := "clusterrolearn" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "Helo", nil, nil) + ta := createTriggerAuthentication("clusteridentityidta", namespaceName, "ClusterTriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate clustertriggerauthentication when RoleArn is not empty and IdentityOwner is keda", func() { + namespaceName := "clusterrolearnandkedaowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "keda" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "Helo", nil, &identityOwner) + ta := createTriggerAuthentication("clusteridentityidta", namespaceName, "ClusterTriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate clustertriggerauthentication when RoleArn is not empty and IdentityOwner is workload", func() { + namespaceName := "clusterrolearnandworkloadowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "workload" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "Helo", nil, &identityOwner) + ta := createTriggerAuthentication("clusteridentityidta", namespaceName, "ClusterTriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).Should(HaveOccurred()) +}) + +var _ = It("validate clustertriggerauthentication when RoleArn is empty and IdentityOwner is keda", func() { + namespaceName := "clusterandkedaowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "keda" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", nil, &identityOwner) + ta := createTriggerAuthentication("clusteridentityidta", namespaceName, "ClusterTriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +var _ = It("validate clustertriggerauthentication when RoleArn is not empty and IdentityOwner is workload", func() { + namespaceName := "clusterandworkloadowner" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + identityOwner := "workload" + spec := createTriggerAuthenticationSpecWithPodIdentity(PodIdentityProviderAzure, "", nil, &identityOwner) + ta := createTriggerAuthentication("clusteridentityidta", namespaceName, "TriggerAuthentication", spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ta) + }).ShouldNot(HaveOccurred()) +}) + +func createTriggerAuthenticationSpecWithPodIdentity(provider PodIdentityProvider, roleArn string, identityID, identityOwner *string) TriggerAuthenticationSpec { return TriggerAuthenticationSpec{ PodIdentity: &AuthPodIdentity{ - Provider: PodIdentityProviderAzure, - IdentityID: identityID, + Provider: PodIdentityProviderAzure, + 
IdentityID: identityID, + RoleArn: roleArn, + IdentityOwner: identityOwner, }, } } diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go index 9435a9c4829..cd2b665aea9 100755 --- a/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -86,6 +86,11 @@ func (in *AuthPodIdentity) DeepCopyInto(out *AuthPodIdentity) { *out = new(string) **out = **in } + if in.IdentityOwner != nil { + in, out := &in.IdentityOwner, &out.IdentityOwner + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthPodIdentity. diff --git a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml index cc9cacc688f..a0df3b0e3c3 100644 --- a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml +++ b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml @@ -111,8 +111,22 @@ spec: properties: identityId: type: string + identityOwner: + enum: + - keda + - workload + type: string provider: description: PodIdentityProvider contains the list of providers + enum: + - azure + - azure-workload + - gcp + - aws + - aws-eks + - aws-kiam + type: string + roleArn: type: string required: - provider @@ -243,8 +257,22 @@ spec: properties: identityId: type: string + identityOwner: + enum: + - keda + - workload + type: string provider: description: PodIdentityProvider contains the list of providers + enum: + - azure + - azure-workload + - gcp + - aws + - aws-eks + - aws-kiam + type: string + roleArn: type: string required: - provider diff --git a/config/crd/bases/keda.sh_triggerauthentications.yaml b/config/crd/bases/keda.sh_triggerauthentications.yaml index 6589a44301b..d981aea0705 100644 --- a/config/crd/bases/keda.sh_triggerauthentications.yaml +++ b/config/crd/bases/keda.sh_triggerauthentications.yaml @@ -110,8 +110,22 @@ spec: properties: identityId: type: string + identityOwner: + enum: + - keda + - workload + type: string provider: description: PodIdentityProvider contains the list of providers + enum: + - azure + - azure-workload + - gcp + - aws + - aws-eks + - aws-kiam + type: string + roleArn: type: string required: - provider @@ -242,8 +256,22 @@ spec: properties: identityId: type: string + identityOwner: + enum: + - keda + - workload + type: string provider: description: PodIdentityProvider contains the list of providers + enum: + - azure + - azure-workload + - gcp + - aws + - aws-eks + - aws-kiam + type: string + roleArn: type: string required: - provider diff --git a/pkg/metricsservice/api/metrics.pb.go b/pkg/metricsservice/api/metrics.pb.go index 919cb333713..50d57a32a09 100644 --- a/pkg/metricsservice/api/metrics.pb.go +++ b/pkg/metricsservice/api/metrics.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.31.0 -// protoc v4.23.2 +// protoc v4.24.4 // source: metrics.proto package api diff --git a/pkg/metricsservice/api/metrics_grpc.pb.go b/pkg/metricsservice/api/metrics_grpc.pb.go index 9eae639dc04..935372927f9 100644 --- a/pkg/metricsservice/api/metrics_grpc.pb.go +++ b/pkg/metricsservice/api/metrics_grpc.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
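Reviewer note: the new aws provider surfaces three API additions above — the aws enum value, an optional roleArn, and an optional identityOwner (keda|workload), with IsKedaIdentityOwner() treating a nil identityOwner as keda. The webhook's PodIdentityProviderAws case guards roleArn against a non-keda identityOwner but reuses the identityId error text; below is a minimal, self-contained sketch of that check with stand-in types and wording aligned with the resolver error later in this diff (an assumption, not the upstream message).

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the real webhook types (see triggerauthentication_types.go above).
type authPodIdentity struct {
	RoleArn       string
	IdentityOwner *string
}

func (a authPodIdentity) isKedaIdentityOwner() bool {
	return a.IdentityOwner == nil || *a.IdentityOwner == "keda"
}

// validateAwsPodIdentity mirrors the intent of the new PodIdentityProviderAws case:
// roleArn may only be set when KEDA owns the identity.
func validateAwsPodIdentity(p authPodIdentity) error {
	if p.RoleArn != "" && !p.isKedaIdentityOwner() {
		return errors.New("roleArn can't be set if KEDA isn't the identity owner")
	}
	return nil
}

func main() {
	workload := "workload"
	fmt.Println(validateAwsPodIdentity(authPodIdentity{RoleArn: "arn:aws:iam::123456789012:role/x", IdentityOwner: &workload})) // error
	fmt.Println(validateAwsPodIdentity(authPodIdentity{RoleArn: "arn:aws:iam::123456789012:role/x"}))                           // <nil>
}

Because a nil identityOwner counts as keda, only an explicit identityOwner: workload combined with roleArn is rejected.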
// versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.2 +// - protoc v4.24.4 // source: metrics.proto package api diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 5d70d2ee660..15310c2dec4 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -196,7 +196,7 @@ func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata) } else { return errors.New("no awsRegion given") } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + auth, err := getAwsAuthorization(config) if err != nil { return err } diff --git a/pkg/scalers/aws_cloudwatch_scaler.go b/pkg/scalers/aws_cloudwatch_scaler.go index deea194e4eb..597615f8788 100644 --- a/pkg/scalers/aws_cloudwatch_scaler.go +++ b/pkg/scalers/aws_cloudwatch_scaler.go @@ -230,7 +230,7 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e meta.awsEndpoint = val } - awsAuthorization, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + awsAuthorization, err := getAwsAuthorization(config) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_common.go b/pkg/scalers/aws_common.go index 9cad15ce3d3..30a003275d6 100644 --- a/pkg/scalers/aws_common.go +++ b/pkg/scalers/aws_common.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials/stscreds" "github.com/aws/aws-sdk-go-v2/service/sts" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) // ErrAwsNoAccessKey is returned when awsAccessKeyID is missing. @@ -23,6 +24,10 @@ type awsAuthorizationMetadata struct { awsSessionToken string podIdentityOwner bool + // Pod identity owner is confusing + // and it'll be removed when we get + // rid of the old aws podIdentities + usingPodIdentity bool } type awsConfigMetadata struct { @@ -42,7 +47,9 @@ func getAwsConfig(ctx context.Context, awsRegion string, awsAuthorization awsAut if err != nil { return nil, err } - if !metadata.awsAuthorization.podIdentityOwner { + + if !metadata.awsAuthorization.usingPodIdentity && + !metadata.awsAuthorization.podIdentityOwner { return &cfg, nil } if metadata.awsAuthorization.awsAccessKeyID != "" && metadata.awsAuthorization.awsSecretAccessKey != "" { @@ -59,36 +66,46 @@ func getAwsConfig(ctx context.Context, awsRegion string, awsAuthorization awsAut return &cfg, err } -func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (awsAuthorizationMetadata, error) { +func getAwsAuthorization(config *ScalerConfig) (awsAuthorizationMetadata, error) { meta := awsAuthorizationMetadata{} - if metadata["identityOwner"] == "operator" { + if config.PodIdentity.Provider == kedav1alpha1.PodIdentityProviderAws { + meta.usingPodIdentity = true + if val, ok := config.AuthParams["awsRoleArn"]; ok && val != "" { + meta.awsRoleArn = val + } + return meta, nil + } + // TODO, remove all the logic below and just keep the logic for + // parsing awsAccessKeyID, awsSecretAccessKey and awsSessionToken + + if config.TriggerMetadata["identityOwner"] == "operator" { meta.podIdentityOwner = false - } else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" { + } else if config.TriggerMetadata["identityOwner"] == "" || config.TriggerMetadata["identityOwner"] == "pod" { meta.podIdentityOwner = true switch { - case authParams["awsRoleArn"] != "": - meta.awsRoleArn = authParams["awsRoleArn"] - case (authParams["awsAccessKeyID"] != "" || 
authParams["awsAccessKeyId"] != "") && authParams["awsSecretAccessKey"] != "": - meta.awsAccessKeyID = authParams["awsAccessKeyID"] + case config.AuthParams["awsRoleArn"] != "": + meta.awsRoleArn = config.AuthParams["awsRoleArn"] + case (config.AuthParams["awsAccessKeyID"] != "" || config.AuthParams["awsAccessKeyId"] != "") && config.AuthParams["awsSecretAccessKey"] != "": + meta.awsAccessKeyID = config.AuthParams["awsAccessKeyID"] if meta.awsAccessKeyID == "" { - meta.awsAccessKeyID = authParams["awsAccessKeyId"] + meta.awsAccessKeyID = config.AuthParams["awsAccessKeyId"] } - meta.awsSecretAccessKey = authParams["awsSecretAccessKey"] - meta.awsSessionToken = authParams["awsSessionToken"] + meta.awsSecretAccessKey = config.AuthParams["awsSecretAccessKey"] + meta.awsSessionToken = config.AuthParams["awsSessionToken"] default: - if metadata["awsAccessKeyID"] != "" { - meta.awsAccessKeyID = metadata["awsAccessKeyID"] - } else if metadata["awsAccessKeyIDFromEnv"] != "" { - meta.awsAccessKeyID = resolvedEnv[metadata["awsAccessKeyIDFromEnv"]] + if config.TriggerMetadata["awsAccessKeyID"] != "" { + meta.awsAccessKeyID = config.TriggerMetadata["awsAccessKeyID"] + } else if config.TriggerMetadata["awsAccessKeyIDFromEnv"] != "" { + meta.awsAccessKeyID = config.ResolvedEnv[config.TriggerMetadata["awsAccessKeyIDFromEnv"]] } if len(meta.awsAccessKeyID) == 0 { return meta, ErrAwsNoAccessKey } - if metadata["awsSecretAccessKeyFromEnv"] != "" { - meta.awsSecretAccessKey = resolvedEnv[metadata["awsSecretAccessKeyFromEnv"]] + if config.TriggerMetadata["awsSecretAccessKeyFromEnv"] != "" { + meta.awsSecretAccessKey = config.ResolvedEnv[config.TriggerMetadata["awsSecretAccessKeyFromEnv"]] } if len(meta.awsSecretAccessKey) == 0 { diff --git a/pkg/scalers/aws_dynamodb_scaler.go b/pkg/scalers/aws_dynamodb_scaler.go index 05b173fd1ab..98987a02c17 100644 --- a/pkg/scalers/aws_dynamodb_scaler.go +++ b/pkg/scalers/aws_dynamodb_scaler.go @@ -170,7 +170,7 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error meta.activationTargetValue = 0 } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + auth, err := getAwsAuthorization(config) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_dynamodb_streams_scaler.go b/pkg/scalers/aws_dynamodb_streams_scaler.go index 40c5b32a641..c1973680662 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler.go @@ -111,7 +111,7 @@ func parseAwsDynamoDBStreamsMetadata(config *ScalerConfig, logger logr.Logger) ( } } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + auth, err := getAwsAuthorization(config) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_kinesis_stream_scaler.go b/pkg/scalers/aws_kinesis_stream_scaler.go index 95ff02643a1..bdb9694f88e 100644 --- a/pkg/scalers/aws_kinesis_stream_scaler.go +++ b/pkg/scalers/aws_kinesis_stream_scaler.go @@ -116,7 +116,7 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig, logger logr.Logger) (*a meta.awsEndpoint = val } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + auth, err := getAwsAuthorization(config) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 40e39217d10..051c64d7d89 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -175,7 +175,7 @@ func 
parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs meta.awsEndpoint = val } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + auth, err := getAwsAuthorization(config) if err != nil { return nil, err } diff --git a/pkg/scalers/externalscaler/externalscaler.pb.go b/pkg/scalers/externalscaler/externalscaler.pb.go index a8515ecc162..459c61bfec9 100644 --- a/pkg/scalers/externalscaler/externalscaler.pb.go +++ b/pkg/scalers/externalscaler/externalscaler.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.31.0 -// protoc v4.23.2 +// protoc v4.24.4 // source: externalscaler.proto package externalscaler diff --git a/pkg/scalers/externalscaler/externalscaler_grpc.pb.go b/pkg/scalers/externalscaler/externalscaler_grpc.pb.go index 5489ae58ac4..503b5695932 100644 --- a/pkg/scalers/externalscaler/externalscaler_grpc.pb.go +++ b/pkg/scalers/externalscaler/externalscaler_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.2 +// - protoc v4.24.4 // source: externalscaler.proto package externalscaler diff --git a/pkg/scalers/liiklus/LiiklusService.pb.go b/pkg/scalers/liiklus/LiiklusService.pb.go index 54a8f7b33c8..0ceb91196ef 100644 --- a/pkg/scalers/liiklus/LiiklusService.pb.go +++ b/pkg/scalers/liiklus/LiiklusService.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.31.0 -// protoc v4.23.2 +// protoc v4.24.4 // source: LiiklusService.proto package liiklus diff --git a/pkg/scalers/liiklus/LiiklusService_grpc.pb.go b/pkg/scalers/liiklus/LiiklusService_grpc.pb.go index 51480c39dae..1910dd6b90a 100644 --- a/pkg/scalers/liiklus/LiiklusService_grpc.pb.go +++ b/pkg/scalers/liiklus/LiiklusService_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
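Reviewer note: getAwsAuthorization now receives the whole *ScalerConfig so it can branch on the resolved pod identity before falling back to the legacy identityOwner / static-key paths, and every AWS scaler call site is updated accordingly. Below is a condensed, runnable sketch of the resulting precedence; the types are local stand-ins and only the field and key names are taken from the diff.

package main

import "fmt"

// Local stand-ins; the real ScalerConfig/awsAuthorizationMetadata live in pkg/scalers.
type scalerConfig struct {
	PodIdentityProvider string            // "aws", "aws-eks", "aws-kiam", ...
	AuthParams          map[string]string // filled by ResolveAuthRefAndPodIdentity
	TriggerMetadata     map[string]string
}

type awsAuth struct {
	roleArn          string
	usingPodIdentity bool // new flag: the "aws" provider was resolved
	podIdentityOwner bool // legacy flag kept for aws-eks / aws-kiam / static keys
}

// resolveAwsAuth mirrors the control flow of the refactored getAwsAuthorization:
// the new "aws" provider short-circuits and trusts the awsRoleArn the resolver put
// into authParams; everything else keeps the previous identityOwner behaviour.
func resolveAwsAuth(cfg scalerConfig) awsAuth {
	if cfg.PodIdentityProvider == "aws" {
		return awsAuth{usingPodIdentity: true, roleArn: cfg.AuthParams["awsRoleArn"]}
	}
	if cfg.TriggerMetadata["identityOwner"] == "operator" {
		return awsAuth{podIdentityOwner: false} // operator's own identity, nothing to resolve
	}
	// "pod" (or empty) identityOwner: prefer an explicit awsRoleArn, then static keys.
	return awsAuth{podIdentityOwner: true, roleArn: cfg.AuthParams["awsRoleArn"]}
}

func main() {
	cfg := scalerConfig{
		PodIdentityProvider: "aws",
		AuthParams:          map[string]string{"awsRoleArn": "arn:aws:iam::123456789012:role/example"},
	}
	fmt.Printf("%+v\n", resolveAwsAuth(cfg))
}

The TODO in the diff signals that the legacy branches are slated for removal once aws-eks/aws-kiam go away, leaving only the static-key parsing.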
// versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.2 +// - protoc v4.24.4 // source: LiiklusService.proto package liiklus diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 221e0ccd8c4..1c4c23d470f 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -187,6 +187,27 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log } switch podIdentity.Provider { + case kedav1alpha1.PodIdentityProviderAws: + if podIdentity.RoleArn != "" { + if !podIdentity.IsKedaIdentityOwner() { + return nil, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, + fmt.Errorf("roleArn can't be set if KEDA isn't identity owner, current value: '%s'", *podIdentity.IdentityOwner) + } + authParams["awsRoleArn"] = podIdentity.RoleArn + } + if !podIdentity.IsKedaIdentityOwner() { + serviceAccountName := defaultServiceAccount + if podTemplateSpec.Spec.ServiceAccountName != "" { + serviceAccountName = podTemplateSpec.Spec.ServiceAccountName + } + serviceAccount := &corev1.ServiceAccount{} + err := client.Get(ctx, types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount) + if err != nil { + return nil, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, + fmt.Errorf("error getting service account: '%s', error: %w", serviceAccountName, err) + } + authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS] + } case kedav1alpha1.PodIdentityProviderAwsEKS: serviceAccountName := defaultServiceAccount if podTemplateSpec.Spec.ServiceAccountName != "" { @@ -202,10 +223,6 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log case kedav1alpha1.PodIdentityProviderAwsKiam: authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam] case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: - if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzure { - // FIXME: Delete this for v2.15 - logger.Info("WARNING: Azure AD Pod Identity has been archived (https://github.com/Azure/aad-pod-identity#-announcement) and will be removed from KEDA on v2.15") - } if podIdentity.IdentityID != nil && *podIdentity.IdentityID == "" { return nil, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, fmt.Errorf("IdentityID of PodIdentity should not be empty") } diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 19af188a264..603b9c6b8bf 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -68,6 +68,19 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp } authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.secretsLister) + switch podIdentity.Provider { + case kedav1alpha1.PodIdentityProviderAzure: + // FIXME: Delete this for v2.15 + logger.Info("WARNING: Azure AD Pod Identity has been archived (https://github.com/Azure/aad-pod-identity#-announcement) and will be removed from KEDA on v2.15") + case kedav1alpha1.PodIdentityProviderAwsKiam: + // FIXME: Delete this for v2.15 + logger.Info("WARNING: AWS Kiam Identity has been abandoned (https://github.com/uswitch/kiam) and will be removed from KEDA on v2.15") + case kedav1alpha1.PodIdentityProviderAwsEKS: + // FIXME: Delete this for 
v3 + logger.Info("WARNING: AWS EKS Identity has been deprecated in favor of AWS Identity and will be removed from KEDA on v3") + default: + } + if err != nil { return nil, nil, err } diff --git a/tests/scalers/aws/aws_cloudwatch_pod_identity/aws_cloudwatch_pod_identity_test.go b/tests/scalers/aws/aws_cloudwatch_pod_identity/aws_cloudwatch_pod_identity_test.go index 04697f58af3..12bba970c6a 100644 --- a/tests/scalers/aws/aws_cloudwatch_pod_identity/aws_cloudwatch_pod_identity_test.go +++ b/tests/scalers/aws/aws_cloudwatch_pod_identity/aws_cloudwatch_pod_identity_test.go @@ -51,7 +51,7 @@ metadata: namespace: {{.TestNamespace}} spec: podIdentity: - provider: aws-eks + provider: aws ` deploymentTemplate = ` diff --git a/tests/scalers/aws/aws_cloudwatch_pod_identity_eks/aws_cloudwatch_pod_identity_eks_test.go b/tests/scalers/aws/aws_cloudwatch_pod_identity_eks/aws_cloudwatch_pod_identity_eks_test.go new file mode 100644 index 00000000000..dd372b71adb --- /dev/null +++ b/tests/scalers/aws/aws_cloudwatch_pod_identity_eks/aws_cloudwatch_pod_identity_eks_test.go @@ -0,0 +1,225 @@ +//go:build e2e +// +build e2e + +package aws_cloudwatch_pod_identity_eks_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "aws-cloudwatch-pod-identity-eks-test" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + SecretName string + AwsAccessKeyID string + AwsSecretAccessKey string + AwsRegion string + CloudWatchMetricName string + CloudWatchMetricNamespace string + CloudWatchMetricDimensionName string + CloudWatchMetricDimensionValue string +} + +const ( + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws-eks +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 2 + minReplicaCount: 0 + cooldownPeriod: 1 + triggers: + - type: aws-cloudwatch + authenticationRef: + name: keda-trigger-auth-aws-credentials + metadata: + awsRegion: {{.AwsRegion}} + namespace: {{.CloudWatchMetricNamespace}} + dimensionName: {{.CloudWatchMetricDimensionName}} + dimensionValue: {{.CloudWatchMetricDimensionValue}} + metricName: {{.CloudWatchMetricName}} + targetMetricValue: "1" + activationTargetMetricValue: "5" + minMetricValue: "0" + metricCollectionTime: "120" + metricStatPeriod: "30" + 
identityOwner: operator +` +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + cloudwatchMetricName = fmt.Sprintf("cw-identity-%d", GetRandomNumber()) + awsAccessKeyID = os.Getenv("TF_AWS_ACCESS_KEY") + awsSecretAccessKey = os.Getenv("TF_AWS_SECRET_KEY") + awsRegion = os.Getenv("TF_AWS_REGION") + cloudwatchMetricNamespace = "KEDA" + cloudwatchMetricDimensionName = "dimensionName" + cloudwatchMetricDimensionValue = "dimensionValue" + maxReplicaCount = 2 + minReplicaCount = 0 +) + +func TestCloudWatchScaler(t *testing.T) { + // setup cloudwatch + cloudwatchClient := createCloudWatchClient() + setCloudWatchCustomMetric(t, cloudwatchClient, 0) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 1), + "replica count should be %d after 1 minute", minReplicaCount) + + // test scaling + testActivation(t, kc, cloudwatchClient) + testScaleOut(t, kc, cloudwatchClient) + testScaleIn(t, kc, cloudwatchClient) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) + + setCloudWatchCustomMetric(t, cloudwatchClient, 0) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, cloudwatchClient *cloudwatch.Client) { + t.Log("--- testing activation ---") + setCloudWatchCustomMetric(t, cloudwatchClient, 3) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, cloudwatchClient *cloudwatch.Client) { + t.Log("--- testing scale out ---") + setCloudWatchCustomMetric(t, cloudwatchClient, 10) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, cloudwatchClient *cloudwatch.Client) { + t.Log("--- testing scale in ---") + + setCloudWatchCustomMetric(t, cloudwatchClient, 0) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +func setCloudWatchCustomMetric(t *testing.T, cloudwatchClient *cloudwatch.Client, value float64) { + _, err := cloudwatchClient.PutMetricData(context.Background(), &cloudwatch.PutMetricDataInput{ + MetricData: []types.MetricDatum{ + { + MetricName: aws.String(cloudwatchMetricName), + Dimensions: []types.Dimension{ + { + Name: aws.String(cloudwatchMetricDimensionName), + Value: aws.String(cloudwatchMetricDimensionValue), + }, + }, + Unit: types.StandardUnitNone, + Value: aws.Float64(value), + }, + }, + Namespace: aws.String(cloudwatchMetricNamespace), + }) + assert.NoErrorf(t, err, "failed to set cloudwatch metric - %s", err) +} + +func createCloudWatchClient() *cloudwatch.Client { + configOptions := make([]func(*config.LoadOptions) error, 0) + configOptions = append(configOptions, config.WithRegion(awsRegion)) + cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...) 
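+ // Note: the static TF_AWS_* credentials set on this client are only used by the test
+ // harness to publish the custom CloudWatch metric; the scaler under test authenticates
+ // through the aws-eks pod identity (identityOwner: operator) configured above.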
+ cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKeyID, awsSecretAccessKey, "") + return cloudwatch.NewFromConfig(cfg) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + AwsAccessKeyID: base64.StdEncoding.EncodeToString([]byte(awsAccessKeyID)), + AwsSecretAccessKey: base64.StdEncoding.EncodeToString([]byte(awsSecretAccessKey)), + AwsRegion: awsRegion, + CloudWatchMetricName: cloudwatchMetricName, + CloudWatchMetricNamespace: cloudwatchMetricNamespace, + CloudWatchMetricDimensionName: cloudwatchMetricDimensionName, + CloudWatchMetricDimensionValue: cloudwatchMetricDimensionValue, + }, []Template{ + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} diff --git a/tests/scalers/aws/aws_dynamodb_pod_identity/aws_dynamodb_pod_identity_test.go b/tests/scalers/aws/aws_dynamodb_pod_identity/aws_dynamodb_pod_identity_test.go index ab5cc6cd7d5..aea9e215e76 100644 --- a/tests/scalers/aws/aws_dynamodb_pod_identity/aws_dynamodb_pod_identity_test.go +++ b/tests/scalers/aws/aws_dynamodb_pod_identity/aws_dynamodb_pod_identity_test.go @@ -53,7 +53,7 @@ metadata: namespace: {{.TestNamespace}} spec: podIdentity: - provider: aws-eks + provider: aws ` deploymentTemplate = ` diff --git a/tests/scalers/aws/aws_dynamodb_pod_identity_eks/aws_dynamodb_pod_identity_eks_test.go b/tests/scalers/aws/aws_dynamodb_pod_identity_eks/aws_dynamodb_pod_identity_eks_test.go new file mode 100644 index 00000000000..70a9c43a27b --- /dev/null +++ b/tests/scalers/aws/aws_dynamodb_pod_identity_eks/aws_dynamodb_pod_identity_eks_test.go @@ -0,0 +1,277 @@ +//go:build e2e +// +build e2e + +package aws_dynamodb_pod_identity_eks_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "aws-dynamodb-pod-identity-eks-test" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + SecretName string + AwsAccessKeyID string + AwsSecretAccessKey string + AwsRegion string + DynamoDBTableName string + ExpressionAttributeNames string + KeyConditionExpression string + ExpressionAttributeValues string +} + +const ( + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws-eks +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 2 + minReplicaCount: 0 + cooldownPeriod: 1 + triggers: + - type: aws-dynamodb + authenticationRef: + name: keda-trigger-auth-aws-credentials + metadata: + awsRegion: {{.AwsRegion}} + tableName: {{.DynamoDBTableName}} + expressionAttributeNames: '{{.ExpressionAttributeNames}}' + keyConditionExpression: '{{.KeyConditionExpression}}' + expressionAttributeValues: '{{.ExpressionAttributeValues}}' + targetValue: '1' + activationTargetValue: '4' + identityOwner: operator +` +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + dynamoDBTableName = fmt.Sprintf("table-identity-%d", GetRandomNumber()) + awsAccessKeyID = os.Getenv("TF_AWS_ACCESS_KEY") + awsSecretAccessKey = os.Getenv("TF_AWS_SECRET_KEY") + awsRegion = os.Getenv("TF_AWS_REGION") + expressionAttributeNames = "{ \"#k\" : \"event_type\"}" + keyConditionExpression = "#k = :key" + expressionAttributeValues = "{ \":key\" : {\"S\":\"scaling_event\"}}" + maxReplicaCount = 2 + minReplicaCount = 0 +) + +func TestDynamoDBScaler(t *testing.T) { + // setup dynamodb + dynamodbClient := createDynamoDBClient() + createDynamoDBTable(t, dynamodbClient) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 1), + "replica count should be %d after 1 minute", minReplicaCount) + + // test scaling + testActivation(t, kc, dynamodbClient) + testScaleOut(t, kc, dynamodbClient) + testScaleIn(t, kc, dynamodbClient) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) + cleanupTable(t, dynamodbClient) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, dynamodbClient *dynamodb.Client) { + t.Log("--- testing activation ---") + addMessages(t, dynamodbClient, 3) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, 
deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, dynamodbClient *dynamodb.Client) { + t.Log("--- testing scale out ---") + addMessages(t, dynamodbClient, 6) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, dynamodbClient *dynamodb.Client) { + t.Log("--- testing scale in ---") + + for i := 0; i < 6; i++ { + _, err := dynamodbClient.DeleteItem(context.Background(), &dynamodb.DeleteItemInput{ + TableName: aws.String(dynamoDBTableName), + Key: map[string]types.AttributeValue{ + "event_type": &types.AttributeValueMemberS{ + Value: "scaling_event", + }, + "event_id": &types.AttributeValueMemberS{ + Value: strconv.Itoa(i), + }, + }, + }) + assert.NoErrorf(t, err, "failed to delete item - %s", err) + } + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +func addMessages(t *testing.T, dynamodbClient *dynamodb.Client, messages int) { + for i := 0; i < messages; i++ { + _, err := dynamodbClient.PutItem(context.Background(), &dynamodb.PutItemInput{ + TableName: aws.String(dynamoDBTableName), + Item: map[string]types.AttributeValue{ + "event_type": &types.AttributeValueMemberS{ + Value: "scaling_event", + }, + "event_id": &types.AttributeValueMemberS{ + Value: strconv.Itoa(i), + }, + }, + }) + t.Log("Message enqueued") + assert.NoErrorf(t, err, "failed to create item - %s", err) + } +} + +func createDynamoDBTable(t *testing.T, dynamodbClient *dynamodb.Client) { + _, err := dynamodbClient.CreateTable(context.Background(), &dynamodb.CreateTableInput{ + TableName: aws.String(dynamoDBTableName), + KeySchema: []types.KeySchemaElement{ + {AttributeName: aws.String("event_type"), KeyType: types.KeyTypeHash}, + {AttributeName: aws.String("event_id"), KeyType: types.KeyTypeRange}, + }, + AttributeDefinitions: []types.AttributeDefinition{ + {AttributeName: aws.String("event_type"), AttributeType: types.ScalarAttributeTypeS}, + {AttributeName: aws.String("event_id"), AttributeType: types.ScalarAttributeTypeS}, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(5), + WriteCapacityUnits: aws.Int64(5), + }, + }) + assert.NoErrorf(t, err, "failed to create table - %s", err) + done := waitForTableActiveStatus(t, dynamodbClient) + if !done { + assert.True(t, true, "failed to create dynamodb") + } +} + +func waitForTableActiveStatus(t *testing.T, dynamodbClient *dynamodb.Client) bool { + for i := 0; i < 30; i++ { + describe, _ := dynamodbClient.DescribeTable(context.Background(), &dynamodb.DescribeTableInput{ + TableName: aws.String(dynamoDBTableName), + }) + t.Logf("Waiting for table ACTIVE status. 
current status - %s", describe.Table.TableStatus) + if describe.Table.TableStatus == "ACTIVE" { + return true + } + time.Sleep(time.Second * 2) + } + return false +} + +func cleanupTable(t *testing.T, dynamodbClient *dynamodb.Client) { + t.Log("--- cleaning up ---") + _, err := dynamodbClient.DeleteTable(context.Background(), &dynamodb.DeleteTableInput{ + TableName: aws.String(dynamoDBTableName), + }) + assert.NoErrorf(t, err, "cannot delete stream - %s", err) +} + +func createDynamoDBClient() *dynamodb.Client { + configOptions := make([]func(*config.LoadOptions) error, 0) + configOptions = append(configOptions, config.WithRegion(awsRegion)) + cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...) + cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKeyID, awsSecretAccessKey, "") + return dynamodb.NewFromConfig(cfg) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + AwsAccessKeyID: base64.StdEncoding.EncodeToString([]byte(awsAccessKeyID)), + AwsSecretAccessKey: base64.StdEncoding.EncodeToString([]byte(awsSecretAccessKey)), + AwsRegion: awsRegion, + DynamoDBTableName: dynamoDBTableName, + ExpressionAttributeNames: expressionAttributeNames, + KeyConditionExpression: keyConditionExpression, + ExpressionAttributeValues: expressionAttributeValues, + }, []Template{ + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} diff --git a/tests/scalers/aws/aws_dynamodb_streams_pod_identity/aws_dynamodb_streams_pod_identity_test.go b/tests/scalers/aws/aws_dynamodb_streams_pod_identity/aws_dynamodb_streams_pod_identity_test.go index f6e937f62ff..ad2b93db98b 100644 --- a/tests/scalers/aws/aws_dynamodb_streams_pod_identity/aws_dynamodb_streams_pod_identity_test.go +++ b/tests/scalers/aws/aws_dynamodb_streams_pod_identity/aws_dynamodb_streams_pod_identity_test.go @@ -91,7 +91,7 @@ metadata: namespace: {{.TestNamespace}} spec: podIdentity: - provider: aws-eks + provider: aws ` scaledObjectTemplate = ` diff --git a/tests/scalers/aws/aws_dynamodb_streams_pod_identity_eks/aws_dynamodb_streams_pod_identity_eks_test.go b/tests/scalers/aws/aws_dynamodb_streams_pod_identity_eks/aws_dynamodb_streams_pod_identity_eks_test.go new file mode 100644 index 00000000000..3d000d0db1d --- /dev/null +++ b/tests/scalers/aws/aws_dynamodb_streams_pod_identity_eks/aws_dynamodb_streams_pod_identity_eks_test.go @@ -0,0 +1,294 @@ +//go:build e2e +// +build e2e + +package aws_dynamodb_streams_pod_identity_eks_test + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "os" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + dynamodbTypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "aws-dynamodb-streams-pod-identity-eks-test" +) + +var ( + awsRegion = os.Getenv("TF_AWS_REGION") + awsAccessKey = os.Getenv("TF_AWS_ACCESS_KEY") + awsSecretKey = os.Getenv("TF_AWS_SECRET_KEY") + testNamespace = fmt.Sprintf("%s-ns", testName) + secretName = fmt.Sprintf("%s-secret", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + triggerAuthName = fmt.Sprintf("%s-ta", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + tableName = fmt.Sprintf("stream-identity-%d", GetRandomNumber()) + shardCount = 2 // default count + activationShardCount = 0 // default count +) + +type templateData struct { + TestNamespace string + SecretName string + AwsRegion string + AwsAccessKey string + AwsSecretKey string + DeploymentName string + TriggerAuthName string + ScaledObjectName string + TableName string + ShardCount int64 + ActivationShardCount int64 +} + +const ( + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged +` + + triggerAuthTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthName}} + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws-eks +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + deploymentName: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 2 + minReplicaCount: 0 + pollingInterval: 5 # Optional. Default: 30 seconds + cooldownPeriod: 1 # Optional. Default: 300 seconds + triggers: + - type: aws-dynamodb-streams + authenticationRef: + name: {{.TriggerAuthName}} + metadata: + awsRegion: {{.AwsRegion}} # Required + tableName: {{.TableName}} # Required + shardCount: "{{.ShardCount}}" # Optional. Default: 2 + activationShardCount: "{{.ActivationShardCount}}" # Optional. 
Default: 0 + identityOwner: operator +` +) + +func TestScaler(t *testing.T) { + t.Log("--- setting up ---") + require.NotEmpty(t, awsAccessKey, "AWS_ACCESS_KEY env variable is required for dynamodb streams tests") + require.NotEmpty(t, awsSecretKey, "AWS_SECRET_KEY env variable is required for dynamodb streams tests") + data, templates := getTemplateData() + + // Create DynamoDB table and the latest stream Arn for the table + dbClient, dbStreamsClient := setupDynamoDBStreams(t) + streamArn, err := getLatestStreamArn(dbClient) + assert.NoErrorf(t, err, "cannot get latest stream arn for the table - %s", err) + time.Sleep(10 * time.Second) + + // Get Shard Count + shardCount, err := getDynamoDBStreamShardCount(dbStreamsClient, streamArn) + assert.True(t, shardCount >= 2, "dynamodb stream shard count should be 2 or higher - %s", err) + + // Deploy nginx, secret, and triggerAuth + kc := GetKubernetesClient(t) + CreateNamespace(t, kc, testNamespace) + KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate) + KubectlApplyWithTemplate(t, data, "triggerAuthTemplate", triggerAuthTemplate) + + // Wait for nginx to load + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 30, 3), + "replica count should start out as 0") + + // test scaling + testActivation(t, kc, data) + testScaleOut(t, kc, data, shardCount) + testScaleIn(t, kc, data, shardCount) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) + cleanupDynamoDBTable(t, dbClient) +} + +func setupDynamoDBStreams(t *testing.T) (*dynamodb.Client, *dynamodbstreams.Client) { + var dbClient *dynamodb.Client + var dbStreamClient *dynamodbstreams.Client + + configOptions := make([]func(*config.LoadOptions) error, 0) + configOptions = append(configOptions, config.WithRegion(awsRegion)) + cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...) 
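+ // Note: these static TF_AWS_* credentials only back the test's own DynamoDB/DynamoDB
+ // Streams clients (table and stream management); the scaler under test authenticates
+ // through the aws-eks pod identity (identityOwner: operator) configured above.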
+ cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKey, awsSecretKey, "") + + dbClient = dynamodb.NewFromConfig(cfg) + dbStreamClient = dynamodbstreams.NewFromConfig(cfg) + + err := createTable(dbClient) + assert.NoErrorf(t, err, "cannot create dynamodb table - %s", err) + + return dbClient, dbStreamClient +} + +func createTable(db *dynamodb.Client) error { + keySchema := []dynamodbTypes.KeySchemaElement{ + { + AttributeName: aws.String("id"), + KeyType: dynamodbTypes.KeyTypeHash, + }, + } + attributeDefinitions := []dynamodbTypes.AttributeDefinition{ + { + AttributeName: aws.String("id"), + AttributeType: dynamodbTypes.ScalarAttributeTypeS, + }, + } + streamSpecification := &dynamodbTypes.StreamSpecification{ + StreamEnabled: aws.Bool(true), + StreamViewType: dynamodbTypes.StreamViewTypeNewImage, + } + _, err := db.CreateTable(context.Background(), &dynamodb.CreateTableInput{ + TableName: &tableName, + KeySchema: keySchema, + AttributeDefinitions: attributeDefinitions, + BillingMode: dynamodbTypes.BillingModePayPerRequest, + StreamSpecification: streamSpecification, + }) + return err +} + +func getLatestStreamArn(db *dynamodb.Client) (*string, error) { + input := dynamodb.DescribeTableInput{ + TableName: &tableName, + } + tableInfo, err := db.DescribeTable(context.Background(), &input) + if err != nil { + return nil, err + } + if nil == tableInfo.Table.LatestStreamArn { + return nil, errors.New("empty table stream arn") + } + return tableInfo.Table.LatestStreamArn, nil +} + +func getDynamoDBStreamShardCount(dbs *dynamodbstreams.Client, streamArn *string) (int64, error) { + input := dynamodbstreams.DescribeStreamInput{ + StreamArn: streamArn, + } + des, err := dbs.DescribeStream(context.Background(), &input) + if err != nil { + return -1, err + } + return int64(len(des.StreamDescription.Shards)), nil +} + +func getTemplateData() (templateData, []Template) { + base64AwsAccessKey := base64.StdEncoding.EncodeToString([]byte(awsAccessKey)) + base64AwsSecretKey := base64.StdEncoding.EncodeToString([]byte(awsSecretKey)) + + return templateData{ + TestNamespace: testNamespace, + SecretName: secretName, + AwsRegion: awsRegion, + AwsAccessKey: base64AwsAccessKey, + AwsSecretKey: base64AwsSecretKey, + DeploymentName: deploymentName, + TriggerAuthName: triggerAuthName, + ScaledObjectName: scaledObjectName, + TableName: tableName, + ShardCount: int64(shardCount), + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "triggerAuthTemplate", Config: triggerAuthTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation ---") + data.ActivationShardCount = 10 + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData, shardCount int64) { + t.Log("--- testing scale out ---") + // Deploy scalerObject with its target shardCount = the current dynamodb streams shard count and check if replicas scale out to 1 + t.Log("replicas should scale out to 1") + data.ShardCount = shardCount + data.ActivationShardCount = int64(activationShardCount) + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 180, 1), + "replica count 
should increase to 1") + + // Deploy scalerObject with its shardCount = 1 and check if replicas scale out to 2 (maxReplicaCount) + t.Log("then, replicas should scale out to 2") + data.ShardCount = 1 + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 180, 1), + "replica count should increase to 2") +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData, shardCount int64) { + t.Log("--- testing scale in ---") + // Deploy scalerObject with its target shardCount = the current dynamodb streams shard count and check if replicas scale in to 1 + data.ShardCount = shardCount + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 330, 1), + "replica count should decrease to 1 in 330 seconds") +} + +func cleanupDynamoDBTable(t *testing.T, db *dynamodb.Client) { + t.Log("--- cleaning up ---") + _, err := db.DeleteTable(context.Background(), + &dynamodb.DeleteTableInput{ + TableName: &tableName, + }) + assert.NoErrorf(t, err, "cannot delete dynamodb table - %s", err) +} diff --git a/tests/scalers/aws/aws_kinesis_stream_pod_identity/aws_kinesis_stream_pod_identity_test.go b/tests/scalers/aws/aws_kinesis_stream_pod_identity/aws_kinesis_stream_pod_identity_test.go index 69bfd38de08..49c3fde5c26 100644 --- a/tests/scalers/aws/aws_kinesis_stream_pod_identity/aws_kinesis_stream_pod_identity_test.go +++ b/tests/scalers/aws/aws_kinesis_stream_pod_identity/aws_kinesis_stream_pod_identity_test.go @@ -49,7 +49,7 @@ metadata: namespace: {{.TestNamespace}} spec: podIdentity: - provider: aws-eks + provider: aws ` deploymentTemplate = ` diff --git a/tests/scalers/aws/aws_kinesis_stream_pod_identity_eks/aws_kinesis_stream_pod_identity_eks_test.go b/tests/scalers/aws/aws_kinesis_stream_pod_identity_eks/aws_kinesis_stream_pod_identity_eks_test.go new file mode 100644 index 00000000000..245770f98e6 --- /dev/null +++ b/tests/scalers/aws/aws_kinesis_stream_pod_identity_eks/aws_kinesis_stream_pod_identity_eks_test.go @@ -0,0 +1,239 @@ +//go:build e2e +// +build e2e + +package aws_kinesis_stream_pod_identity_eks_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "aws-kinesis-stream-pod-identity-eks-test" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + SecretName string + AwsAccessKeyID string + AwsSecretAccessKey string + AwsRegion string + KinesisStream string +} + +const ( + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws-eks +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 2 + minReplicaCount: 0 + cooldownPeriod: 1 + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: aws-kinesis-stream + authenticationRef: + name: keda-trigger-auth-aws-credentials + metadata: + awsRegion: {{.AwsRegion}} + streamName: {{.KinesisStream}} + shardCount: "3" + activationShardCount: "4" + identityOwner: operator +` +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + kinesisStreamName = fmt.Sprintf("kinesis-identity-%d", GetRandomNumber()) + awsAccessKeyID = os.Getenv("TF_AWS_ACCESS_KEY") + awsSecretAccessKey = os.Getenv("TF_AWS_SECRET_KEY") + awsRegion = os.Getenv("TF_AWS_REGION") + maxReplicaCount = 2 + minReplicaCount = 0 +) + +func TestKiensisScaler(t *testing.T) { + // setup kinesis + kinesisClient := createKinesisClient() + createKinesisStream(t, kinesisClient) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 1), + "replica count should be %d after 1 minute", minReplicaCount) + + // test scaling + testActivation(t, kc, kinesisClient) + testScaleOut(t, kc, kinesisClient) + testScaleIn(t, kc, kinesisClient) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) + cleanupStream(t, kinesisClient) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, kinesisClient *kinesis.Client) { + t.Log("--- testing activation ---") + updateShardCount(t, kinesisClient, 3) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, kinesisClient *kinesis.Client) { + t.Log("--- testing scale out ---") + updateShardCount(t, kinesisClient, 6) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count 
should be %d after 3 minutes", maxReplicaCount)
+}
+
+func testScaleIn(t *testing.T, kc *kubernetes.Clientset, kinesisClient *kinesis.Client) {
+	t.Log("--- testing scale in ---")
+	updateShardCount(t, kinesisClient, 3)
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", minReplicaCount)
+}
+
+func updateShardCount(t *testing.T, kinesisClient *kinesis.Client, shardCount int64) {
+	done := waitForStreamActiveStatus(t, kinesisClient)
+	if done {
+		_, err := kinesisClient.UpdateShardCount(context.Background(), &kinesis.UpdateShardCountInput{
+			StreamName: &kinesisStreamName,
+			TargetShardCount: aws.Int32(int32(shardCount)),
+			ScalingType: types.ScalingTypeUniformScaling,
+		})
+		assert.NoErrorf(t, err, "cannot update shard count - %s", err)
+	}
+	assert.True(t, done, "failed to update shard count")
+}
+
+func createKinesisStream(t *testing.T, kinesisClient *kinesis.Client) {
+	_, err := kinesisClient.CreateStream(context.Background(), &kinesis.CreateStreamInput{
+		StreamName: &kinesisStreamName,
+		ShardCount: aws.Int32(2),
+	})
+	assert.NoErrorf(t, err, "failed to create stream - %s", err)
+	done := waitForStreamActiveStatus(t, kinesisClient)
+	assert.True(t, done, "failed to create kinesis stream")
+}
+
+func waitForStreamActiveStatus(t *testing.T, kinesisClient *kinesis.Client) bool {
+	for i := 0; i < 30; i++ {
+		describe, err := kinesisClient.DescribeStream(context.Background(), &kinesis.DescribeStreamInput{
+			StreamName: &kinesisStreamName,
+		})
+		if err != nil {
+			t.Logf("cannot describe stream - %s", err)
+		} else {
+			t.Logf("Waiting for stream ACTIVE status. current status - %s", describe.StreamDescription.StreamStatus)
+			if describe.StreamDescription.StreamStatus == "ACTIVE" {
+				return true
+			}
+		}
+		time.Sleep(time.Second * 2)
+	}
+	return false
+}
+
+func cleanupStream(t *testing.T, kinesisClient *kinesis.Client) {
+	t.Log("--- cleaning up ---")
+	_, err := kinesisClient.DeleteStream(context.Background(), &kinesis.DeleteStreamInput{
+		StreamName: &kinesisStreamName,
+	})
+	assert.NoErrorf(t, err, "cannot delete stream - %s", err)
+}
+
+func createKinesisClient() *kinesis.Client {
+	configOptions := make([]func(*config.LoadOptions) error, 0)
+	configOptions = append(configOptions, config.WithRegion(awsRegion))
+	cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...)
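+	// The static provider set just below overrides whatever the default credential
+	// chain resolved, so the clients always use the TF_AWS_* keys loaded from the environment.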
+ cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKeyID, awsSecretAccessKey, "") + return kinesis.NewFromConfig(cfg) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + AwsAccessKeyID: base64.StdEncoding.EncodeToString([]byte(awsAccessKeyID)), + AwsSecretAccessKey: base64.StdEncoding.EncodeToString([]byte(awsSecretAccessKey)), + AwsRegion: awsRegion, + KinesisStream: kinesisStreamName, + }, []Template{ + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} diff --git a/tests/scalers/aws/aws_sqs_queue_pod_identity/aws_sqs_queue_pod_identity_test.go b/tests/scalers/aws/aws_sqs_queue_pod_identity/aws_sqs_queue_pod_identity_test.go index 08f2a0b4dab..a47c20989a4 100644 --- a/tests/scalers/aws/aws_sqs_queue_pod_identity/aws_sqs_queue_pod_identity_test.go +++ b/tests/scalers/aws/aws_sqs_queue_pod_identity/aws_sqs_queue_pod_identity_test.go @@ -47,7 +47,7 @@ metadata: namespace: {{.TestNamespace}} spec: podIdentity: - provider: aws-eks + provider: aws ` deploymentTemplate = ` diff --git a/tests/scalers/aws/aws_sqs_queue_pod_identity_eks/aws_sqs_queue_pod_identity_eks_test.go b/tests/scalers/aws/aws_sqs_queue_pod_identity_eks/aws_sqs_queue_pod_identity_eks_test.go new file mode 100644 index 00000000000..0288c0a169f --- /dev/null +++ b/tests/scalers/aws/aws_sqs_queue_pod_identity_eks/aws_sqs_queue_pod_identity_eks_test.go @@ -0,0 +1,219 @@ +//go:build e2e +// +build e2e + +package aws_sqs_queue_pod_identity_eks_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "aws-sqs-queue-pod-identity-eks-test" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + SecretName string + AwsAccessKeyID string + AwsSecretAccessKey string + AwsRegion string + SqsQueue string +} + +const ( + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws-eks +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 2 + minReplicaCount: 0 + cooldownPeriod: 1 + triggers: + - type: aws-sqs-queue + authenticationRef: + name: keda-trigger-auth-aws-credentials + metadata: + awsRegion: {{.AwsRegion}} + queueURL: {{.SqsQueue}} + queueLength: "1" + activationQueueLength: "5" + identityOwner: operator +` +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + sqsQueueName = fmt.Sprintf("queue-identity-%d", GetRandomNumber()) + awsAccessKeyID = os.Getenv("TF_AWS_ACCESS_KEY") + awsSecretAccessKey = os.Getenv("TF_AWS_SECRET_KEY") + awsRegion = os.Getenv("TF_AWS_REGION") + maxReplicaCount = 2 + minReplicaCount = 0 +) + +func TestSqsScaler(t *testing.T) { + // setup SQS + sqsClient := createSqsClient() + queue := createSqsQueue(t, sqsClient) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData(*queue.QueueUrl) + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 1), + "replica count should be 0 after 1 minute") + + // test scaling + testActivation(t, kc, sqsClient, queue.QueueUrl) + testScaleOut(t, kc, sqsClient, queue.QueueUrl) + testScaleIn(t, kc, sqsClient, queue.QueueUrl) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) + cleanupQueue(t, sqsClient, queue.QueueUrl) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, sqsClient *sqs.Client, queueURL *string) { + t.Log("--- testing activation ---") + addMessages(t, sqsClient, queueURL, 4) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, sqsClient *sqs.Client, queueURL *string) { + t.Log("--- testing scale out ---") + addMessages(t, sqsClient, queueURL, 6) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 180, 1), + "replica count should be 2 after 3 minutes") +} + +func testScaleIn(t *testing.T, kc 
*kubernetes.Clientset, sqsClient *sqs.Client, queueURL *string) { + t.Log("--- testing scale in ---") + _, err := sqsClient.PurgeQueue(context.Background(), &sqs.PurgeQueueInput{ + QueueUrl: queueURL, + }) + assert.NoErrorf(t, err, "cannot clear queue - %s", err) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 180, 1), + "replica count should be 0 after 3 minutes") +} + +func addMessages(t *testing.T, sqsClient *sqs.Client, queueURL *string, messages int) { + for i := 0; i < messages; i++ { + msg := fmt.Sprintf("Message - %d", i) + _, err := sqsClient.SendMessage(context.Background(), &sqs.SendMessageInput{ + QueueUrl: queueURL, + MessageBody: aws.String(msg), + DelaySeconds: 10, + }) + assert.NoErrorf(t, err, "cannot send message - %s", err) + } +} + +func createSqsQueue(t *testing.T, sqsClient *sqs.Client) *sqs.CreateQueueOutput { + queue, err := sqsClient.CreateQueue(context.Background(), &sqs.CreateQueueInput{ + QueueName: &sqsQueueName, + Attributes: map[string]string{ + "DelaySeconds": "60", + "MessageRetentionPeriod": "86400", + }}) + assert.NoErrorf(t, err, "failed to create queue - %s", err) + return queue +} + +func cleanupQueue(t *testing.T, sqsClient *sqs.Client, queueURL *string) { + t.Log("--- cleaning up ---") + _, err := sqsClient.DeleteQueue(context.Background(), &sqs.DeleteQueueInput{ + QueueUrl: queueURL, + }) + assert.NoErrorf(t, err, "cannot delete queue - %s", err) +} + +func createSqsClient() *sqs.Client { + configOptions := make([]func(*config.LoadOptions) error, 0) + configOptions = append(configOptions, config.WithRegion(awsRegion)) + cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...) + cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKeyID, awsSecretAccessKey, "") + return sqs.NewFromConfig(cfg) +} + +func getTemplateData(sqsQueue string) (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + AwsAccessKeyID: base64.StdEncoding.EncodeToString([]byte(awsAccessKeyID)), + AwsSecretAccessKey: base64.StdEncoding.EncodeToString([]byte(awsSecretAccessKey)), + AwsRegion: awsRegion, + SqsQueue: sqsQueue, + }, []Template{ + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} diff --git a/tests/secret-providers/aws_identity_assume_role/aws_identity_assume_role_test.go b/tests/secret-providers/aws_identity_assume_role/aws_identity_assume_role_test.go new file mode 100644 index 00000000000..79ef515685b --- /dev/null +++ b/tests/secret-providers/aws_identity_assume_role/aws_identity_assume_role_test.go @@ -0,0 +1,272 @@ +//go:build e2e +// +build e2e + +package aws_identity_assume_role_test + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "aws-identity-assume-role-test" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + SecretName string + AwsRegion string + RoleArn string + SqsQueue string +} + +const ( + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws +` + + triggerAuthTemplateWithRoleArn = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws + roleArn: {{.RoleArn}} +` + + triggerAuthTemplateWithIdentityOwner = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: aws + identityOwner: workload +` + + serviceAccountTemplate = `apiVersion: v1 +kind: ServiceAccount +metadata: + name: workload + namespace: {{.TestNamespace}} + annotations: + eks.amazonaws.com/role-arn: {{.RoleArn}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + serviceAccountName: workload + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 1 + minReplicaCount: 0 + pollingInterval: 5 + cooldownPeriod: 1 + triggers: + - type: aws-sqs-queue + authenticationRef: + name: keda-trigger-auth-aws-credentials + metadata: + awsRegion: {{.AwsRegion}} + queueURL: {{.SqsQueue}} + queueLength: "1" +` +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + sqsQueueName = fmt.Sprintf("asume-role-queue-%d", GetRandomNumber()) + awsAccessKeyID = os.Getenv("TF_AWS_ACCESS_KEY") + awsSecretAccessKey = os.Getenv("TF_AWS_SECRET_KEY") + awsRegion = os.Getenv("TF_AWS_REGION") + awsRoleArn = os.Getenv("TF_AWS_WORKLOAD_ROLE") + maxReplicaCount = 1 + minReplicaCount = 0 +) + +func TestSqsScaler(t *testing.T) { + // setup SQS + sqsClient := createSqsClient() + queue := createSqsQueue(t, sqsClient) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData(*queue.QueueUrl) + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 1), + "replica count should be 0 after 1 minute") + + // test scaling using KEDA identity + testScaleWithKEDAIdentity(t, kc, data, sqsClient, queue.QueueUrl) + // test scaling using correct identity provided via podIdentity.RoleArn + testScaleWithExplicitRoleArn(t, kc, data, sqsClient, queue.QueueUrl) + // test scaling using correct 
identity provided via workload
+	testScaleWithWorkloadArn(t, kc, data, sqsClient, queue.QueueUrl)
+
+	// cleanup
+	DeleteKubernetesResources(t, testNamespace, data, templates)
+	cleanupQueue(t, sqsClient, queue.QueueUrl)
+}
+
+// testScaleWithKEDAIdentity checks that we don't scale out because KEDA's own identity
+// doesn't have access to the queue, so even though there are messages, the workload
+// won't scale
+func testScaleWithKEDAIdentity(t *testing.T, kc *kubernetes.Clientset, data templateData, sqsClient *sqs.Client, queueURL *string) {
+	t.Log("--- testing scaling out with KEDA role ---")
+	addMessages(t, sqsClient, queueURL, 10)
+	// replicas shouldn't change
+	AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)
+	testScaleIn(t, kc, sqsClient, queueURL)
+	KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
+	KubectlDeleteWithTemplate(t, data, "triggerAuthenticationTemplate", triggerAuthenticationTemplate)
+}
+
+func testScaleWithExplicitRoleArn(t *testing.T, kc *kubernetes.Clientset, data templateData, sqsClient *sqs.Client, queueURL *string) {
+	t.Log("--- testing scaling out with explicit role arn ---")
+	KubectlApplyWithTemplate(t, data, "triggerAuthTemplateWithRoleArn", triggerAuthTemplateWithRoleArn)
+	KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
+	addMessages(t, sqsClient, queueURL, 10)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 180, 1),
+		"replica count should be %d after 3 minutes", maxReplicaCount)
+	testScaleIn(t, kc, sqsClient, queueURL)
+	KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
+	KubectlDeleteWithTemplate(t, data, "triggerAuthTemplateWithRoleArn", triggerAuthTemplateWithRoleArn)
+}
+
+func testScaleWithWorkloadArn(t *testing.T, kc *kubernetes.Clientset, data templateData, sqsClient *sqs.Client, queueURL *string) {
+	t.Log("--- testing scaling out with workload role arn ---")
+	KubectlApplyWithTemplate(t, data, "triggerAuthTemplateWithIdentityOwner", triggerAuthTemplateWithIdentityOwner)
+	KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
+	addMessages(t, sqsClient, queueURL, 10)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 180, 1),
+		"replica count should be %d after 3 minutes", maxReplicaCount)
+}
+
+func testScaleIn(t *testing.T, kc *kubernetes.Clientset, sqsClient *sqs.Client, queueURL *string) {
+	t.Log("--- testing scale in ---")
+	_, err := sqsClient.PurgeQueue(context.Background(), &sqs.PurgeQueueInput{
+		QueueUrl: queueURL,
+	})
+	assert.NoErrorf(t, err, "cannot clear queue - %s", err)
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 180, 1),
+		"replica count should be %d after 3 minutes", minReplicaCount)
+}
+
+func addMessages(t *testing.T, sqsClient *sqs.Client, queueURL *string, messages int) {
+	for i := 0; i < messages; i++ {
+		msg := fmt.Sprintf("Message - %d", i)
+		_, err := sqsClient.SendMessage(context.Background(), &sqs.SendMessageInput{
+			QueueUrl: queueURL,
+			MessageBody: aws.String(msg),
+			DelaySeconds: 10,
+		})
+		assert.NoErrorf(t, err, "cannot send message - %s", err)
+	}
+}
+
+func createSqsQueue(t *testing.T, sqsClient *sqs.Client) *sqs.CreateQueueOutput {
+	queue, err := sqsClient.CreateQueue(context.Background(), &sqs.CreateQueueInput{
+		QueueName: &sqsQueueName,
+		Attributes: map[string]string{
+			"DelaySeconds": "60",
+			"MessageRetentionPeriod": "86400",
+
}}) + assert.NoErrorf(t, err, "failed to create queue - %s", err) + return queue +} + +func cleanupQueue(t *testing.T, sqsClient *sqs.Client, queueURL *string) { + t.Log("--- cleaning up ---") + _, err := sqsClient.DeleteQueue(context.Background(), &sqs.DeleteQueueInput{ + QueueUrl: queueURL, + }) + assert.NoErrorf(t, err, "cannot delete queue - %s", err) +} + +func createSqsClient() *sqs.Client { + configOptions := make([]func(*config.LoadOptions) error, 0) + configOptions = append(configOptions, config.WithRegion(awsRegion)) + cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...) + cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKeyID, awsSecretAccessKey, "") + return sqs.NewFromConfig(cfg) +} + +func getTemplateData(sqsQueue string) (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + AwsRegion: awsRegion, + RoleArn: awsRoleArn, + SqsQueue: sqsQueue, + }, []Template{ + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "serviceAccountTemplate", Config: serviceAccountTemplate}, + } +}
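
For reviewers skimming the new aws_identity_assume_role suite, the three TriggerAuthentication variants above each exercise a different identity. The sketch below summarizes that precedence as the e2e scenarios imply it; resolveRole, the arn:... placeholder strings, and the ordering are illustrative assumptions for this test, not the operator's actual resolver.

package main

import "fmt"

// resolveRole is a hypothetical helper that mirrors the three e2e scenarios.
// kedaRole stands for the identity the operator itself runs with; workloadRole stands
// for the role taken from the scale target's service account annotation
// (eks.amazonaws.com/role-arn in the serviceAccountTemplate above).
func resolveRole(roleArn, identityOwner, kedaRole, workloadRole string) string {
	if identityOwner == "workload" {
		// triggerAuthTemplateWithIdentityOwner: use the workload's own service-account role
		return workloadRole
	}
	if roleArn != "" {
		// triggerAuthTemplateWithRoleArn: assume the role named in the TriggerAuthentication
		return roleArn
	}
	// plain "provider: aws": KEDA's own identity, which in testScaleWithKEDAIdentity
	// cannot read the queue, so no scale-out is expected
	return kedaRole
}

func main() {
	fmt.Println(resolveRole("", "", "arn:keda-operator", "arn:workload"))             // KEDA identity
	fmt.Println(resolveRole("arn:workload", "", "arn:keda-operator", "arn:workload")) // explicit roleArn
	fmt.Println(resolveRole("", "workload", "arn:keda-operator", "arn:workload"))     // identityOwner: workload
}

Because each scenario applies its own TriggerAuthentication template and tears the previous one down together with the ScaledObject, only one of these fields is set at any point in the test.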