From ef00b89935e8992cef712511a669f7eea84bdcbe Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Wed, 3 Nov 2021 13:57:40 +0100 Subject: [PATCH 1/2] Properly propagate context Signed-off-by: Zbynek Roubalik --- .../generated/openapi/zz_generated.openapi.go | 1 + apis/keda/v1alpha1/zz_generated.deepcopy.go | 1 + controllers/keda/hpa.go | 8 +++--- controllers/keda/scaledjob_controller.go | 8 +++--- controllers/keda/scaledjob_finalizer.go | 8 +++--- controllers/keda/scaledobject_controller.go | 26 +++++++++---------- controllers/keda/scaledobject_finalizer.go | 12 ++++----- controllers/keda/util/status.go | 8 +++--- pkg/provider/fallback.go | 12 ++++----- pkg/provider/fallback_test.go | 19 +++++++------- pkg/provider/provider.go | 2 +- pkg/scalers/azure_eventhub_scaler.go | 4 +-- pkg/scalers/external_scaler.go | 4 +-- pkg/scalers/mongo_scaler.go | 4 +-- 14 files changed, 60 insertions(+), 57 deletions(-) diff --git a/adapter/generated/openapi/zz_generated.openapi.go b/adapter/generated/openapi/zz_generated.openapi.go index fd629cb2226..c00b62da2c3 100644 --- a/adapter/generated/openapi/zz_generated.openapi.go +++ b/adapter/generated/openapi/zz_generated.openapi.go @@ -1,3 +1,4 @@ +//go:build !autogenerated // +build !autogenerated /* diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go index 53d00a01491..d6e286bf9e8 100644 --- a/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go index 3342f27b00e..302098a5674 100644 --- a/controllers/keda/hpa.go +++ b/controllers/keda/hpa.go @@ -49,7 +49,7 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logg return err } - err = r.Client.Create(context.TODO(), hpa) + err = r.Client.Create(ctx, hpa) if err != nil { logger.Error(err, "Failed to create new HPA in cluster", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName) return err @@ -130,7 +130,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l // DeepDerivative ignores extra entries in arrays which makes removing the last trigger not update things, so trigger and update any time the metrics count is different. if len(hpa.Spec.Metrics) != len(foundHpa.Spec.Metrics) || !equality.Semantic.DeepDerivative(hpa.Spec, foundHpa.Spec) { logger.V(1).Info("Found difference in the HPA spec accordint to ScaledObject", "currentHPA", foundHpa.Spec, "newHPA", hpa.Spec) - if r.Client.Update(context.TODO(), hpa) != nil { + if r.Client.Update(ctx, hpa) != nil { foundHpa.Spec = hpa.Spec logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name) return err @@ -143,7 +143,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l if !equality.Semantic.DeepDerivative(hpa.ObjectMeta.Labels, foundHpa.ObjectMeta.Labels) { logger.V(1).Info("Found difference in the HPA labels accordint to ScaledObject", "currentHPA", foundHpa.ObjectMeta.Labels, "newHPA", hpa.ObjectMeta.Labels) - if r.Client.Update(context.TODO(), hpa) != nil { + if r.Client.Update(ctx, hpa) != nil { foundHpa.ObjectMeta.Labels = hpa.ObjectMeta.Labels logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name) return err @@ -203,7 +203,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, updateHealthStatus(scaledObject, externalMetricNames, status) - err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status) + err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used externalMetricNames") return nil, err diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index d900228b62d..05c7fb6866e 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -87,18 +87,18 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // Check if the ScaledJob instance is marked to be deleted, which is // indicated by the deletion timestamp being set. if scaledJob.GetDeletionTimestamp() != nil { - return ctrl.Result{}, r.finalizeScaledJob(reqLogger, scaledJob) + return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob) } // ensure finalizer is set on this CR - if err := r.ensureFinalizer(reqLogger, scaledJob); err != nil { + if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil { return ctrl.Result{}, err } // ensure Status Conditions are initialized if !scaledJob.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() - if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil { + if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil { return ctrl.Result{}, err } } @@ -126,7 +126,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) } - if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil { + if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, err diff --git a/controllers/keda/scaledjob_finalizer.go b/controllers/keda/scaledjob_finalizer.go index 435d37064d5..f78591e98b9 100644 --- a/controllers/keda/scaledjob_finalizer.go +++ b/controllers/keda/scaledjob_finalizer.go @@ -32,7 +32,7 @@ const ( ) // finalizeScaledJob runs finalization logic on ScaledJob if there's finalizer -func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { +func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { if util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) { // Run finalization logic for scaledJobFinalizer. If the // finalization logic fails, don't remove the finalizer so @@ -44,7 +44,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k // Remove scaledJobFinalizer. Once all finalizers have been // removed, the object will be deleted. scaledJob.SetFinalizers(util.Remove(scaledJob.GetFinalizers(), scaledJobFinalizer)) - if err := r.Client.Update(context.TODO(), scaledJob); err != nil { + if err := r.Client.Update(ctx, scaledJob); err != nil { logger.Error(err, "Failed to update ScaledJob after removing a finalizer", "finalizer", scaledJobFinalizer) return err } @@ -56,13 +56,13 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k } // ensureFinalizer check there is finalizer present on the ScaledJob, if not it adds one -func (r *ScaledJobReconciler) ensureFinalizer(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { +func (r *ScaledJobReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { if !util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) { logger.Info("Adding Finalizer for the ScaledJob") scaledJob.SetFinalizers(append(scaledJob.GetFinalizers(), scaledJobFinalizer)) // Update CR - err := r.Client.Update(context.TODO(), scaledJob) + err := r.Client.Update(ctx, scaledJob) if err != nil { logger.Error(err, "Failed to update ScaledJob with a finalizer", "finalizer", scaledJobFinalizer) return err diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 60926bec0aa..a9fbdfa809c 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -158,18 +158,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request // Check if the ScaledObject instance is marked to be deleted, which is // indicated by the deletion timestamp being set. if scaledObject.GetDeletionTimestamp() != nil { - return ctrl.Result{}, r.finalizeScaledObject(reqLogger, scaledObject) + return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject) } // ensure finalizer is set on this CR - if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil { + if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil { return ctrl.Result{}, err } // ensure Status Conditions are initialized if !scaledObject.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() - if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, conditions); err != nil { + if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil { return ctrl.Result{}, err } } @@ -191,7 +191,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg) } - if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil { + if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { return ctrl.Result{}, err } @@ -207,13 +207,13 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg } // Check the label needed for Metrics servers is present on ScaledObject - err := r.ensureScaledObjectLabel(logger, scaledObject) + err := r.ensureScaledObjectLabel(ctx, logger, scaledObject) if err != nil { return "Failed to update ScaledObject with scaledObjectName label", err } // Check if resource targeted for scaling exists and exposes /scale subresource - gvkr, err := r.checkTargetResourceIsScalable(logger, scaledObject) + gvkr, err := r.checkTargetResourceIsScalable(ctx, logger, scaledObject) if err != nil { return "ScaledObject doesn't have correct scaleTargetRef specification", err } @@ -251,7 +251,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg // ensureScaledObjectLabel ensures that scaledobject.keda.sh/name= label exist in the ScaledObject // This is how the MetricsAdapter will know which ScaledObject a metric is for when the HPA queries it. -func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { const labelScaledObjectName = "scaledobject.keda.sh/name" if scaledObject.Labels == nil { @@ -265,11 +265,11 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, sca } logger.V(1).Info("Adding \"scaledobject.keda.sh/name\" label on ScaledObject", "value", scaledObject.Name) - return r.Client.Update(context.TODO(), scaledObject) + return r.Client.Update(ctx, scaledObject) } // checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource -func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) { +func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) { gvkr, err := kedautil.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind) if err != nil { logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind) @@ -289,12 +289,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge // not cached, let's try to detect /scale subresource // also rechecks when we need to update the status. var errScale error - scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) if errScale != nil { // not able to get /scale subresource -> let's check if the resource even exist in the cluster unstruct := &unstructured.Unstructured{} unstruct.SetGroupVersionKind(gvkr.GroupVersionKind()) - if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil { + if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil { // resource doesn't exist logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) return gvkr, err @@ -319,7 +319,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge status.OriginalReplicaCount = &scale.Spec.Replicas } - if err := kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status); err != nil { + if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil { return gvkr, err } logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) @@ -353,7 +353,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont hpaName := getHPAName(scaledObject) foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} // Check if HPA for this ScaledObject already exists - err := r.Client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa) + err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa) if err != nil && errors.IsNotFound(err) { // HPA wasn't found -> let's create a new one err = r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr) diff --git a/controllers/keda/scaledobject_finalizer.go b/controllers/keda/scaledobject_finalizer.go index 81f7bcec8a3..5ca4a7684d5 100644 --- a/controllers/keda/scaledobject_finalizer.go +++ b/controllers/keda/scaledobject_finalizer.go @@ -34,7 +34,7 @@ const ( ) // finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer -func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { if util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { // Run finalization logic for scaledObjectFinalizer. If the // finalization logic fails, don't remove the finalizer so @@ -45,7 +45,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled // if enabled, scale scaleTarget back to the original replica count (to the state it was before scaling with KEDA) if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.RestoreToOriginalReplicaCount { - scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { logger.V(1).Info("Failed to get scaleTarget's scale status, because it was probably deleted", "error", err) @@ -54,7 +54,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled } } else { scale.Spec.Replicas = *scaledObject.Status.OriginalReplicaCount - _, err = r.scaleClient.Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) + _, err = r.scaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) if err != nil { logger.Error(err, "Failed to restore scaleTarget's replica count back to the original", "finalizer", scaledObjectFinalizer) } @@ -65,7 +65,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled // Remove scaledObjectFinalizer. Once all finalizers have been // removed, the object will be deleted. scaledObject.SetFinalizers(util.Remove(scaledObject.GetFinalizers(), scaledObjectFinalizer)) - if err := r.Client.Update(context.TODO(), scaledObject); err != nil { + if err := r.Client.Update(ctx, scaledObject); err != nil { logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer) return err } @@ -77,13 +77,13 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled } // ensureFinalizer check there is finalizer present on the ScaledObject, if not it adds one -func (r *ScaledObjectReconciler) ensureFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ScaledObjectReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { if !util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { logger.Info("Adding Finalizer for the ScaledObject") scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer)) // Update CR - err := r.Client.Update(context.TODO(), scaledObject) + err := r.Client.Update(ctx, scaledObject) if err != nil { logger.Error(err, "Failed to update ScaledObject with a finalizer", "finalizer", scaledObjectFinalizer) return err diff --git a/controllers/keda/util/status.go b/controllers/keda/util/status.go index f394b59b12a..3e36d848c74 100644 --- a/controllers/keda/util/status.go +++ b/controllers/keda/util/status.go @@ -27,7 +27,7 @@ import ( ) // SetStatusConditions patches given object with passed list of conditions based on the object's type or returns an error. -func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error { +func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error { var patch runtimeclient.Patch runtimeObj := object.(runtimeclient.Object) @@ -44,7 +44,7 @@ func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger, return err } - err := client.Status().Patch(context.TODO(), runtimeObj, patch) + err := client.Status().Patch(ctx, runtimeObj, patch) if err != nil { logger.Error(err, "Failed to patch Objects Status with Conditions") } @@ -52,10 +52,10 @@ func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger, } // UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error. -func UpdateScaledObjectStatus(client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error { +func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error { patch := runtimeclient.MergeFrom(scaledObject.DeepCopy()) scaledObject.Status = *status - err := client.Status().Patch(context.TODO(), scaledObject, patch) + err := client.Status().Patch(ctx, scaledObject, patch) if err != nil { logger.Error(err, "Failed to patch ScaledObjects Status") } diff --git a/pkg/provider/fallback.go b/pkg/provider/fallback.go index b994fa528a3..8e186d685ed 100644 --- a/pkg/provider/fallback.go +++ b/pkg/provider/fallback.go @@ -35,11 +35,11 @@ func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2bet return scaledObject.Spec.Fallback != nil && metricSpec.External.Target.Type == v2beta2.AverageValueMetricType } -func (p *KedaProvider) getMetricsWithFallback(scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) { +func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) { status := scaledObject.Status.DeepCopy() initHealthStatus(status) - metrics, err := scaler.GetMetrics(context.TODO(), metricName, metricSelector) + metrics, err := scaler.GetMetrics(ctx, metricName, metricSelector) healthStatus := getHealthStatus(status, metricName) if err == nil { @@ -48,7 +48,7 @@ func (p *KedaProvider) getMetricsWithFallback(scaler scalers.Scaler, metricName healthStatus.Status = kedav1alpha1.HealthStatusHappy status.Health[metricName] = *healthStatus - p.updateStatus(scaledObject, status, metricSpec) + p.updateStatus(ctx, scaledObject, status, metricSpec) return metrics, nil } @@ -56,7 +56,7 @@ func (p *KedaProvider) getMetricsWithFallback(scaler scalers.Scaler, metricName *healthStatus.NumberOfFailures++ status.Health[metricName] = *healthStatus - p.updateStatus(scaledObject, status, metricSpec) + p.updateStatus(ctx, scaledObject, status, metricSpec) switch { case !isFallbackEnabled(scaledObject, metricSpec): @@ -104,7 +104,7 @@ func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.Metr return fallbackMetrics } -func (p *KedaProvider) updateStatus(scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus, metricSpec v2beta2.MetricSpec) { +func (p *KedaProvider) updateStatus(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus, metricSpec v2beta2.MetricSpec) { patch := runtimeclient.MergeFrom(scaledObject.DeepCopy()) if fallbackExistsInScaledObject(scaledObject, metricSpec) { @@ -114,7 +114,7 @@ func (p *KedaProvider) updateStatus(scaledObject *kedav1alpha1.ScaledObject, sta } scaledObject.Status = *status - err := p.client.Status().Patch(context.TODO(), scaledObject, patch) + err := p.client.Status().Patch(ctx, scaledObject, patch) if err != nil { logger.Error(err, "Failed to patch ScaledObjects Status") } diff --git a/pkg/provider/fallback_test.go b/pkg/provider/fallback_test.go index 64b7690015d..8646ff598ca 100644 --- a/pkg/provider/fallback_test.go +++ b/pkg/provider/fallback_test.go @@ -17,6 +17,7 @@ limitations under the License. package provider import ( + "context" "errors" "fmt" "testing" @@ -86,7 +87,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(3) expectStatusPatch(ctrl, client) - metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -116,7 +117,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(3) expectStatusPatch(ctrl, client) - metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -131,7 +132,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(3) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) @@ -159,7 +160,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) @@ -188,7 +189,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -243,7 +244,7 @@ var _ = Describe("fallback", func() { statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error")) client.EXPECT().Status().Return(statusWriter) - metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -272,7 +273,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) @@ -305,7 +306,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ToNot(HaveOccurred()) condition := so.Status.Conditions.GetFallbackCondition() Expect(condition.IsTrue()).Should(BeTrue()) @@ -338,7 +339,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) condition := so.Status.Conditions.GetFallbackCondition() diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 2b2b663df09..b306e32ce3f 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -110,7 +110,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, err := p.getMetricsWithFallback(scaler, info.Metric, metricSelector, scaledObject, metricSpec) + metrics, err := p.getMetricsWithFallback(ctx, scaler, info.Metric, metricSelector, scaledObject, metricSpec) if err != nil { logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scaler) diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 566e8ee96d1..c7968f67047 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -319,9 +319,9 @@ func getTotalLagRelatedToPartitionAmount(unprocessedEventsCount int64, partition } // Close closes Azure Event Hub Scaler -func (scaler *azureEventHubScaler) Close(context.Context) error { +func (scaler *azureEventHubScaler) Close(ctx context.Context) error { if scaler.client != nil { - err := scaler.client.Close(context.TODO()) + err := scaler.client.Close(ctx) if err != nil { eventhubLog.Error(err, "error closing azure event hub client") return err diff --git a/pkg/scalers/external_scaler.go b/pkg/scalers/external_scaler.go index e25a03358d2..6a7f8693b41 100644 --- a/pkg/scalers/external_scaler.go +++ b/pkg/scalers/external_scaler.go @@ -138,7 +138,7 @@ func (s *externalScaler) Close(context.Context) error { } // GetMetricSpecForScaling returns the metric spec for the HPA -func (s *externalScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { +func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec { var result []v2beta2.MetricSpec grpcClient, done, err := getClientForConnectionPool(s.metadata) @@ -148,7 +148,7 @@ func (s *externalScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metr } defer done() - response, err := grpcClient.GetMetricSpec(context.TODO(), &s.scaledObjectRef) + response, err := grpcClient.GetMetricSpec(ctx, &s.scaledObjectRef) if err != nil { externalLog.Error(err, "error") return nil diff --git a/pkg/scalers/mongo_scaler.go b/pkg/scalers/mongo_scaler.go index 6387f6d6f83..bddd95dc1e9 100644 --- a/pkg/scalers/mongo_scaler.go +++ b/pkg/scalers/mongo_scaler.go @@ -199,9 +199,9 @@ func (s *mongoDBScaler) IsActive(ctx context.Context) (bool, error) { } // Close disposes of mongoDB connections -func (s *mongoDBScaler) Close(context.Context) error { +func (s *mongoDBScaler) Close(ctx context.Context) error { if s.client != nil { - err := s.client.Disconnect(context.TODO()) + err := s.client.Disconnect(ctx) if err != nil { mongoDBLog.Error(err, fmt.Sprintf("failed to close mongoDB connection, because of %v", err)) return err From 8ddf63a825a5c6f5bc9cdb661cd2fbd14a7a789a Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Wed, 3 Nov 2021 14:30:55 +0100 Subject: [PATCH 2/2] update changelog Signed-off-by: Zbynek Roubalik --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1b816eb2b6..b4096e75100 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,7 @@ ### Other -- Ensure that `context.Context` values are passed down the stack from all scaler gRPC handler implementation to scaler implementation code ([#2202](https://github.com/kedacore/keda/pull/2202)) +- Ensure that `context.Context` values are properly passed down the stack ([#2202](https://github.com/kedacore/keda/pull/2202)|[#2249](https://github.com/kedacore/keda/pull/2249)) - Migrate to Kubebuilder v3 ([#2082](https://github.com/kedacore/keda/pull/2082)) - API path has been changed: `github.com/kedacore/keda/v2/api/v1alpha1` -> `github.com/kedacore/keda/v2/apis/keda/v1alpha1` - Use Patch to set FallbackCondition on ScaledObject.Status ([#2037](https://github.com/kedacore/keda/pull/2037))