From 882beeeb857eded13fbd42cedd5d5db3a6b33cf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 19 Jun 2024 10:09:06 +0200 Subject: [PATCH] Make EventPolicy EventHandler generic --- pkg/reconciler/eventpolicy.go | 112 ++++++++++++++++++ .../inmemorychannel/controller/controller.go | 88 +------------- 2 files changed, 117 insertions(+), 83 deletions(-) create mode 100644 pkg/reconciler/eventpolicy.go diff --git a/pkg/reconciler/eventpolicy.go b/pkg/reconciler/eventpolicy.go new file mode 100644 index 00000000000..16bd5fcf267 --- /dev/null +++ b/pkg/reconciler/eventpolicy.go @@ -0,0 +1,112 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "strings" +) + +// enqueueApplyingResourcesOfEventPolicy checks if the given GVK is referenced in the given EventPolicy. +// If so, it enqueues it into the enqueueFn(). +func enqueueApplyingResourcesOfEventPolicy(indexer cache.Indexer, gvk schema.GroupVersionKind, policyObj interface{}, enqueueFn func(key types.NamespacedName)) { + eventPolicy, ok := policyObj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + for _, to := range eventPolicy.Spec.To { + if to.Ref != nil { + toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) + if err != nil { + continue + } + + if strings.EqualFold(toGV.Group, gvk.Group) && + strings.EqualFold(to.Ref.Kind, gvk.Kind) { + + enqueueFn(types.NamespacedName{ + Namespace: eventPolicy.Namespace, + Name: to.Ref.Name, + }) + } + } + + if to.Selector != nil { + selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) + if err != nil { + continue + } + + if strings.EqualFold(selectorGV.Group, gvk.Group) && + strings.EqualFold(to.Selector.Kind, gvk.Kind) { + + selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if err != nil { + continue + } + + resources := []metav1.Object{} + err = cache.ListAllByNamespace(indexer, eventPolicy.Namespace, selector, func(i interface{}) { + resources = append(resources, i.(metav1.Object)) + }) + if err != nil { + continue + } + + for _, resource := range resources { + enqueueFn(types.NamespacedName{ + Namespace: resource.GetNamespace(), + Name: resource.GetName(), + }) + } + } + } + } +} + +// EventPolicyEventHandler returns an ResourceEventHandler, which enqueues the referencing resources of the EventPolicy +// if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK. +func EventPolicyEventHandler(indexer cache.Indexer, gvk schema.GroupVersionKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // Here we need to check if the old or the new EventPolicy was referencing the InMemoryChannel + + // make sure, we enqueue the keys only once + toEnqueue := map[types.NamespacedName]struct{}{} + addToEnqueueList := func(key types.NamespacedName) { + toEnqueue[key] = struct{}{} + } + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, oldObj, addToEnqueueList) + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, newObj, addToEnqueueList) + + for k := range toEnqueue { + enqueueFn(k) + } + }, + DeleteFunc: func(obj interface{}) { + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn) + }, + } +} diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index d4ffd55b960..2e7d59b3680 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -18,16 +18,10 @@ package controller import ( "context" - "strings" - "github.com/kelseyhightower/envconfig" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing/pkg/apis/messaging" - v1 "knative.dev/eventing/pkg/client/listers/messaging/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -149,24 +143,11 @@ func NewController( Handler: controller.HandleAll(globalResync), }) + imcGVK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel") + // Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing // or got updated and now is referencing the InMemoryChannel - eventPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - // Here we need to check if either the old or the new EventPolicy was referencing the InMemoryChannel - - alreadyEnqueued := enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), oldObj, impl.EnqueueKey) - if !alreadyEnqueued { - enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), newObj, impl.EnqueueKey) - } - }, - DeleteFunc: func(obj interface{}) { - enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey) - }, - }) + eventPolicyInformer.Informer().AddEventHandler(reconciler.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGVK, impl.EnqueueKey)) // Setup the watch on the config map of dispatcher config configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx)) @@ -175,62 +156,3 @@ func NewController( return impl } - -// enqueueApplyingChannelOfEventPolicy checks if an InMemoryChannel is referenced in the given EventPolicy. -// If so, it enqueues the channel into the enqueueFn() and returns true. -func enqueueApplyingChannelOfEventPolicy(imcLister v1.InMemoryChannelLister, obj interface{}, enqueueFn func(key types.NamespacedName)) bool { - eventPolicy, ok := obj.(*v1alpha1.EventPolicy) - if !ok { - return false - } - - for _, to := range eventPolicy.Spec.To { - if to.Ref != nil { - toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) - if err != nil { - return false - } - - if strings.EqualFold(toGV.Group, messaging.GroupName) && - strings.EqualFold(to.Ref.Kind, "InMemoryChannel") { - - enqueueFn(types.NamespacedName{ - Namespace: eventPolicy.Namespace, - Name: to.Ref.Name, - }) - return true - } - } - - if to.Selector != nil { - selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) - if err != nil { - return false - } - - if strings.EqualFold(selectorGV.Group, messaging.GroupName) && - strings.EqualFold(to.Selector.Kind, "InMemoryChannel") { - - selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) - if err != nil { - return false - } - - imcs, err := imcLister.InMemoryChannels(eventPolicy.Namespace).List(selector) - if err != nil { - return false - } - - for _, imc := range imcs { - enqueueFn(types.NamespacedName{ - Namespace: eventPolicy.Namespace, - Name: imc.Name, - }) - } - return true - } - } - } - - return false -}