Skip to content

Commit

Permalink
Update metric name generation in AWS scalers
Browse files Browse the repository at this point in the history
Signed-off-by: jorturfer <jorge_turrado@hotmail.es>
  • Loading branch information
JorTurFer committed Oct 6, 2021
1 parent 7a9b018 commit f931c75
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 51 deletions.
12 changes: 9 additions & 3 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"

logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
Expand Down Expand Up @@ -49,6 +48,8 @@ type awsCloudwatchMetadata struct {
awsRegion string

awsAuthorization awsAuthorizationMetadata

externalMetricName string
}

var cloudwatchLog = logf.Log.WithName("aws_cloudwatch_scaler")
Expand Down Expand Up @@ -189,6 +190,11 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e

meta.awsAuthorization = auth

meta.externalMetricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", "aws-cloudwatch", meta.namespace, meta.dimensionName[0], meta.dimensionValue[0]))

// Update externalMetricName with the index
meta.externalMetricName = GenerateMetricNameWithIndex(config, meta.externalMetricName)

return meta, nil
}

Expand All @@ -213,7 +219,7 @@ func (c *awsCloudwatchScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(c.metadata.targetMetricValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", "aws-cloudwatch", c.metadata.namespace, c.metadata.dimensionName[0], c.metadata.dimensionValue[0])),
Name: c.metadata.externalMetricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/aws_cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type parseAWSCloudwatchMetadataTestData struct {

type awsCloudwatchMetricIdentifier struct {
metadataTestData *parseAWSCloudwatchMetadataTestData
scalerIndex int
name string
}

Expand Down Expand Up @@ -233,7 +234,8 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
}

var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
{&testAWSCloudwatchMetadata[1], "aws-cloudwatch-AWS-SQS-QueueName-keda"},
{&testAWSCloudwatchMetadata[1], 0, "s0-aws-cloudwatch-AWS-SQS-QueueName-keda"},
{&testAWSCloudwatchMetadata[1], 3, "s3-aws-cloudwatch-AWS-SQS-QueueName-keda"},
}

func TestCloudwatchParseMetadata(t *testing.T) {
Expand All @@ -250,7 +252,7 @@ func TestCloudwatchParseMetadata(t *testing.T) {

func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsCloudwatchMetricIdentifiers {
meta, err := parseAwsCloudwatchMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSCloudwatchResolvedEnv, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsCloudwatchMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSCloudwatchResolvedEnv, AuthParams: testData.metadataTestData.authParams, Index: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ type awsKinesisStreamScaler struct {
}

type awsKinesisStreamMetadata struct {
targetShardCount int
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
targetShardCount int
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
externalMetricName string
}

var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler")
Expand Down Expand Up @@ -83,6 +84,11 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig) (*awsKinesisStreamMetad

meta.awsAuthorization = auth

meta.externalMetricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-Kinesis-Stream", meta.streamName))

// Update externalMetricName with the index
meta.externalMetricName = GenerateMetricNameWithIndex(config, meta.externalMetricName)

return &meta, nil
}

Expand All @@ -105,7 +111,7 @@ func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec
targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-Kinesis-Stream", s.metadata.streamName)),
Name: s.metadata.externalMetricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
91 changes: 58 additions & 33 deletions pkg/scalers/aws_kinesis_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ var testAWSKinesisAuthentication = map[string]string{
}

type parseAWSKinesisMetadataTestData struct {
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
scalerIndex int
}

type awsKinesisMetricIdentifier struct {
metadataTestData *parseAWSKinesisMetadataTestData
scalerIndex int
name string
}

Expand All @@ -53,27 +55,34 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
externalMetricName: "s0-AWS-Kinesis-Stream-test",
},
isError: false,
comment: "properly formed stream name and region"},
isError: false,
comment: "properly formed stream name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"streamName": "",
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name"},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name",
scalerIndex: 1,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": ""},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region"},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region",
scalerIndex: 2,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
Expand All @@ -89,9 +98,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
externalMetricName: "s3-AWS-Kinesis-Stream-test",
},
isError: false,
comment: "properly formed stream name and region, empty shard count"},
isError: false,
comment: "properly formed stream name and region, empty shard count",
scalerIndex: 3,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
Expand All @@ -107,10 +119,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
externalMetricName: "s4-AWS-Kinesis-Stream-test",
},
isError: false,
comment: "properly formed stream name and region, wrong shard count"},

