From 0b7ac6d041731af88fad8d162441e35c8e406b1a Mon Sep 17 00:00:00 2001 From: Jan Wozniak Date: Fri, 11 Oct 2024 02:36:08 -0400 Subject: [PATCH] webhook: cache miss fallback to direct client for ScaledObject (#6186) Signed-off-by: Jan Wozniak Co-authored-by: Zbynek Roubalik --- CHANGELOG.md | 1 + apis/keda/v1alpha1/scaledobject_webhook.go | 38 +++++++++++++++++++--- apis/keda/v1alpha1/suite_test.go | 2 +- cmd/webhooks/main.go | 8 +++-- 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2790b3ae0ab..deab8a04db4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New +- **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524)) diff --git a/apis/keda/v1alpha1/scaledobject_webhook.go b/apis/keda/v1alpha1/scaledobject_webhook.go index b3602d16739..b96c984445b 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook.go +++ b/apis/keda/v1alpha1/scaledobject_webhook.go @@ -29,6 +29,7 @@ import ( appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -44,14 +45,31 @@ import ( var scaledobjectlog = logf.Log.WithName("scaledobject-validation-webhook") var kc client.Client +var cacheMissToDirectClient bool +var directClient client.Client var restMapper meta.RESTMapper var memoryString = "memory" var cpuString = "cpu" -func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager) error { +func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager, cacheMissFallback bool) error { kc = mgr.GetClient() restMapper = mgr.GetRESTMapper() + cacheMissToDirectClient = cacheMissFallback + if cacheMissToDirectClient { + cfg := mgr.GetConfig() + opts := client.Options{ + HTTPClient: mgr.GetHTTPClient(), + Scheme: mgr.GetScheme(), + Mapper: restMapper, + Cache: nil, // this disables the cache and explicitly uses the direct client + } + var err error + directClient, err = client.New(cfg, opts) + if err != nil { + return fmt.Errorf("failed to initialize direct client: %w", err) + } + } return ctrl.NewWebhookManagedBy(mgr). WithValidator(&ScaledObjectCustomValidator{}). For(so). @@ -312,6 +330,18 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error return nil } +// getFromCacheOrDirect is a helper function that tries to get an object from the cache +// if it fails, it tries to get it from the direct client +func getFromCacheOrDirect(ctx context.Context, key client.ObjectKey, obj client.Object) error { + err := kc.Get(ctx, key, obj, &client.GetOptions{}) + if cacheMissToDirectClient { + if kerrors.IsNotFound(err) { + return directClient.Get(ctx, key, obj, &client.GetOptions{}) + } + } + return err +} + func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool) error { if dryRun { return nil @@ -334,15 +364,13 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool switch incomingSoGckr.GVKString() { case "apps/v1.Deployment": deployment := &appsv1.Deployment{} - err := kc.Get(context.Background(), key, deployment, &client.GetOptions{}) - if err != nil { + if err := getFromCacheOrDirect(context.Background(), key, deployment); err != nil { return err } podSpec = &deployment.Spec.Template.Spec case "apps/v1.StatefulSet": statefulset := &appsv1.StatefulSet{} - err := kc.Get(context.Background(), key, statefulset, &client.GetOptions{}) - if err != nil { + if err := getFromCacheOrDirect(context.Background(), key, statefulset); err != nil { return err } podSpec = &statefulset.Spec.Template.Spec diff --git a/apis/keda/v1alpha1/suite_test.go b/apis/keda/v1alpha1/suite_test.go index ce53cc62682..e595508cbdf 100644 --- a/apis/keda/v1alpha1/suite_test.go +++ b/apis/keda/v1alpha1/suite_test.go @@ -118,7 +118,7 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - err = (&ScaledObject{}).SetupWebhookWithManager(mgr) + err = (&ScaledObject{}).SetupWebhookWithManager(mgr, false) Expect(err).NotTo(HaveOccurred()) err = (&ScaledJob{}).SetupWebhookWithManager(mgr) Expect(err).NotTo(HaveOccurred()) diff --git a/cmd/webhooks/main.go b/cmd/webhooks/main.go index 46a80a3955e..56c03eb1b00 100644 --- a/cmd/webhooks/main.go +++ b/cmd/webhooks/main.go @@ -62,6 +62,7 @@ func main() { var webhooksClientRequestBurst int var certDir string var webhooksPort int + var cacheMissToDirectClient bool pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -70,6 +71,7 @@ func main() { pflag.IntVar(&webhooksClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver") pflag.StringVar(&certDir, "cert-dir", "/certs", "Webhook certificates dir to use. Defaults to /certs") pflag.IntVar(&webhooksPort, "port", 9443, "Port number to serve webhooks. Defaults to 9443") + pflag.BoolVar(&cacheMissToDirectClient, "cache-miss-to-direct-client", false, "If true, on cache misses the webhook will call the direct client to fetch the object") opts := zap.Options{} opts.BindFlags(flag.CommandLine) @@ -117,7 +119,7 @@ func main() { kedautil.PrintWelcome(setupLog, kubeVersion, "admission webhooks") - setupWebhook(mgr) + setupWebhook(mgr, cacheMissToDirectClient) if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") @@ -134,9 +136,9 @@ func main() { } } -func setupWebhook(mgr manager.Manager) { +func setupWebhook(mgr manager.Manager, cacheMissToDirectClient bool) { // setup webhooks - if err := (&kedav1alpha1.ScaledObject{}).SetupWebhookWithManager(mgr); err != nil { + if err := (&kedav1alpha1.ScaledObject{}).SetupWebhookWithManager(mgr, cacheMissToDirectClient); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "ScaledObject") os.Exit(1) }