Skip to content

Commit

Permalink
feat(aws-sqs): Support for scaling to include delayed messages (kedac…
Browse files Browse the repository at this point in the history
  • Loading branch information
phr3nzii authored and toniiiik committed Jan 15, 2024
1 parent 42208b6 commit d3d515c
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 23 deletions.
7 changes: 1 addition & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,9 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

## Unreleased

- Azure Devops pipeline workload identity/SPN support

- TODO: tests
- TODO: documentation

### New

- TODO ([#4853](https://github.com/kedacore/keda/issues/4853))
- **AWS SQS Scaler**: Support for scaling to include delayed messages. [#4377](https://github.com/kedacore/keda/issues/4377)

### Improvements

Expand Down
30 changes: 18 additions & 12 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ const (
targetQueueLengthDefault = 5
activationTargetQueueLengthDefault = 0
defaultScaleOnInFlight = true
defaultScaleOnDelayed = false
)

var awsSqsQueueMetricNamesForScalingInFlight = []string{
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
}

var awsSqsQueueMetricNamesForNotScalingInFlight = []string{
"ApproximateNumberOfMessages",
}

type awsSqsQueueScaler struct {
metricType v2.MetricTargetType
metadata *awsSqsQueueMetadata
Expand All @@ -49,6 +41,7 @@ type awsSqsQueueMetadata struct {
awsAuthorization awsAuthorizationMetadata
scalerIndex int
scaleOnInFlight bool
scaleOnDelayed bool
awsSqsQueueMetricNames []string
}

Expand Down Expand Up @@ -78,6 +71,7 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
meta := awsSqsQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength
meta.scaleOnInFlight = defaultScaleOnInFlight
meta.scaleOnDelayed = defaultScaleOnDelayed

if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" {
queueLength, err := strconv.ParseInt(val, 10, 64)
Expand Down Expand Up @@ -109,10 +103,22 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
}
}

if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" {
scaleOnDelayed, err := strconv.ParseBool(val)
if err != nil {
meta.scaleOnDelayed = defaultScaleOnDelayed
logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed)
} else {
meta.scaleOnDelayed = scaleOnDelayed
}
}

meta.awsSqsQueueMetricNames = []string{"ApproximateNumberOfMessages"}
if meta.scaleOnInFlight {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight
} else {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesNotVisible")
}
if meta.scaleOnDelayed {
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesDelayed")
}

if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" {
Expand Down
61 changes: 56 additions & 5 deletions pkg/scalers/aws_sqs_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"errors"
"strconv"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -25,6 +26,10 @@ const (

testAWSSQSErrorQueueURL = "https://sqs.eu-west-1.amazonaws.com/account_id/Error"
testAWSSQSBadDataQueueURL = "https://sqs.eu-west-1.amazonaws.com/account_id/BadData"

testAWSSQSApproximateNumberOfMessagesVisible = 200
testAWSSQSApproximateNumberOfMessagesNotVisible = 100
testAWSSQSApproximateNumberOfMessagesDelayed = 50
)

var testAWSSQSEmptyResolvedEnv = map[string]string{}
Expand Down Expand Up @@ -65,14 +70,16 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G
Attributes: map[string]*string{
"ApproximateNumberOfMessages": aws.String("NotInt"),
"ApproximateNumberOfMessagesNotVisible": aws.String("NotInt"),
"ApproximateNumberOfMessagesDelayed": aws.String("NotInt"),
},
}, nil
}

return &sqs.GetQueueAttributesOutput{
Attributes: map[string]*string{
"ApproximateNumberOfMessages": aws.String("200"),
"ApproximateNumberOfMessagesNotVisible": aws.String("100"),
"ApproximateNumberOfMessages": aws.String(strconv.Itoa(testAWSSQSApproximateNumberOfMessagesVisible)),
"ApproximateNumberOfMessagesNotVisible": aws.String(strconv.Itoa(testAWSSQSApproximateNumberOfMessagesNotVisible)),
"ApproximateNumberOfMessagesDelayed": aws.String(strconv.Itoa(testAWSSQSApproximateNumberOfMessagesDelayed)),
},
}, nil
}
Expand Down Expand Up @@ -326,6 +333,44 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnInFlight enabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnDelayed": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnDelayed disabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnDelayed": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnDelayed enabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "false",
"scaleOnDelayed": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaledOnInFlight and scaleOnDelayed disabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "true",
"scaleOnDelayed": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaledOnInFlight and scaleOnDelayed enabled"},
{map[string]string{
"queueURL": testAWSSQSErrorQueueURL,
"queueLength": "1",
Expand Down Expand Up @@ -390,11 +435,17 @@ func TestAWSSQSScalerGetMetrics(t *testing.T) {
case testAWSSQSBadDataQueueURL:
assert.Error(t, err, "expect error because of bad data return from sqs")
default:
expectedMessages := testAWSSQSApproximateNumberOfMessagesVisible

if meta.scaleOnInFlight {
assert.EqualValues(t, int64(300.0), value[0].Value.Value())
} else {
assert.EqualValues(t, int64(200.0), value[0].Value.Value())
expectedMessages += testAWSSQSApproximateNumberOfMessagesNotVisible
}

if meta.scaleOnDelayed {
expectedMessages += testAWSSQSApproximateNumberOfMessagesDelayed
}

assert.EqualValues(t, int64(expectedMessages), value[0].Value.Value())
}
}
}

0 comments on commit d3d515c

Please sign in to comment.