Improve metric name creation to be unique using scaler index inside the scaler (kedacore#2161)

Signed-off-by: Jorge Turrado <jorge.turrado@docplanner.com>
Signed-off-by: nilayasiktoprak <nilayasiktoprak@gmail.com>
Jorge Turrado Ferrero authored and nilayasiktoprak committed Oct 23, 2021
1 parent 24e3e62 commit 1ce3d88
Showing 72 changed files with 472 additions and 247 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@
- Add support to provide the metric name in Azure Log Analytics Scaler ([#2106](https://github.com/kedacore/keda/pull/2106))
- Add `pageSize` (using regex) in RabbitMQ Scaler ([#2162](https://github.com/kedacore/keda/pull/2162))
- Add `unsafeSsl` parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve metric name creation to be unique using scaler index inside the scaler ([#2161](https://github.com/kedacore/keda/pull/2161))

### Breaking Changes

25 changes: 25 additions & 0 deletions CREATE-NEW-SCALER.md
@@ -44,6 +44,31 @@ The return type of this function is `MetricSpec`, but in KEDA's case we will mos
- `TargetValue`: the value of the metric we want to reach and hold at all times. As long as the current metric doesn't match `TargetValue`, the HPA will keep increasing the number of pods until it reaches the maximum number of pods it is allowed to scale to.
- `TargetAverageValue`: the value of the metric we require a single pod to handle. E.g., if we have a scaler based on the length of a message queue and we specify 10 for `TargetAverageValue`, we are saying that each pod should handle 10 messages. So if the length of the queue becomes 30, we expect to have 3 pods in our cluster. (`TargetAverageValue` and `TargetValue` are mutually exclusive; see the sketch below.)
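
For illustration only, here is a minimal sketch of the two target styles using the `v2beta2` autoscaling types that the scaler code below also uses (the concrete values are arbitrary):

```golang
package example

import (
	"k8s.io/api/autoscaling/v2beta2"
	"k8s.io/apimachinery/pkg/api/resource"
)

// exampleTargets builds one target of each style.
func exampleTargets() (v2beta2.MetricTarget, v2beta2.MetricTarget) {
	// TargetValue style: HPA scales until the metric itself reaches 100.
	valueTarget := v2beta2.MetricTarget{
		Type:  v2beta2.ValueMetricType,
		Value: resource.NewQuantity(100, resource.DecimalSI),
	}
	// TargetAverageValue style: each pod is expected to handle 10 units,
	// so a queue length of 30 should yield 3 pods.
	averageTarget := v2beta2.MetricTarget{
		Type:         v2beta2.AverageValueMetricType,
		AverageValue: resource.NewQuantity(10, resource.DecimalSI),
	}
	return valueTarget, averageTarget
}
```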

All scalers receive a parameter named `scalerIndex` as part of `ScalerConfig`. This value is the index of the current scaler within its ScaledObject. All metric names have to start with `sX-` (where `X` is the `scalerIndex`). This convention makes each metric name unique within the ScaledObject and makes it possible to define more than one trigger with a similar metric name in the same ScaledObject.

For example:
- s0-redis-mylist
- s1-redis-mylist

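The convention is just a string prefix. A minimal sketch of what the helper described in the note below boils down to (illustrative only, not the actual source):

```golang
package example

import "fmt"

// generateMetricNameWithIndex prefixes a metric name with "sX-", where X is
// the scaler's index inside the ScaledObject, e.g. (1, "redis-mylist")
// becomes "s1-redis-mylist". A sketch of the convention only.
func generateMetricNameWithIndex(scalerIndex int, metricName string) string {
	return fmt.Sprintf("s%d-%s", scalerIndex, metricName)
}
```
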
>**Note:** There is a naming helper function `GenerateMetricNameWithIndex(scalerIndex int, metricName string)` that receives the current index and the original metric name (without the prefix) and returns the concatenated string following the convention (please use this function).<br>The following example shows how to use it:
>```golang
>func (s *artemisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
> targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
> externalMetric := &v2beta2.ExternalMetricSource{
> Metric: v2beta2.MetricIdentifier{
> Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "artemis", s.metadata.brokerName, s.metadata.queueName))),
> },
> Target: v2beta2.MetricTarget{
> Type: v2beta2.AverageValueMetricType,
> AverageValue: targetMetricValue,
> },
> }
> metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: artemisMetricType}
> return []v2beta2.MetricSpec{metricSpec}
>}
>```
### IsActive
In some cases, a scaler might need to declare itself as inactive, and it can do so by implementing the function `IsActive`.
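
A hedged sketch of a typical queue-based implementation, assuming a hypothetical `getQueueMessageCount` helper on the scaler struct (the scaler is active whenever there is pending work):

```golang
// IsActive reports whether the scaler currently has work to do.
// getQueueMessageCount is a hypothetical helper; real scalers query
// their backing system (queue, stream, database, ...) here.
func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) {
	messages, err := s.getQueueMessageCount()
	if err != nil {
		return false, err
	}
	return messages > 0, nil
}
```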
6 changes: 3 additions & 3 deletions controllers/keda/scaledobject_controller_test.go
@@ -245,8 +245,8 @@ var _ = Describe("ScaledObjectController", func() {
return k8sClient.Get(context.Background(), types.NamespacedName{Name: "keda-hpa-clean-up-test", Namespace: "default"}, hpa)
}).ShouldNot(HaveOccurred())
Expect(hpa.Spec.Metrics).To(HaveLen(2))
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("cron-UTC-0xxxx-1xxxx"))
Expect(hpa.Spec.Metrics[1].External.Metric.Name).To(Equal("cron-UTC-2xxxx-3xxxx"))
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-UTC-0xxxx-1xxxx"))
Expect(hpa.Spec.Metrics[1].External.Metric.Name).To(Equal("s1-cron-UTC-2xxxx-3xxxx"))

// Remove the second trigger.
Eventually(func() error {
@@ -263,7 +263,7 @@ var _ = Describe("ScaledObjectController", func() {
return len(hpa.Spec.Metrics)
}).Should(Equal(1))
// And it should only be the first one left.
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("cron-UTC-0xxxx-1xxxx"))
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-UTC-0xxxx-1xxxx"))
})

It("deploys ScaledObject and creates HPA, when IdleReplicaCount, MinReplicaCount and MaxReplicaCount is defined", func() {
6 changes: 5 additions & 1 deletion pkg/scalers/artemis_scaler.go
@@ -36,6 +36,7 @@ type artemisMetadata struct {
restAPITemplate string
queueLength int
corsHeader string
scalerIndex int
}

//revive:enable:var-naming
@@ -153,6 +154,9 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
if meta.password == "" {
return nil, fmt.Errorf("password cannot be empty")
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

@@ -250,7 +254,7 @@ func (s *artemisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "artemis", s.metadata.brokerName, s.metadata.queueName)),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "artemis", s.metadata.brokerName, s.metadata.queueName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
6 changes: 4 additions & 2 deletions pkg/scalers/artemis_scaler_test.go
@@ -17,6 +17,7 @@ type parseArtemisMetadataTestData struct {

type artemisMetricIdentifier struct {
metadataTestData *parseArtemisMetadataTestData
scalerIndex int
name string
}

@@ -59,7 +60,8 @@ var testArtemisMetadata = []parseArtemisMetadataTestData{
}

var artemisMetricIdentifiers = []artemisMetricIdentifier{
{&testArtemisMetadata[7], "artemis-broker-activemq-queue1"},
{&testArtemisMetadata[7], 0, "s0-artemis-broker-activemq-queue1"},
{&testArtemisMetadata[7], 1, "s1-artemis-broker-activemq-queue1"},
}

var testArtemisMetadataWithEmptyAuthParams = []parseArtemisMetadataTestData{
@@ -141,7 +143,7 @@ func TestArtemisParseMetadata(t *testing.T) {

func TestArtemisGetMetricSpecForScaling(t *testing.T) {
for _, testData := range artemisMetricIdentifiers {
meta, err := parseArtemisMetadata(&ScalerConfig{ResolvedEnv: sampleArtemisResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil})
meta, err := parseArtemisMetadata(&ScalerConfig{ResolvedEnv: sampleArtemisResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
6 changes: 5 additions & 1 deletion pkg/scalers/aws_cloudwatch_scaler.go
@@ -49,6 +49,8 @@ type awsCloudwatchMetadata struct {
awsRegion string

awsAuthorization awsAuthorizationMetadata

scalerIndex int
}

var cloudwatchLog = logf.Log.WithName("aws_cloudwatch_scaler")
@@ -189,6 +191,8 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return meta, nil
}

@@ -213,7 +217,7 @@ func (c *awsCloudwatchScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(c.metadata.targetMetricValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", "aws-cloudwatch", c.metadata.namespace, c.metadata.dimensionName[0], c.metadata.dimensionValue[0])),
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", "aws-cloudwatch", c.metadata.namespace, c.metadata.dimensionName[0], c.metadata.dimensionValue[0]))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
6 changes: 4 additions & 2 deletions pkg/scalers/aws_cloudwatch_test.go
@@ -28,6 +28,7 @@ type parseAWSCloudwatchMetadataTestData struct {

type awsCloudwatchMetricIdentifier struct {
metadataTestData *parseAWSCloudwatchMetadataTestData
scalerIndex int
name string
}

@@ -233,7 +234,8 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
}

var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
{&testAWSCloudwatchMetadata[1], "aws-cloudwatch-AWS-SQS-QueueName-keda"},
{&testAWSCloudwatchMetadata[1], 0, "s0-aws-cloudwatch-AWS-SQS-QueueName-keda"},
{&testAWSCloudwatchMetadata[1], 3, "s3-aws-cloudwatch-AWS-SQS-QueueName-keda"},
}

func TestCloudwatchParseMetadata(t *testing.T) {
@@ -250,7 +252,7 @@ func TestCloudwatchParseMetadata(t *testing.T) {

func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsCloudwatchMetricIdentifiers {
meta, err := parseAwsCloudwatchMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSCloudwatchResolvedEnv, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsCloudwatchMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSCloudwatchResolvedEnv, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
5 changes: 4 additions & 1 deletion pkg/scalers/aws_kinesis_stream_scaler.go
@@ -34,6 +34,7 @@ type awsKinesisStreamMetadata struct {
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}

var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler")
@@ -83,6 +84,8 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig) (*awsKinesisStreamMetad

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

@@ -105,7 +108,7 @@ func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec
targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-Kinesis-Stream", s.metadata.streamName)),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-Kinesis-Stream", s.metadata.streamName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
91 changes: 58 additions & 33 deletions pkg/scalers/aws_kinesis_stream_test.go
@@ -19,15 +19,17 @@ var testAWSKinesisAuthentication = map[string]string{
}

type parseAWSKinesisMetadataTestData struct {
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
scalerIndex int
}

type awsKinesisMetricIdentifier struct {
metadataTestData *parseAWSKinesisMetadataTestData
scalerIndex int
name string
}

@@ -53,27 +55,34 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 0,
},
isError: false,
comment: "properly formed stream name and region"},
isError: false,
comment: "properly formed stream name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"streamName": "",
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name"},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name",
scalerIndex: 1,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": ""},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region"},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region",
scalerIndex: 2,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
@@ -89,9 +98,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 3,
},
isError: false,
comment: "properly formed stream name and region, empty shard count"},
isError: false,
comment: "properly formed stream name and region, empty shard count",
scalerIndex: 3,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
@@ -107,10 +119,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 4,
},
isError: false,
comment: "properly formed stream name and region, wrong shard count"},

isError: false,
comment: "properly formed stream name and region, wrong shard count",
scalerIndex: 4,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
@@ -120,9 +134,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
"awsAccessKeyID": "",
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id"},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id",
scalerIndex: 5,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
@@ -131,9 +147,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": "",
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key"},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key",
scalerIndex: 6,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
@@ -149,9 +167,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsRoleArn: testAWSKinesisRoleArn,
podIdentityOwner: true,
},
scalerIndex: 7,
},
isError: false,
comment: "with AWS Role from TriggerAuthentication"},
isError: false,
comment: "with AWS Role from TriggerAuthentication",
scalerIndex: 7,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
@@ -165,18 +186,22 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsAuthorization: awsAuthorizationMetadata{
podIdentityOwner: false,
},
scalerIndex: 8,
},
isError: false,
comment: "with AWS Role assigned on KEDA operator itself"},
isError: false,
comment: "with AWS Role assigned on KEDA operator itself",
scalerIndex: 8,
},
}

var awsKinesisMetricIdentifiers = []awsKinesisMetricIdentifier{
{&testAWSKinesisMetadata[1], "AWS-Kinesis-Stream-test"},
{&testAWSKinesisMetadata[1], 0, "s0-AWS-Kinesis-Stream-test"},
{&testAWSKinesisMetadata[1], 1, "s1-AWS-Kinesis-Stream-test"},
}

func TestKinesisParseMetadata(t *testing.T) {
for _, testData := range testAWSKinesisMetadata {
result, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams})
result, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil && !testData.isError {
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
}
@@ -192,7 +217,7 @@ func TestKinesisParseMetadata(t *testing.T) {

func TestAWSKinesisGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsKinesisMetricIdentifiers {
meta, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
5 changes: 4 additions & 1 deletion pkg/scalers/aws_sqs_queue_scaler.go
@@ -45,6 +45,7 @@ type awsSqsQueueMetadata struct {
queueName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}

// NewAwsSqsQueueScaler creates a new awsSqsQueueScaler
@@ -105,6 +106,8 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig) (*awsSqsQueueMetadata, error

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

@@ -127,7 +130,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-SQS-Queue", s.metadata.queueName)),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-SQS-Queue", s.metadata.queueName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,