Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: util functions for Status & Conditions handling #4487

Merged
merged 3 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ New deprecation(s):
- **General**: Fix odd number of arguments passed as key-value pairs for logging ([#4368](https://github.com/kedacore/keda/issues/4368))
- **General**: Automatically scale test clusters in/out to reduce environmental footprint & improve cost-efficiency ([#4456](https://github.com/kedacore/keda/pull/4456))
- **General**: Use default metrics provider from sigs.k8s.io/custom-metrics-apiserver ([#4473](https://github.com/kedacore/keda/pull/4473))
- **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906))

## v2.10.0

Expand Down
5 changes: 3 additions & 2 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
kedautil "github.com/kedacore/keda/v2/pkg/util"
version "github.com/kedacore/keda/v2/version"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logg
status := scaledObject.Status.DeepCopy()
status.HpaName = hpaName

err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
err = kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used hpaName")
return err
Expand Down Expand Up @@ -237,7 +238,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,

updateHealthStatus(scaledObject, externalMetricNames, status)

err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
err = kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used externalMetricNames")
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledjobs;scaledjobs/finalizers;scaledjobs/status,verbs="*"
Expand Down Expand Up @@ -124,7 +124,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// ensure Status Conditions are initialized
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
Expand Down
7 changes: 4 additions & 3 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs="*"
Expand Down Expand Up @@ -166,7 +167,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
// ensure Status Conditions are initialized
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -188,7 +189,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg)
}

if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -325,7 +326,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
status.PausedReplicaCount = nil
}

if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
if err := kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
return gvkr, err
}
logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
Expand Down
74 changes: 35 additions & 39 deletions pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
Expand Down Expand Up @@ -62,52 +63,47 @@ func NewScaleExecutor(client runtimeclient.Client, scaleClient scale.ScalesGette
}

func (e *scaleExecutor) updateLastActiveTime(ctx context.Context, logger logr.Logger, object interface{}) error {
var patch runtimeclient.Patch

now := metav1.Now()
runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.LastActiveTime = &now
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.LastActiveTime = &now
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status")
return err
}

err := e.client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status")
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
now, ok := target.(metav1.Time)
if !ok {
return fmt.Errorf("transform target is not metav1.Time type %v", target)
}
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
obj.Status.LastActiveTime = &now
case *kedav1alpha1.ScaledJob:
obj.Status.LastActiveTime = &now
default:
}
return nil
}
return err
return kedautil.TransformObject(ctx, e.client, logger, object, now, transform)
}

func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string, setCondition func(kedav1alpha1.Conditions, metav1.ConditionStatus, string, string)) error {
var patch runtimeclient.Patch

runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
setCondition(obj.Status.Conditions, status, reason, message)
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
setCondition(obj.Status.Conditions, status, reason, message)
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status")
return err
type transformStruct struct {
status metav1.ConditionStatus
reason string
message string
}

err := e.client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status")
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
transformObj := target.(*transformStruct)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
setCondition(obj.Status.Conditions, transformObj.status, transformObj.reason, transformObj.message)
case *kedav1alpha1.ScaledJob:
setCondition(obj.Status.Conditions, transformObj.status, transformObj.reason, transformObj.message)
default:
}
return nil
}
target := transformStruct{
status: status,
reason: reason,
message: message,
}
return err
return kedautil.TransformObject(ctx, e.client, logger, object, &target, transform)
}

func (e *scaleExecutor) setReadyCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) {
Expand Down Expand Up @@ -106,7 +107,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
return
}
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
err = kedautil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
Expand Down
63 changes: 47 additions & 16 deletions controllers/keda/util/status.go → pkg/util/status.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 The KEDA Authors
Copyright 2023 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,36 +28,67 @@ import (

// SetStatusConditions patches given object with passed list of conditions based on the object's type or returns an error.
func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
conditions, ok := target.(*kedav1alpha1.Conditions)
if !ok {
return fmt.Errorf("transform target is not kedav1alpha1.Conditions type %v", target)
}
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
obj.Status.Conditions = *conditions
case *kedav1alpha1.ScaledJob:
obj.Status.Conditions = *conditions
default:
}
return nil
}
return TransformObject(ctx, client, logger, object, conditions, transform)
}

// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
status, ok := target.(*kedav1alpha1.ScaledObjectStatus)
if !ok {
return fmt.Errorf("transform target is not kedav1alpha1.ScaledObjectStatus type %v", target)
}
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
obj.Status = *status
default:
}
return nil
}
return TransformObject(ctx, client, logger, scaledObject, status, transform)
}

// TransformObject patches the given object with the targeted passed to it through a transformer function or returns an error.
func TransformObject(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, target interface{}, transform func(runtimeclient.Object, interface{}) error) error {
var patch runtimeclient.Patch

runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.Conditions = *conditions
if err := transform(obj, target); err != nil {
logger.Error(err, "failed to patch ScaledObject")
return err
}
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.Conditions = *conditions
if err := transform(obj, target); err != nil {
logger.Error(err, "failed to patch ScaledJob")
return err
}
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status with Conditions")
logger.Error(err, "failed to patch Objects")
return err
}

err := client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status with Conditions")
}
return err
}

// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())
scaledObject.Status = *status
err := client.Status().Patch(ctx, scaledObject, patch)
if err != nil {
logger.Error(err, "Failed to patch ScaledObjects Status")
logger.Error(err, "failed to patch Objects")
}
return err
}