Skip to content

Commit

Permalink
Update GetAzureQueueLength in azure storage queue scaler to consider …
Browse files Browse the repository at this point in the history
…queueLengthStrategy

Signed-off-by: Leonardo D'Ippolito <contact@leodip.com>
  • Loading branch information
leodip authored and JorTurFer committed Jul 30, 2024
1 parent ccadc8a commit be22628
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
Expand Down
54 changes: 48 additions & 6 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/go-logr/logr"
Expand All @@ -33,10 +34,16 @@ import (
)

const (
queueLengthMetricName = "queueLength"
activationQueueLengthMetricName = "activationQueueLength"
defaultTargetQueueLength = 5
externalMetricType = "External"
queueLengthMetricName = "queueLength"
activationQueueLengthMetricName = "activationQueueLength"
defaultTargetQueueLength = 5
externalMetricType = "External"
QueueLengthStrategyAll string = "all"
QueueLengthStrategyVisibleOnly string = "visibleonly"
)

var (
maxPeekMessages int32 = 32
)

type azureQueueScaler struct {
Expand All @@ -53,6 +60,7 @@ type azureQueueMetadata struct {
connection string
accountName string
endpointSuffix string
queueLengthStrategy string
triggerIndex int
}

Expand Down Expand Up @@ -123,6 +131,17 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
}

if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
strategy := strings.ToLower(val)
if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly {
meta.queueLengthStrategy = strategy
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
}
} else {
meta.queueLengthStrategy = QueueLengthStrategyAll
}

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity.Provider {
Expand Down Expand Up @@ -172,12 +191,35 @@ func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
props, err := s.queueClient.GetProperties(ctx, nil)
queuelen, err := s.getMessageCount(ctx)
if err != nil {
s.logger.Error(err, "error getting queue length")
return []external_metrics.ExternalMetricValue{}, false, err
}
queuelen := int64(*props.ApproximateMessagesCount)

metric := GenerateMetricInMili(metricName, float64(queuelen))
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil
}

func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) {
strategy := strings.ToLower(s.metadata.queueLengthStrategy)
if strategy == QueueLengthStrategyVisibleOnly {
queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages})
if err != nil {
return 0, err
}
visibleMessageCount := len(queue.Messages)

// Queue has less messages than we allowed to peek for,
// so no need to fall back to the 'all' strategy
if visibleMessageCount < int(maxPeekMessages) {
return int64(visibleMessageCount), nil
}
}

props, err := s.queueClient.GetProperties(ctx, nil)
if err != nil {
return 0, err
}
return int64(*props.ApproximateMessagesCount), nil
}

0 comments on commit be22628

Please sign in to comment.