diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c83a8e186a..cb9d5841aa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ Here is an overview of all new **experimental** features: ### Improvements +- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478)) - **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802)) - **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index 3b4fe1d113d..6f642ec04bf 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strconv" + "strings" "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" "github.com/go-logr/logr" @@ -33,10 +34,16 @@ import ( ) const ( - queueLengthMetricName = "queueLength" - activationQueueLengthMetricName = "activationQueueLength" - defaultTargetQueueLength = 5 - externalMetricType = "External" + queueLengthMetricName = "queueLength" + activationQueueLengthMetricName = "activationQueueLength" + defaultTargetQueueLength = 5 + externalMetricType = "External" + QueueLengthStrategyAll string = "all" + QueueLengthStrategyVisibleOnly string = "visibleonly" +) + +var ( + maxPeekMessages int32 = 32 ) type azureQueueScaler struct { @@ -53,6 +60,7 @@ type azureQueueMetadata struct { connection string accountName string endpointSuffix string + queueLengthStrategy string triggerIndex int } @@ -123,6 +131,17 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given") } + if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" { + strategy := strings.ToLower(val) + if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly { + meta.queueLengthStrategy = strategy + } else { + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val) + } + } else { + meta.queueLengthStrategy = QueueLengthStrategyAll + } + // If the Use AAD Pod Identity is not present, or set to "none" // then check for connection string switch config.PodIdentity.Provider { @@ -172,12 +191,35 @@ func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS // GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - props, err := s.queueClient.GetProperties(ctx, nil) + queuelen, err := s.getMessageCount(ctx) if err != nil { s.logger.Error(err, "error getting queue length") return []external_metrics.ExternalMetricValue{}, false, err } - queuelen := int64(*props.ApproximateMessagesCount) + metric := GenerateMetricInMili(metricName, float64(queuelen)) return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil } + +func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) { + strategy := strings.ToLower(s.metadata.queueLengthStrategy) + if strategy == QueueLengthStrategyVisibleOnly { + queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages}) + if err != nil { + return 0, err + } + visibleMessageCount := len(queue.Messages) + + // Queue has less messages than we allowed to peek for, + // so no need to fall back to the 'all' strategy + if visibleMessageCount < int(maxPeekMessages) { + return int64(visibleMessageCount), nil + } + } + + props, err := s.queueClient.GetProperties(ctx, nil) + if err != nil { + return 0, err + } + return int64(*props.ApproximateMessagesCount), nil +}