Skip to content

Commit

Permalink
Update GetAzureQueueLength in azure queue scaler, to support differen…
Browse files Browse the repository at this point in the history
…t queue length strategies.

Signed-off-by: Leonardo D'Ippolito <leodip@outlook.com>
  • Loading branch information
leodip committed Jun 9, 2024
1 parent fd4fcc3 commit 7e82da9
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features:
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))

### Fixes

Expand Down
38 changes: 37 additions & 1 deletion pkg/scalers/azure/azure_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ package azure

import (
"context"
"strings"

"github.com/Azure/azure-storage-queue-go/azqueue"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const (
maxPeekMessages int32 = 32
)

// GetAzureQueueLength returns the length of a queue in int, see https://learn.microsoft.com/en-us/azure/storage/queues/storage-dotnet-how-to-use-queues?tabs=dotnet#get-the-queue-length
func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix string) (int64, error) {
func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix, queueLengthStrategy string) (int64, error) {
credential, endpoint, err := ParseAzureStorageQueueConnection(ctx, podIdentity, connectionString, accountName, endpointSuffix)
if err != nil {
return -1, err
Expand All @@ -35,10 +40,41 @@ func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodId
serviceURL := azqueue.NewServiceURL(*endpoint, p)
queueURL := serviceURL.NewQueueURL(queueName)

strategy := strings.ToLower(queueLengthStrategy)
if strategy == "visibleonly" {
visibleMessageCount, err := getVisibleCount(ctx, &queueURL, maxPeekMessages)
if err != nil {
return -1, err
}

// Queue has less messages than we allowed to peek for, so no need to get the approximation
if visibleMessageCount < int64(maxPeekMessages) {
return visibleMessageCount, nil
}

props, err := queueURL.GetProperties(ctx)
if err != nil {
return -1, err
}

return int64(props.ApproximateMessagesCount()), nil
}

// Default strategy (visible + invisible messages)
props, err := queueURL.GetProperties(ctx)
if err != nil {
return -1, err
}

return int64(props.ApproximateMessagesCount()), nil
}

func getVisibleCount(ctx context.Context, queueURL *azqueue.QueueURL, maxCount int32) (int64, error) {
messagesURL := queueURL.NewMessagesURL()
queue, err := messagesURL.Peek(ctx, maxCount)
if err != nil {
return 0, err
}
num := queue.NumMessages()
return int64(num), nil
}
44 changes: 17 additions & 27 deletions pkg/scalers/azure/azure_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,24 @@ import (
"testing"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/stretchr/testify/assert"
)

func TestGetQueueLength(t *testing.T) {
length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "")
if length != -1 {
t.Error("Expected length to be -1, but got", length)
}

if err == nil {
t.Error("Expected error for empty connection string, but got nil")
}

if !errors.Is(err, ErrAzureConnectionStringKeyName) {
t.Error("Expected error to contain parsing error message, but got", err.Error())
}

length, err = GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "")

if length != -1 {
t.Error("Expected length to be -1, but got", length)
}

if err == nil {
t.Error("Expected error for empty connection string, but got nil")
}

var base64Error base64.CorruptInputError
if !errors.As(err, &base64Error) {
t.Error("Expected error to contain base64 error message, but got", err.Error())
}
t.Run("Empty connection string", func(t *testing.T) {
length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "", "")

assert.Equal(t, int64(-1), length, "Expected length to be -1")
assert.NotNil(t, err, "Expected error for empty connection string")
assert.True(t, errors.Is(err, ErrAzureConnectionStringKeyName), "Expected error to contain parsing error message")
})

t.Run("Invalid base64 connection string", func(t *testing.T) {
length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "", "")

assert.Equal(t, int64(-1), length, "Expected length to be -1")
assert.NotNil(t, err, "Expected error for invalid connection string")
var base64Error base64.CorruptInputError
assert.True(t, errors.As(err, &base64Error), "Expected error to contain base64 error message")
})
}
14 changes: 14 additions & 0 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -54,6 +55,7 @@ type azureQueueMetadata struct {
connection string
accountName string
endpointSuffix string
queueLengthStrategy string
triggerIndex int
}

Expand Down Expand Up @@ -120,6 +122,17 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
}

if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
strategy := strings.ToLower(val)
if strategy == "default" || strategy == "visibleonly" {
meta.queueLengthStrategy = strategy
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
}
} else {
meta.queueLengthStrategy = "default"
}

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity.Provider {
Expand Down Expand Up @@ -179,6 +192,7 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName
s.metadata.queueName,
s.metadata.accountName,
s.metadata.endpointSuffix,
s.metadata.queueLengthStrategy,
)

if err != nil {
Expand Down

0 comments on commit 7e82da9

Please sign in to comment.