Skip to content

Commit

Permalink
Properly propagate context (#2249)
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubali@redhat.com>
  • Loading branch information
zroubalik authored Nov 3, 2021
1 parent cabe4b5 commit 2350679
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

### Other

- Ensure that `context.Context` values are passed down the stack from all scaler gRPC handler implementation to scaler implementation code ([#2202](https://github.com/kedacore/keda/pull/2202))
- Ensure that `context.Context` values are properly passed down the stack ([#2202](https://github.com/kedacore/keda/pull/2202)|[#2249](https://github.com/kedacore/keda/pull/2249))
- Migrate to Kubebuilder v3 ([#2082](https://github.com/kedacore/keda/pull/2082))
- API path has been changed: `github.com/kedacore/keda/v2/api/v1alpha1` -> `github.com/kedacore/keda/v2/apis/keda/v1alpha1`
- Use Patch to set FallbackCondition on ScaledObject.Status ([#2037](https://github.com/kedacore/keda/pull/2037))
Expand Down
1 change: 1 addition & 0 deletions adapter/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logg
return err
}

err = r.Client.Create(context.TODO(), hpa)
err = r.Client.Create(ctx, hpa)
if err != nil {
logger.Error(err, "Failed to create new HPA in cluster", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName)
return err
Expand Down Expand Up @@ -130,7 +130,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l
// DeepDerivative ignores extra entries in arrays which makes removing the last trigger not update things, so trigger and update any time the metrics count is different.
if len(hpa.Spec.Metrics) != len(foundHpa.Spec.Metrics) || !equality.Semantic.DeepDerivative(hpa.Spec, foundHpa.Spec) {
logger.V(1).Info("Found difference in the HPA spec accordint to ScaledObject", "currentHPA", foundHpa.Spec, "newHPA", hpa.Spec)
if r.Client.Update(context.TODO(), hpa) != nil {
if r.Client.Update(ctx, hpa) != nil {
foundHpa.Spec = hpa.Spec
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
Expand All @@ -143,7 +143,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l

if !equality.Semantic.DeepDerivative(hpa.ObjectMeta.Labels, foundHpa.ObjectMeta.Labels) {
logger.V(1).Info("Found difference in the HPA labels accordint to ScaledObject", "currentHPA", foundHpa.ObjectMeta.Labels, "newHPA", hpa.ObjectMeta.Labels)
if r.Client.Update(context.TODO(), hpa) != nil {
if r.Client.Update(ctx, hpa) != nil {
foundHpa.ObjectMeta.Labels = hpa.ObjectMeta.Labels
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
Expand Down Expand Up @@ -203,7 +203,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,

updateHealthStatus(scaledObject, externalMetricNames, status)

err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status)
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used externalMetricNames")
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Check if the ScaledJob instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledJob.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledJob(reqLogger, scaledJob)
return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob)
}

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(reqLogger, scaledJob); err != nil {
if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil {
return ctrl.Result{}, err
}

// ensure Status Conditions are initialized
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
)

// finalizeScaledJob runs finalization logic on ScaledJob if there's finalizer
func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
if util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
// Run finalization logic for scaledJobFinalizer. If the
// finalization logic fails, don't remove the finalizer so
Expand All @@ -44,7 +44,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
// Remove scaledJobFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledJob.SetFinalizers(util.Remove(scaledJob.GetFinalizers(), scaledJobFinalizer))
if err := r.Client.Update(context.TODO(), scaledJob); err != nil {
if err := r.Client.Update(ctx, scaledJob); err != nil {
logger.Error(err, "Failed to update ScaledJob after removing a finalizer", "finalizer", scaledJobFinalizer)
return err
}
Expand All @@ -56,13 +56,13 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

// ensureFinalizer check there is finalizer present on the ScaledJob, if not it adds one
func (r *ScaledJobReconciler) ensureFinalizer(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
func (r *ScaledJobReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
if !util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
logger.Info("Adding Finalizer for the ScaledJob")
scaledJob.SetFinalizers(append(scaledJob.GetFinalizers(), scaledJobFinalizer))

// Update CR
err := r.Client.Update(context.TODO(), scaledJob)
err := r.Client.Update(ctx, scaledJob)
if err != nil {
logger.Error(err, "Failed to update ScaledJob with a finalizer", "finalizer", scaledJobFinalizer)
return err
Expand Down
26 changes: 13 additions & 13 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledObject.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)
return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject)
}

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil {
if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil {
return ctrl.Result{}, err
}

// ensure Status Conditions are initialized
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -191,7 +191,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -207,13 +207,13 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}

// Check the label needed for Metrics servers is present on ScaledObject
err := r.ensureScaledObjectLabel(logger, scaledObject)
err := r.ensureScaledObjectLabel(ctx, logger, scaledObject)
if err != nil {
return "Failed to update ScaledObject with scaledObjectName label", err
}

// Check if resource targeted for scaling exists and exposes /scale subresource
gvkr, err := r.checkTargetResourceIsScalable(logger, scaledObject)
gvkr, err := r.checkTargetResourceIsScalable(ctx, logger, scaledObject)
if err != nil {
return "ScaledObject doesn't have correct scaleTargetRef specification", err
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg

// ensureScaledObjectLabel ensures that scaledobject.keda.sh/name=<scaledObject.Name> label exist in the ScaledObject
// This is how the MetricsAdapter will know which ScaledObject a metric is for when the HPA queries it.
func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
const labelScaledObjectName = "scaledobject.keda.sh/name"

if scaledObject.Labels == nil {
Expand All @@ -265,11 +265,11 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, sca
}

logger.V(1).Info("Adding \"scaledobject.keda.sh/name\" label on ScaledObject", "value", scaledObject.Name)
return r.Client.Update(context.TODO(), scaledObject)
return r.Client.Update(ctx, scaledObject)
}

// checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedautil.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
Expand All @@ -289,12 +289,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
var errScale error
scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
Expand All @@ -319,7 +319,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge
status.OriginalReplicaCount = &scale.Spec.Replicas
}

if err := kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status); err != nil {
if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
return gvkr, err
}
logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
// HPA wasn't found -> let's create a new one
err = r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
Expand Down
12 changes: 6 additions & 6 deletions controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
)

// finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer
func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
if util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
Expand All @@ -45,7 +45,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled

// if enabled, scale scaleTarget back to the original replica count (to the state it was before scaling with KEDA)
if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.RestoreToOriginalReplicaCount {
scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
logger.V(1).Info("Failed to get scaleTarget's scale status, because it was probably deleted", "error", err)
Expand All @@ -54,7 +54,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}
} else {
scale.Spec.Replicas = *scaledObject.Status.OriginalReplicaCount
_, err = r.scaleClient.Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
_, err = r.scaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Failed to restore scaleTarget's replica count back to the original", "finalizer", scaledObjectFinalizer)
}
Expand All @@ -65,7 +65,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
// Remove scaledObjectFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledObject.SetFinalizers(util.Remove(scaledObject.GetFinalizers(), scaledObjectFinalizer))
if err := r.Client.Update(context.TODO(), scaledObject); err != nil {
if err := r.Client.Update(ctx, scaledObject); err != nil {
logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer)
return err
}
Expand All @@ -77,13 +77,13 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

// ensureFinalizer check there is finalizer present on the ScaledObject, if not it adds one
func (r *ScaledObjectReconciler) ensureFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
if !util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
logger.Info("Adding Finalizer for the ScaledObject")
scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer))

// Update CR
err := r.Client.Update(context.TODO(), scaledObject)
err := r.Client.Update(ctx, scaledObject)
if err != nil {
logger.Error(err, "Failed to update ScaledObject with a finalizer", "finalizer", scaledObjectFinalizer)
return err
Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/util/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// SetStatusConditions patches given object with passed list of conditions based on the object's type or returns an error.
func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
var patch runtimeclient.Patch

runtimeObj := object.(runtimeclient.Object)
Expand All @@ -44,18 +44,18 @@ func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger,
return err
}

err := client.Status().Patch(context.TODO(), runtimeObj, patch)
err := client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status with Conditions")
}
return err
}

// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())
scaledObject.Status = *status
err := client.Status().Patch(context.TODO(), scaledObject, patch)
err := client.Status().Patch(ctx, scaledObject, patch)
if err != nil {
logger.Error(err, "Failed to patch ScaledObjects Status")
}
Expand Down
Loading

0 comments on commit 2350679

Please sign in to comment.