Skip to content

Commit

Permalink
Add eagerScalingStrategy for ScaledJob (#5872)
Browse files Browse the repository at this point in the history
Signed-off-by: June Han <sorrowitsch@gmail.com>
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
junekhan and zroubalik authored Jul 30, 2024
1 parent 343396b commit 1d51361
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114))
- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896))
Expand Down
28 changes: 19 additions & 9 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru
scaleTo = scaleToMinReplica
effectiveMaxScale = scaleToMinReplica
} else {
effectiveMaxScale = NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount-minReplicaCount, pendingJobCount, scaledJob.MaxReplicaCount())
effectiveMaxScale, scaleTo = NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount-minReplicaCount, pendingJobCount, scaledJob.MaxReplicaCount(), scaleTo)
}
return effectiveMaxScale, scaleTo
}
Expand Down Expand Up @@ -391,6 +391,9 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S
case "accurate":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "accurate")
return accurateScalingStrategy{}
case "eager":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "eager")
return eagerScalingStrategy{}
default:
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "default")
return defaultScalingStrategy{}
Expand All @@ -399,33 +402,40 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, scaleTo int64) (int64, int64)
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, _ int64) int64 {
return maxScale - runningJobCount
func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, _, scaleTo int64) (int64, int64) {
return maxScale - runningJobCount, scaleTo
}

type customScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, maxReplicaCount, scaleTo int64) (int64, int64) {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount), scaleTo
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, scaleTo int64) (int64, int64) {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
return maxReplicaCount - runningJobCount, scaleTo
}
return maxScale - pendingJobCount
return maxScale - pendingJobCount, scaleTo
}

// eagerScalingStrategy is the ScalingStrategy selected by the "eager"
// strategy name. It sizes the scale-up by the replica slots that are still
// free and always reports maxReplicaCount as the scale target.
type eagerScalingStrategy struct{}

// GetEffectiveMaxScale returns two values:
//   - the effective max scale: the smaller of maxScale and the number of
//     slots left once running and pending jobs are subtracted from
//     maxReplicaCount;
//   - the scale target: always maxReplicaCount. The incoming scaleTo
//     argument (last parameter) is ignored.
//
// The first value is negative when runningJobCount+pendingJobCount exceeds
// maxReplicaCount; no clamping is done here.
func (s eagerScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, _ int64) (int64, int64) {
	// Slots not yet taken by running or pending jobs.
	freeSlots := maxReplicaCount - runningJobCount - pendingJobCount
	effectiveMaxScale := min(freeSlots, maxScale)
	return effectiveMaxScale, maxReplicaCount
}

func min(x, y int64) int64 {
Expand Down
51 changes: 39 additions & 12 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,17 @@ func TestNewNewScalingStrategy(t *testing.T) {
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
}

// maxScaleValue adapts a two-value result for single-value assertions: it
// returns its first argument unchanged and discards the second. The tests
// wrap GetEffectiveMaxScale calls with it to check only the first result.
func maxScaleValue(maxValue int64, _ int64) int64 {
	return maxValue
}

func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore it in this scenario
// pendingJobCount isn't relevant in this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(2), maxScaleValue(strategy.GetEffectiveMaxScale(2, 0, 0, 5, 1)))
}

func TestCustomScalingStrategy(t *testing.T) {
Expand All @@ -97,13 +101,13 @@ func TestCustomScalingStrategy(t *testing.T) {
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore it in this scenario
// pendingJobCount isn't relevant in this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 0, 10))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(9), maxScaleValue(strategy.GetEffectiveMaxScale(10, 0, 0, 10, 1)))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If the two parameters are not set, the behavior is the same as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(2), maxScaleValue(strategy.GetEffectiveMaxScale(2, 0, 0, 5, 1)))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
Expand All @@ -115,25 +119,48 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 0, 4))
assert.Equal(t, int64(4), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 4, 1)))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore it in this scenario
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 0, 5))
assert.Equal(t, int64(3), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(3), maxScaleValue(strategy.GetEffectiveMaxScale(5, 2, 0, 5, 1)))

// Test with 2 pending jobs
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 4, 2, 10))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(5, 4, 2, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 4, 2, 10, 1)))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(5, 4, 2, 5, 1)))
}

// TestEagerScalingStrategy checks both results of the "eager" strategy's
// GetEffectiveMaxScale for a fixed maxReplicaCount of 10 and an incoming
// scaleTo of 1.
func TestEagerScalingStrategy(t *testing.T) {
	logger := logf.Log.WithName("ScaledJobTest")
	strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("eager", "eager", 0, "0"))

	cases := []struct {
		maxScale, running, pending int64
		wantMaxScale               int64
	}{
		// Enough free slots: maxScale is returned as-is.
		{maxScale: 4, running: 3, pending: 0, wantMaxScale: 4},
		{maxScale: 4, running: 0, pending: 3, wantMaxScale: 4},
		// Running plus pending jobs leave only 3 free slots.
		{maxScale: 4, running: 7, pending: 0, wantMaxScale: 3},
		{maxScale: 4, running: 1, pending: 6, wantMaxScale: 3},
		// maxScale above maxReplicaCount is limited to the free slots.
		{maxScale: 15, running: 0, pending: 0, wantMaxScale: 10},
	}

	for _, tc := range cases {
		gotMaxScale, gotScaleTo := strategy.GetEffectiveMaxScale(tc.maxScale, tc.running, tc.pending, 10, 1)
		assert.Equal(t, tc.wantMaxScale, gotMaxScale)
		// The scale target is maxReplicaCount in every case.
		assert.Equal(t, int64(10), gotScaleTo)
	}
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
Expand Down
134 changes: 134 additions & 0 deletions tests/internals/scaling_strategies/eager_scaling_strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//go:build e2e
// +build e2e