isError: false,
comment: "properly formed stream name and region, wrong shard count",
scalerIndex: 4,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
Expand All @@ -120,9 +134,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
"awsAccessKeyID": "",
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id"},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id",
scalerIndex: 5,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
Expand All @@ -131,9 +147,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": "",
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key"},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key",
scalerIndex: 6,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
Expand All @@ -149,9 +167,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsRoleArn: testAWSKinesisRoleArn,
podIdentityOwner: true,
},
externalMetricName: "s7-AWS-Kinesis-Stream-test",
},
isError: false,
comment: "with AWS Role from TriggerAuthentication"},
isError: false,
comment: "with AWS Role from TriggerAuthentication",
scalerIndex: 7,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
Expand All @@ -165,18 +186,22 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsAuthorization: awsAuthorizationMetadata{
podIdentityOwner: false,
},
externalMetricName: "s8-AWS-Kinesis-Stream-test",
},
isError: false,
comment: "with AWS Role assigned on KEDA operator itself"},
isError: false,
comment: "with AWS Role assigned on KEDA operator itself",
scalerIndex: 8,
},
}

var awsKinesisMetricIdentifiers = []awsKinesisMetricIdentifier{
{&testAWSKinesisMetadata[1], "AWS-Kinesis-Stream-test"},
{&testAWSKinesisMetadata[1], 0, "s0-AWS-Kinesis-Stream-test"},
{&testAWSKinesisMetadata[1], 1, "s1-AWS-Kinesis-Stream-test"},
}

func TestKinesisParseMetadata(t *testing.T) {
for _, testData := range testAWSKinesisMetadata {
result, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams})
result, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams, Index: testData.scalerIndex})
if err != nil && !testData.isError {
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
}
Expand All @@ -192,7 +217,7 @@ func TestKinesisParseMetadata(t *testing.T) {

func TestAWSKinesisGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsKinesisMetricIdentifiers {
meta, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams, Index: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ type awsSqsQueueScaler struct {
}

type awsSqsQueueMetadata struct {
targetQueueLength int
queueURL string
queueName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
targetQueueLength int
queueURL string
queueName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
externalMetricName string
}

// NewAwsSqsQueueScaler creates a new awsSqsQueueScaler
Expand Down Expand Up @@ -105,6 +106,11 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig) (*awsSqsQueueMetadata, error

meta.awsAuthorization = auth

meta.externalMetricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-SQS-Queue", meta.queueName))

// Update externalMetricName with the index
meta.externalMetricName = GenerateMetricNameWithIndex(config, meta.externalMetricName)

return &meta, nil
}

Expand All @@ -127,7 +133,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-SQS-Queue", s.metadata.queueName)),
Name: s.metadata.externalMetricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/aws_sqs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type parseAWSSQSMetadataTestData struct {

type awsSQSMetricIdentifier struct {
metadataTestData *parseAWSSQSMetadataTestData
scalerIndex int
name string
}

Expand Down Expand Up @@ -131,7 +132,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{
}

var awsSQSMetricIdentifiers = []awsSQSMetricIdentifier{
{&testAWSSQSMetadata[1], "AWS-SQS-Queue-DeleteArtifactQ"},
{&testAWSSQSMetadata[1], 0, "s0-AWS-SQS-Queue-DeleteArtifactQ"},
{&testAWSSQSMetadata[1], 1, "s1-AWS-SQS-Queue-DeleteArtifactQ"},
}

func TestSQSParseMetadata(t *testing.T) {
Expand All @@ -148,7 +150,7 @@ func TestSQSParseMetadata(t *testing.T) {

func TestAWSSQSGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsSQSMetricIdentifiers {
meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSSQSAuthentication, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSSQSAuthentication, AuthParams: testData.metadataTestData.authParams, Index: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down

0 comments on commit f931c75

Please sign in to comment.