Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add annotation to pause autoscaling at a fixed count #2765

Merged
merged 5 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@

### New

- **General:** Introduce annotation `"autoscaling.keda.sh/paused-replicas"` for ScaledObjects to pause scaling at a fixed replica count. ([#944](https://github.com/kedacore/keda/issues/944))
- **General:** Introduce ARM-based container image for KEDA ([#2263](https://github.com/kedacore/keda/issues/2263)|[#2262](https://github.com/kedacore/keda/issues/2262))
- **General:** Introduce new AWS DynamoDB Scaler ([#2486](https://github.com/kedacore/keda/issues/2482))
- **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488)|[#2734](https://github.com/kedacore/keda/issues/2734))
- **General:** Introduce new GCP Stackdriver Scaler ([#2661](https://github.com/kedacore/keda/issues/2661))
- **General:** Introduce new GCP Storage Scaler ([#2628](https://github.com/kedacore/keda/issues/2628))
- **General:** Introduce ARM-based container image for KEDA ([#2263](https://github.com/kedacore/keda/issues/2263)|[#2262](https://github.com/kedacore/keda/issues/2262))
- **General:** Provide support for authentication via Azure Key Vault ([#900](https://github.com/kedacore/keda/issues/900)|[#2733](https://github.com/kedacore/keda/issues/2733))
- **General**: Support for `ValueMetricType` in `ScaledObject` for all scalers except CPU/Memory ([#2030](https://github.com/kedacore/keda/issues/2030))

Expand Down
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ type ScaledObjectStatus struct {
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Health map[string]HealthStatus `json:"health,omitempty"`
// +optional
PausedReplicaCount *int32 `json:"pausedReplicaCount,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
5 changes: 5 additions & 0 deletions apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ spec:
originalReplicaCount:
format: int32
type: integer
pausedReplicaCount:
format: int32
type: integer
resourceMetricNames:
items:
type: string
Expand Down
21 changes: 19 additions & 2 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
version "github.com/kedacore/keda/v2/version"
)

Expand Down Expand Up @@ -90,10 +91,26 @@ func (r *ScaledObjectReconciler) newHPAForScaledObject(ctx context.Context, logg
labels[key] = value
}

minReplicas := getHPAMinReplicas(scaledObject)
maxReplicas := getHPAMaxReplicas(scaledObject)

pausedCount, err := executor.GetPausedReplicaCount(scaledObject)
if err != nil {
return nil, err
}
if pausedCount != nil {
// MinReplicas on HPA can't be 0
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
if *pausedCount == 0 {
*pausedCount = 1
}
minReplicas = pausedCount
maxReplicas = *pausedCount
}

hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{
Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{
MinReplicas: getHPAMinReplicas(scaledObject),
MaxReplicas: getHPAMaxReplicas(scaledObject),
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
Metrics: scaledObjectMetricSpecs,
Behavior: behavior,
ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
Expand Down
12 changes: 10 additions & 2 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// predicate.GenerationChangedPredicate{} ignore updates to ScaledObject Status
// (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(
predicate.Or(kedacontrollerutil.PausedReplicasPredicate{}, predicate.GenerationChangedPredicate{}),
)).
Owns(&autoscalingv2beta2.HorizontalPodAutoscaler{}).
Complete(r)
}
Expand Down Expand Up @@ -281,7 +283,9 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// do we need the scale to update the status later?
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil
_, present := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
removePausedStatus := scaledObject.Status.PausedReplicaCount != nil && !present
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil || removePausedStatus

// check if we already know.
var scale *autoscalingv1.Scale
Expand Down Expand Up @@ -321,6 +325,10 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
status.OriginalReplicaCount = &scale.Spec.Replicas
}

if removePausedStatus {
status.PausedReplicaCount = nil
}

if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
return gvkr, err
}
Expand Down
30 changes: 30 additions & 0 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package util

import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

type PausedReplicasPredicate struct {
predicate.Funcs
}

func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

newAnnotations := e.ObjectNew.GetAnnotations()
oldAnnotations := e.ObjectOld.GetAnnotations()
if newAnnotations != nil && oldAnnotations != nil {
if newVal, ok1 := newAnnotations[PausedReplicasAnnotation]; ok1 {
if oldVal, ok2 := oldAnnotations[PausedReplicasAnnotation]; ok2 {
return newVal != oldVal
}
return true
}
}
return false
}
61 changes: 55 additions & 6 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package executor

import (
"context"
"strconv"
"time"

"github.com/go-logr/logr"
Expand All @@ -28,6 +29,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my thinking, we should unify the functions for Status & Conditions handling. Referencing controller related stuff here is suboptimal.

There are two locations with similar functions:

func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
var patch runtimeclient.Patch
runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.Conditions = *conditions
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.Conditions = *conditions
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status with Conditions")
return err
}
err := client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status with Conditions")
}
return err
}
// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())
scaledObject.Status = *status
err := client.Status().Patch(ctx, scaledObject, patch)
if err != nil {
logger.Error(err, "Failed to patch ScaledObjects Status")
}
return err
}

func (e *scaleExecutor) updateLastActiveTime(ctx context.Context, logger logr.Logger, object interface{}) error {
var patch runtimeclient.Patch
now := metav1.Now()
runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.LastActiveTime = &now
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.LastActiveTime = &now
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status")
return err
}
err := e.client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status")
}
return err
}
func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string, setCondition func(kedav1alpha1.Conditions, metav1.ConditionStatus, string, string)) error {
var patch runtimeclient.Patch
runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
setCondition(obj.Status.Conditions, status, reason, message)
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
setCondition(obj.Status.Conditions, status, reason, message)
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status")
return err
}
err := e.client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status")
}
return err
}
func (e *scaleExecutor) setReadyCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetReadyCondition(status, reason, message)
}
return e.setCondition(ctx, logger, object, status, reason, message, active)
}
func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetActiveCondition(status, reason, message)
}
return e.setCondition(ctx, logger, object, status, reason, message, active)
}
func (e *scaleExecutor) setFallbackCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
fallback := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetFallbackCondition(status, reason, message)
}
return e.setCondition(ctx, logger, object, status, reason, message, fallback)
}

We should refactor this to have it in one place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move keda/controllers/keda/util/status.go to pkg/util/status.go, this way the SetStatusConditions and UpdateScaldObjectStatus can be used across the project. As for unifying the functions for Status and Conditions handling, I think that should be tackled in a different PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah a change like that. And I concur this should be tackled in a different PR.


"github.com/kedacore/keda/v2/pkg/eventreason"
)

Expand Down Expand Up @@ -69,12 +72,6 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
currentReplicas = currentScale.Spec.Replicas
}

// if scaledObject.Spec.MinReplicaCount is not set, then set the default value (0)
minReplicas := int32(0)
if scaledObject.Spec.MinReplicaCount != nil {
minReplicas = *scaledObject.Spec.MinReplicaCount
}

// if the ScaledObject's triggers aren't in the error state,
// but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true'
readyCondition := scaledObject.Status.Conditions.GetReadyCondition()
Expand All @@ -85,6 +82,44 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
}
}

// Check if we are paused, and if we are then update the scale to the desired count.
pausedCount, err := GetPausedReplicaCount(scaledObject)
if err != nil {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
logger.Error(err, "error getting the paused replica count on the current ScaledObject.")
return
}

status := scaledObject.Status.DeepCopy()
if pausedCount != nil && *pausedCount != currentReplicas && status.PausedReplicaCount == nil {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
aryan9600 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
return
}
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
return
}

// if scaledObject.Spec.MinReplicaCount is not set, then set the default value (0)
minReplicas := int32(0)
if scaledObject.Spec.MinReplicaCount != nil {
minReplicas = *scaledObject.Spec.MinReplicaCount
}

if isActive {
switch {
case scaledObject.Spec.IdleReplicaCount != nil && currentReplicas < minReplicas,
Expand Down Expand Up @@ -320,3 +355,17 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool

return false, *scaledObject.Spec.MinReplicaCount
}

func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) {
if scaledObject.Annotations != nil {
if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok {
conv, err := strconv.ParseInt(val, 10, 32)
if err != nil {
return nil, err
}
count := int32(conv)
return &count, nil
}
}
return nil, nil
}
62 changes: 62 additions & 0 deletions pkg/scaling/executor/scale_scaledobjects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,65 @@ func TestScaleFromIdleToMinReplicasWhenActive(t *testing.T) {
condition := scaledObject.Status.Conditions.GetActiveCondition()
assert.Equal(t, true, condition.IsTrue())
}

func TestScaleToPausedReplicasCount(t *testing.T) {
ctrl := gomock.NewController(t)
client := mock_client.NewMockClient(ctrl)
recorder := record.NewFakeRecorder(1)
mockScaleClient := mock_scale.NewMockScalesGetter(ctrl)
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)

scaledObject := v1alpha1.ScaledObject{
ObjectMeta: v1.ObjectMeta{
Name: "name",
Namespace: "namespace",
Annotations: map[string]string{
"autoscaling.keda.sh/paused-replicas": "0",
},
},
Spec: v1alpha1.ScaledObjectSpec{
ScaleTargetRef: &v1alpha1.ScaleTarget{
Name: "name",
},
},
Status: v1alpha1.ScaledObjectStatus{
ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{
Group: "apps",
Kind: "Deployment",
},
},
}

scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions()

pausedReplicaCount := int32(0)
replicaCount := int32(2)

client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Replicas: &replicaCount,
},
})

scale := &autoscalingv1.Scale{
Spec: autoscalingv1.ScaleSpec{
Replicas: replicaCount,
},
}

mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2)
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)

assert.Equal(t, pausedReplicaCount, scale.Spec.Replicas)
condition := scaledObject.Status.Conditions.GetActiveCondition()
assert.Equal(t, false, condition.IsTrue())
}
Loading