diff --git a/CHANGELOG.md b/CHANGELOG.md index adda09d72bb..1fdec2449f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ ### New - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) +- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016)) ### Improvements diff --git a/apis/keda/v1alpha1/scaledjob_types.go b/apis/keda/v1alpha1/scaledjob_types.go index 7b6934dbee1..946cfe9a248 100644 --- a/apis/keda/v1alpha1/scaledjob_types.go +++ b/apis/keda/v1alpha1/scaledjob_types.go @@ -87,6 +87,8 @@ type ScalingStrategy struct { CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"` // +optional PendingPodConditions []string `json:"pendingPodConditions,omitempty"` + // +optional + MultipleScalersCalculation string `json:"multipleScalersCalculation,omitempty"` } func init() { diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml index 02a84d23705..511680bc103 100644 --- a/config/crd/bases/keda.sh_scaledjobs.yaml +++ b/config/crd/bases/keda.sh_scaledjobs.yaml @@ -7362,6 +7362,8 @@ spec: type: integer customScalingRunningJobPercentage: type: string + multipleScalersCalculation: + type: string pendingPodConditions: items: type: string diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 1ddb78bf543..8fca1cff7de 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -23,7 +23,6 @@ import ( "time" "github.com/go-logr/logr" - "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -37,6 +36,7 @@ import ( "github.com/kedacore/keda/v2/pkg/scalers" "github.com/kedacore/keda/v2/pkg/scaling/executor" "github.com/kedacore/keda/v2/pkg/scaling/resolver" + "github.com/kedacore/keda/v2/pkg/scaling/scaledjob" ) // ScaleHandler encapsulates the logic of calling the right scalers for @@ -264,99 +264,7 @@ func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scale } func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - var queueLength int64 - var targetAverageValue int64 - var maxValue int64 - isActive := false - - // TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs - // move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go - for _, scaler := range scalers { - scalerLogger := h.logger.WithValues("Scaler", scaler) - - metricSpecs := scaler.GetMetricSpecForScaling() - - // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) - // or skip cpu/memory resource scaler - if len(metricSpecs) < 1 || metricSpecs[0].External == nil { - continue - } - - isTriggerActive, err := scaler.IsActive(ctx) - - scalerLogger.Info("Active trigger", "isTriggerActive", isTriggerActive) - - targetAverageValue = getTargetAverageValue(metricSpecs) - - scalerLogger.Info("Scaler targetAverageValue", "targetAverageValue", targetAverageValue) - - metrics, _ := scaler.GetMetrics(ctx, "queueLength", nil) - - var metricValue int64 - - for _, m := range metrics { - if m.MetricName == "queueLength" { - metricValue, _ = m.Value.AsInt64() - queueLength += metricValue - } - } - scalerLogger.Info("QueueLength Metric value", "queueLength", queueLength) - - scaler.Close() - if err != nil { - scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err) - h.recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - continue - } else if isTriggerActive { - isActive = true - scalerLogger.Info("Scaler is active") - } - } - if targetAverageValue != 0 { - maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) - } - h.logger.Info("Scaler maxValue", "maxValue", maxValue) - return isActive, queueLength, maxValue -} - -func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { - var targetAverageValue int64 - var metricValue int64 - var flag bool - for _, metric := range metricSpecs { - if metric.External.Target.AverageValue == nil { - metricValue = 0 - } else { - metricValue, flag = metric.External.Target.AverageValue.AsInt64() - if !flag { - metricValue = 0 - } - } - - targetAverageValue += metricValue - } - count := int64(len(metricSpecs)) - if count != 0 { - return targetAverageValue / count - } - return 0 -} - -func divideWithCeil(x, y int64) int64 { - ans := x / y - reminder := x % y - if reminder != 0 { - return ans + 1 - } - return ans -} - -// Min function for int64 -func min(x, y int64) int64 { - if x > y { - return y - } - return x + return scaledjob.GetScaleMetrics(ctx, scalers, scaledJob, h.recorder) } // buildScalers returns list of Scalers for the specified triggers diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index b2558582017..0eb291d9004 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -37,37 +37,6 @@ import ( "github.com/kedacore/keda/v2/pkg/scaling/executor" ) -func TestTargetAverageValue(t *testing.T) { - // count = 0 - specs := []v2beta2.MetricSpec{} - targetAverageValue := getTargetAverageValue(specs) - assert.Equal(t, int64(0), targetAverageValue) - // 1 1 - specs = []v2beta2.MetricSpec{ - createMetricSpec(1), - createMetricSpec(1), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(1), targetAverageValue) - // 5 5 3 - specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(4), targetAverageValue) - - // 5 5 4 - specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(4), targetAverageValue) -} - func TestCheckScaledObjectScalersWithError(t *testing.T) { ctrl := gomock.NewController(t) client := mock_client.NewMockClient(ctrl) diff --git a/pkg/scaling/scaledjob/scale_metrics.go b/pkg/scaling/scaledjob/scale_metrics.go new file mode 100644 index 00000000000..bcd60f8f6a0 --- /dev/null +++ b/pkg/scaling/scaledjob/scale_metrics.go @@ -0,0 +1,184 @@ +package scaledjob + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "k8s.io/api/autoscaling/v2beta2" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/scalers" +) + +type scalerMetrics struct { + queueLength int64 + maxValue int64 + isActive bool +} + +// GetScaleMetrics gets the metrics for decision making of scaling. +func GetScaleMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, recorder record.EventRecorder) (bool, int64, int64) { + var queueLength int64 + var maxValue int64 + isActive := false + + logger := logf.Log.WithName("scalemetrics") + scalersMetrics := getScalersMetrics(ctx, scalers, scaledJob, logger, recorder) + switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + case "avg": + queueLengthSum := int64(0) + maxValueSum := int64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLengthSum += metrics.queueLength + maxValueSum += metrics.maxValue + isActive = metrics.isActive + length++ + } + } + if length != 0 { + queueLength = divideWithCeil(queueLengthSum, int64(length)) + maxValue = divideWithCeil(maxValueSum, int64(length)) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLength += metrics.queueLength + maxValue += metrics.maxValue + isActive = metrics.isActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.queueLength > queueLength && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + } + maxValue = min(scaledJob.MaxReplicaCount(), maxValue) + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + + return isActive, queueLength, maxValue +} + +func getScalersMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, logger logr.Logger, recorder record.EventRecorder) []scalerMetrics { + scalersMetrics := []scalerMetrics{} + + for _, scaler := range scalers { + var queueLength int64 + var targetAverageValue int64 + isActive := false + maxValue := int64(0) + scalerType := fmt.Sprintf("%T:", scaler) + + scalerLogger := logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) + + metricSpecs := scaler.GetMetricSpecForScaling() + + // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) + // or skip cpu/memory resource scaler + if len(metricSpecs) < 1 || metricSpecs[0].External == nil { + continue + } + + isTriggerActive, err := scaler.IsActive(ctx) + if err != nil { + scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err) + recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + scaler.Close() + continue + } + + targetAverageValue = getTargetAverageValue(metricSpecs) + + metrics, err := scaler.GetMetrics(ctx, "queueLength", nil) + if err != nil { + scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err) + recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + scaler.Close() + continue + } + + var metricValue int64 + + for _, m := range metrics { + if m.MetricName == "queueLength" { + metricValue, _ = m.Value.AsInt64() + queueLength += metricValue + } + } + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue) + + scaler.Close() + + if isTriggerActive { + isActive = true + } + + if targetAverageValue != 0 { + maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) + } + scalersMetrics = append(scalersMetrics, scalerMetrics{ + queueLength: queueLength, + maxValue: maxValue, + isActive: isActive, + }) + } + return scalersMetrics +} + +func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { + var targetAverageValue int64 + var metricValue int64 + var flag bool + for _, metric := range metricSpecs { + if metric.External.Target.AverageValue == nil { + metricValue = 0 + } else { + metricValue, flag = metric.External.Target.AverageValue.AsInt64() + if !flag { + metricValue = 0 + } + } + + targetAverageValue += metricValue + } + count := int64(len(metricSpecs)) + if count != 0 { + return targetAverageValue / count + } + return 0 +} + +func divideWithCeil(x, y int64) int64 { + ans := x / y + reminder := x % y + if reminder != 0 { + return ans + 1 + } + return ans +} + +// Min function for int64 +func min(x, y int64) int64 { + if x > y { + return y + } + return x +} diff --git a/pkg/scaling/scaledjob/scale_metrics_test.go b/pkg/scaling/scaledjob/scale_metrics_test.go new file mode 100644 index 00000000000..3e4a410db3b --- /dev/null +++ b/pkg/scaling/scaledjob/scale_metrics_test.go @@ -0,0 +1,206 @@ +package scaledjob + +import ( + "context" + "fmt" + "testing" + + "github.com/go-playground/assert/v2" + "github.com/golang/mock/gomock" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/tools/record" + "k8s.io/metrics/pkg/apis/external_metrics" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/scalers" +) + +func TestTargetAverageValue(t *testing.T) { + // count = 0 + specs := []v2beta2.MetricSpec{} + targetAverageValue := getTargetAverageValue(specs) + assert.Equal(t, int64(0), targetAverageValue) + // 1 1 + specs = []v2beta2.MetricSpec{ + createMetricSpec(1), + createMetricSpec(1), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(1), targetAverageValue) + // 5 5 3 + specs = []v2beta2.MetricSpec{ + createMetricSpec(5), + createMetricSpec(5), + createMetricSpec(3), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(4), targetAverageValue) + + // 5 5 4 + specs = []v2beta2.MetricSpec{ + createMetricSpec(5), + createMetricSpec(5), + createMetricSpec(3), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(4), targetAverageValue) +} + +func createMetricSpec(averageValue int) v2beta2.MetricSpec { + qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) + return v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + AverageValue: qty, + }, + }, + } +} + +func TestIsScaledJobActive(t *testing.T) { + ctrl := gomock.NewController(t) + recorder := record.NewFakeRecorder(1) + + // Keep the current behavior + // Assme 1 trigger only + scaledJobSingle := createScaledObject(100, "") // testing default = max + scalerSingle := []scalers.Scaler{ + createScaler(ctrl, int64(20), int32(2), true), + } + + isActive, queueLength, maxValue := GetScaleMetrics(context.TODO(), scalerSingle, scaledJobSingle, recorder) + assert.Equal(t, true, isActive) + assert.Equal(t, int64(20), queueLength) + assert.Equal(t, int64(10), maxValue) + + // Non-Active trigger only + scalerSingle = []scalers.Scaler{ + createScaler(ctrl, int64(0), int32(2), false), + } + + isActive, queueLength, maxValue = GetScaleMetrics(context.TODO(), scalerSingle, scaledJobSingle, recorder) + assert.Equal(t, false, isActive) + assert.Equal(t, int64(0), queueLength) + assert.Equal(t, int64(0), maxValue) + + // Test the valiation + scalerTestDatam := []scalerTestData{ + newScalerTestData(100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), + newScalerTestData(100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), + newScalerTestData(100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), + newScalerTestData(100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), + newScalerTestData(25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), + } + + for index, scalerTestData := range scalerTestDatam { + scaledJob := createScaledObject(scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) + scalers := []scalers.Scaler{ + createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), + createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), + createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), + createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), + } + fmt.Printf("index: %d", index) + isActive, queueLength, maxValue = GetScaleMetrics(context.TODO(), scalers, scaledJob, recorder) + // assert.Equal(t, 5, index) + assert.Equal(t, scalerTestData.ResultIsActive, isActive) + assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) + assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) + } +} + +func newScalerTestData( + maxReplicaCount int, + multipleScalersCalculation string, + scaler1QueueLength, //nolint:golint,unparam + scaler1AverageValue int, //nolint:golint,unparam + scaler1IsActive bool, //nolint:golint,unparam + scaler2QueueLength, //nolint:golint,unparam + scaler2AverageValue int, //nolint:golint,unparam + scaler2IsActive bool, //nolint:golint,unparam + scaler3QueueLength, //nolint:golint,unparam + scaler3AverageValue int, //nolint:golint,unparam + scaler3IsActive bool, //nolint:golint,unparam + scaler4QueueLength, //nolint:golint,unparam + scaler4AverageValue int, //nolint:golint,unparam + scaler4IsActive bool, //nolint:golint,unparam + resultIsActive bool, //nolint:golint,unparam + resultQueueLength, + resultMaxLength int) scalerTestData { + return scalerTestData{ + MaxReplicaCount: int32(maxReplicaCount), + MultipleScalersCalculation: multipleScalersCalculation, + Scaler1QueueLength: int64(scaler1QueueLength), + Scaler1AverageValue: int32(scaler1AverageValue), + Scaler1IsActive: scaler1IsActive, + Scaler2QueueLength: int64(scaler2QueueLength), + Scaler2AverageValue: int32(scaler2AverageValue), + Scaler2IsActive: scaler2IsActive, + Scaler3QueueLength: int64(scaler3QueueLength), + Scaler3AverageValue: int32(scaler3AverageValue), + Scaler3IsActive: scaler3IsActive, + Scaler4QueueLength: int64(scaler4QueueLength), + Scaler4AverageValue: int32(scaler4AverageValue), + Scaler4IsActive: scaler4IsActive, + ResultIsActive: resultIsActive, + ResultQueueLength: int64(resultQueueLength), + ResultMaxValue: int64(resultMaxLength), + } +} + +type scalerTestData struct { + MaxReplicaCount int32 + MultipleScalersCalculation string + Scaler1QueueLength int64 + Scaler1AverageValue int32 + Scaler1IsActive bool + Scaler2QueueLength int64 + Scaler2AverageValue int32 + Scaler2IsActive bool + Scaler3QueueLength int64 + Scaler3AverageValue int32 + Scaler3IsActive bool + Scaler4QueueLength int64 + Scaler4AverageValue int32 + Scaler4IsActive bool + ResultIsActive bool + ResultQueueLength int64 + ResultMaxValue int64 +} + +func createScaledObject(maxReplicaCount int32, multipleScalersCalculation string) *kedav1alpha1.ScaledJob { + if multipleScalersCalculation != "" { + return &kedav1alpha1.ScaledJob{ + Spec: kedav1alpha1.ScaledJobSpec{ + MaxReplicaCount: &maxReplicaCount, + ScalingStrategy: kedav1alpha1.ScalingStrategy{ + MultipleScalersCalculation: multipleScalersCalculation, + }, + }, + } + } + return &kedav1alpha1.ScaledJob{ + Spec: kedav1alpha1.ScaledJobSpec{ + MaxReplicaCount: &maxReplicaCount, + }, + } +} + +func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int32, isActive bool) *mock_scalers.MockScaler { + metricName := "queueLength" + scaler := mock_scalers.NewMockScaler(ctrl) + metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(int(averageValue))} + metrics := []external_metrics.ExternalMetricValue{ + { + MetricName: metricName, + Value: *resource.NewQuantity(queueLength, resource.DecimalSI), + }, + } + scaler.EXPECT().IsActive(gomock.Any()).Return(isActive, nil) + scaler.EXPECT().GetMetricSpecForScaling().Return(metricsSpecs) + scaler.EXPECT().GetMetrics(gomock.Any(), metricName, nil).Return(metrics, nil) + scaler.EXPECT().Close() + return scaler +}