Skip to content

Commit

Permalink
webhook: cache miss fallback to direct client for ScaledObject (#6186)
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com>
Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
wozniakjan and zroubalik authored Oct 11, 2024
1 parent 584202f commit 0b7ac6d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973))
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524))
Expand Down
38 changes: 33 additions & 5 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -44,14 +45,31 @@ import (
var scaledobjectlog = logf.Log.WithName("scaledobject-validation-webhook")

var kc client.Client
var cacheMissToDirectClient bool
var directClient client.Client
var restMapper meta.RESTMapper

var memoryString = "memory"
var cpuString = "cpu"

func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager) error {
func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager, cacheMissFallback bool) error {
kc = mgr.GetClient()
restMapper = mgr.GetRESTMapper()
cacheMissToDirectClient = cacheMissFallback
if cacheMissToDirectClient {
cfg := mgr.GetConfig()
opts := client.Options{
HTTPClient: mgr.GetHTTPClient(),
Scheme: mgr.GetScheme(),
Mapper: restMapper,
Cache: nil, // this disables the cache and explicitly uses the direct client
}
var err error
directClient, err = client.New(cfg, opts)
if err != nil {
return fmt.Errorf("failed to initialize direct client: %w", err)
}
}
return ctrl.NewWebhookManagedBy(mgr).
WithValidator(&ScaledObjectCustomValidator{}).
For(so).
Expand Down Expand Up @@ -312,6 +330,18 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error
return nil
}

// getFromCacheOrDirect is a helper function that tries to get an object from the cache
// if it fails, it tries to get it from the direct client
func getFromCacheOrDirect(ctx context.Context, key client.ObjectKey, obj client.Object) error {
err := kc.Get(ctx, key, obj, &client.GetOptions{})
if cacheMissToDirectClient {
if kerrors.IsNotFound(err) {
return directClient.Get(ctx, key, obj, &client.GetOptions{})
}
}
return err
}

func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool) error {
if dryRun {
return nil
Expand All @@ -334,15 +364,13 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool
switch incomingSoGckr.GVKString() {
case "apps/v1.Deployment":
deployment := &appsv1.Deployment{}
err := kc.Get(context.Background(), key, deployment, &client.GetOptions{})
if err != nil {
if err := getFromCacheOrDirect(context.Background(), key, deployment); err != nil {
return err
}
podSpec = &deployment.Spec.Template.Spec
case "apps/v1.StatefulSet":
statefulset := &appsv1.StatefulSet{}
err := kc.Get(context.Background(), key, statefulset, &client.GetOptions{})
if err != nil {
if err := getFromCacheOrDirect(context.Background(), key, statefulset); err != nil {
return err
}
podSpec = &statefulset.Spec.Template.Spec
Expand Down
2 changes: 1 addition & 1 deletion apis/keda/v1alpha1/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = BeforeSuite(func() {
})
Expect(err).NotTo(HaveOccurred())

err = (&ScaledObject{}).SetupWebhookWithManager(mgr)
err = (&ScaledObject{}).SetupWebhookWithManager(mgr, false)
Expect(err).NotTo(HaveOccurred())
err = (&ScaledJob{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
Expand Down
8 changes: 5 additions & 3 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func main() {
var webhooksClientRequestBurst int
var certDir string
var webhooksPort int
var cacheMissToDirectClient bool

pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -70,6 +71,7 @@ func main() {
pflag.IntVar(&webhooksClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
pflag.StringVar(&certDir, "cert-dir", "/certs", "Webhook certificates dir to use. Defaults to /certs")
pflag.IntVar(&webhooksPort, "port", 9443, "Port number to serve webhooks. Defaults to 9443")
pflag.BoolVar(&cacheMissToDirectClient, "cache-miss-to-direct-client", false, "If true, on cache misses the webhook will call the direct client to fetch the object")

opts := zap.Options{}
opts.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -117,7 +119,7 @@ func main() {

kedautil.PrintWelcome(setupLog, kubeVersion, "admission webhooks")

setupWebhook(mgr)
setupWebhook(mgr, cacheMissToDirectClient)

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
Expand All @@ -134,9 +136,9 @@ func main() {
}
}

func setupWebhook(mgr manager.Manager) {
func setupWebhook(mgr manager.Manager, cacheMissToDirectClient bool) {
// setup webhooks
if err := (&kedav1alpha1.ScaledObject{}).SetupWebhookWithManager(mgr); err != nil {
if err := (&kedav1alpha1.ScaledObject{}).SetupWebhookWithManager(mgr, cacheMissToDirectClient); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "ScaledObject")
os.Exit(1)
}
Expand Down

0 comments on commit 0b7ac6d

Please sign in to comment.