package eager_scaling_strategy_test

import (
"encoding/base64"
"fmt"
"testing"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper" // For helper methods
. "github.com/kedacore/keda/v2/tests/scalers/rabbitmq"
)

var _ = godotenv.Load("../../.env") // For loading env variables from .env

// testName is the base string from which the namespaces, resource names and
// RabbitMQ credentials below are derived.
const (
testName = "eager-scaling-strategy-test"
)

// Names, credentials and connection strings shared by the test functions
// and the manifest template in this file.
var (
// testNamespace is the namespace the templated resources are created in.
testNamespace = fmt.Sprintf("%s-ns", testName)
// rmqNamespace is the namespace passed to RMQInstall/RMQUninstall and used
// in the RabbitMQ service host name below.
rmqNamespace = fmt.Sprintf("%s-rmq", testName)
scaledJobName = fmt.Sprintf("%s-sj", testName)
// queueName is the queue messages are published to and the queue the
// rabbitmq trigger in the manifest watches.
queueName = "hello"
user = fmt.Sprintf("%s-user", testName)
password = fmt.Sprintf("%s-password", testName)
vhost = "/"
// connectionString is the AMQP URL handed to RMQPublishMessages.
connectionString = fmt.Sprintf("amqp://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace)
// httpConnectionString is base64-encoded by getTemplateData and stored in
// the Secret as RabbitApiHost, which the trigger reads via hostFromEnv.
httpConnectionString = fmt.Sprintf("http://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace)
secretName = fmt.Sprintf("%s-secret", testName)
)

// YAML templates for the Kubernetes resources created by this test.
//
// scaledJobTemplate renders two documents: a Secret holding the
// base64-encoded RabbitMQ HTTP endpoint under the key RabbitApiHost, and a
// ScaledJob that uses the "eager" scaling strategy with maxReplicaCount 10
// and a rabbitmq QueueLength trigger (value '1') reading its host from that
// Secret through hostFromEnv.
//
// The indentation inside the literal is significant: YAML nesting is what
// places name/namespace under metadata, the pod template under
// spec.jobTargetRef, and the trigger settings under spec.triggers.
const (
	scaledJobTemplate = `
apiVersion: v1
kind: Secret
metadata:
  name: {{.SecretName}}
  namespace: {{.TestNamespace}}
data:
  RabbitApiHost: {{.Base64Connection}}
---
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: {{.ScaledJobName}}
  namespace: {{.TestNamespace}}
  labels:
    app: {{.ScaledJobName}}
spec:
  jobTargetRef:
    template:
      spec:
        containers:
          - name: sleeper
            image: busybox
            command:
              - sleep
              - "300"
            imagePullPolicy: IfNotPresent
            envFrom:
              - secretRef:
                  name: {{.SecretName}}
        restartPolicy: Never
    backoffLimit: 1
  pollingInterval: 5
  maxReplicaCount: 10
  scalingStrategy:
    strategy: "eager"
  triggers:
    - type: rabbitmq
      metadata:
        queueName: {{.QueueName}}
        hostFromEnv: RabbitApiHost
        mode: QueueLength
        value: '1'
`
)

// templateData carries the values substituted into scaledJobTemplate; each
// field matches a {{.Field}} placeholder of the same name in the manifest.
type templateData struct {
	// Resource and queue names referenced by the manifest.
	ScaledJobName, TestNamespace, QueueName, SecretName string
	// Base64Connection is the value stored under RabbitApiHost in the Secret.
	Base64Connection string
}

// TestScalingStrategy installs RabbitMQ, creates the templated Secret and
// ScaledJob, and then runs the eager-scaling scenario against them.
func TestScalingStrategy(t *testing.T) {
	kubeClient := GetKubernetesClient(t)
	tmplData, tmpls := getTemplateData()

	// Teardown is registered before anything is installed; t.Cleanup runs
	// it once the test has finished.
	teardown := func() {
		DeleteKubernetesResources(t, testNamespace, tmplData, tmpls)
		RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth())
	}
	t.Cleanup(teardown)

	RMQInstall(t, kubeClient, rmqNamespace, user, password, vhost, WithoutOAuth())
	CreateKubernetesResources(t, kubeClient, testNamespace, tmplData, tmpls)

	testEagerScaling(t, kubeClient)
}

// getTemplateData returns the values for the manifest placeholders together
// with the list of templates to render (only scaledJobTemplate here).
func getTemplateData() (templateData, []Template) {
	// Secret data values must be base64-encoded, so encode the HTTP
	// connection string before it is substituted into the manifest.
	encodedHost := base64.StdEncoding.EncodeToString([]byte(httpConnectionString))

	data := templateData{
		ScaledJobName:    scaledJobName,
		TestNamespace:    testNamespace,
		QueueName:        queueName,
		Base64Connection: encodedHost,
		SecretName:       secretName,
	}
	templates := []Template{
		{Name: "scaledJobTemplate", Config: scaledJobTemplate},
	}
	return data, templates
}

// testEagerScaling publishes three batches of 4 messages and, after each
// batch, waits for the ScaledJob's job count to reach the expected total:
// 4, then 8, then 10. The last expectation equals the maxReplicaCount set
// in the manifest rather than the 12 messages published.
func testEagerScaling(t *testing.T, kc *kubernetes.Clientset) {
	iterationCount := 20

	for _, wantJobs := range []int{4, 8, 10} {
		RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
		reached := WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, wantJobs, iterationCount, 1)
		assert.True(t, reached,
			"job count should be %d after %d iterations", wantJobs, iterationCount)
	}
}

0 comments on commit 1d51361

Please sign in to comment.