diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index b3a8227125..733420392b 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -17,9 +18,9 @@ import ( ) // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Kind struct { +type Kind[T client.Object] struct { // Type is the type of object to watch. e.g. &v1.Pod{} - Type client.Object + Type T // Cache used to watch APIs Cache cache.Cache @@ -32,9 +33,9 @@ type Kind struct { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, +func (ks *Kind[T]) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - if ks.Type == nil { + if reflect.DeepEqual(ks.Type, *new(T)) { return fmt.Errorf("must create Kind with a non-nil object") } if ks.Cache == nil { @@ -79,7 +80,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) + _, err := i.AddEventHandler(NewObjectEventHandler[T](ctx, queue, handler, prct).HandlerFuncs()) if err != nil { ks.started <- err return @@ -94,8 +95,8 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return nil } -func (ks *Kind) String() string { - if ks.Type != nil { +func (ks *Kind[T]) String() string { + if !reflect.DeepEqual(ks.Type, *new(T)) { return fmt.Sprintf("kind source: %T", ks.Type) } return "kind source: unknown type" @@ -103,7 +104,7 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. -func (ks *Kind) WaitForSync(ctx context.Context) error { +func (ks *Kind[T]) WaitForSync(ctx context.Context) error { select { case err := <-ks.started: return err diff --git a/pkg/internal/source/object_event_handler.go b/pkg/internal/source/object_event_handler.go new file mode 100644 index 0000000000..0020fa59f7 --- /dev/null +++ b/pkg/internal/source/object_event_handler.go @@ -0,0 +1,167 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "fmt" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// NewObjectEventHandler creates a new TypedEventHandler. +func NewObjectEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *ObjectEventHandler[T] { + return &ObjectEventHandler[T]{ + ctx: ctx, + handler: handler, + queue: queue, + predicates: predicates, + } +} + +// ObjectEventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. +type ObjectEventHandler[T client.Object] struct { + // ctx stores the context that created the event handler + // that is used to propagate cancellation signals to each handler function. + ctx context.Context + + handler handler.EventHandler + queue workqueue.RateLimitingInterface + predicates []predicate.Predicate +} + +// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs +// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 +func (e *ObjectEventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: e.OnAdd, + UpdateFunc: e.OnUpdate, + DeleteFunc: e.OnDelete, + } +} + +// OnAdd creates CreateEvent and calls Create on EventHandler. +func (e *ObjectEventHandler[T]) OnAdd(obj interface{}) { + c := event.CreateEvent{} + + // Pull Object out of the object + if o, ok := obj.(T); ok { + c.Object = o + } else { + log.Error(nil, "OnAdd missing Object", + "object", obj, "type", fmt.Sprintf("%T", obj)) + return + } + + for _, p := range e.predicates { + if !p.Create(c) { + return + } + } + + // Invoke create handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Create(ctx, c, e.queue) +} + +// OnUpdate creates UpdateEvent and calls Update on EventHandler. +func (e *ObjectEventHandler[T]) OnUpdate(oldObj, newObj interface{}) { + u := event.UpdateEvent{} + + if o, ok := oldObj.(T); ok { + u.ObjectOld = o + } else { + log.Error(nil, "OnUpdate missing ObjectOld", + "object", oldObj, "type", fmt.Sprintf("%T", oldObj)) + return + } + + // Pull Object out of the object + if o, ok := newObj.(T); ok { + u.ObjectNew = o + } else { + log.Error(nil, "OnUpdate missing ObjectNew", + "object", newObj, "type", fmt.Sprintf("%T", newObj)) + return + } + + for _, p := range e.predicates { + if !p.Update(u) { + return + } + } + + // Invoke update handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Update(ctx, u, e.queue) +} + +// OnDelete creates DeleteEvent and calls Delete on EventHandler. +func (e *ObjectEventHandler[T]) OnDelete(obj interface{}) { + d := event.DeleteEvent{} + + // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a + // DeleteFinalStateUnknown struct, so the object needs to be pulled out. + // Copied from sample-controller + // This should never happen if we aren't missing events, which we have concluded that we are not + // and made decisions off of this belief. Maybe this shouldn't be here? + var ok bool + if _, ok = obj.(T); !ok { + // If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + log.Error(nil, "Error decoding objects. Expected cache.DeletedFinalStateUnknown", + "type", fmt.Sprintf("%T", obj), + "object", obj) + return + } + + // Set DeleteStateUnknown to true + d.DeleteStateUnknown = true + + // Set obj to the tombstone obj + obj = tombstone.Obj + } + + // Pull Object out of the object + if o, ok := obj.(T); ok { + d.Object = o + } else { + log.Error(nil, "OnDelete missing Object", + "object", obj, "type", fmt.Sprintf("%T", obj)) + return + } + + for _, p := range e.predicates { + if !p.Delete(d) { + return + } + } + + // Invoke delete handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Delete(ctx, d, e.queue) +} diff --git a/pkg/internal/source/object_event_handler_test.go b/pkg/internal/source/object_event_handler_test.go new file mode 100644 index 0000000000..f58d6314f3 --- /dev/null +++ b/pkg/internal/source/object_event_handler_test.go @@ -0,0 +1,276 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +var _ = Describe("Internal", func() { + var ctx = context.Background() + var instance *internal.ObjectEventHandler[*corev1.Pod] + var funcs, setfuncs *handler.Funcs + var set bool + BeforeEach(func() { + funcs = &handler.Funcs{ + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Did not expect CreateEvent to be called.") + }, + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Did not expect DeleteEvent to be called.") + }, + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Did not expect UpdateEvent to be called.") + }, + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Did not expect GenericEvent to be called.") + }, + } + + setfuncs = &handler.Funcs{ + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + set = true + }, + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + set = true + }, + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + set = true + }, + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + set = true + }, + } + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, funcs, nil) + }) + + Describe("EventHandler", func() { + var pod, newPod *corev1.Pod + + BeforeEach(func() { + pod = &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "test"}}, + }, + } + newPod = pod.DeepCopy() + newPod.Labels = map[string]string{"foo": "bar"} + }) + + It("should create a CreateEvent", func() { + funcs.CreateFunc = func(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(evt.Object).To(Equal(pod)) + } + instance.OnAdd(pod) + }) + + It("should used Predicates to filter CreateEvents", func() { + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, + }) + set = false + instance.OnAdd(pod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + }) + instance.OnAdd(pod) + Expect(set).To(BeTrue()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, + }) + instance.OnAdd(pod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + }) + instance.OnAdd(pod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + }) + instance.OnAdd(pod) + Expect(set).To(BeTrue()) + }) + + It("should not call Create EventHandler if the object is not a runtime.Object", func() { + instance.OnAdd(&metav1.ObjectMeta{}) + }) + + It("should not call Create EventHandler if an object is not 'that' object", func() { + instance.OnAdd(&corev1.Secret{}) + }) + + It("should create an UpdateEvent", func() { + funcs.UpdateFunc = func(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(evt.ObjectOld).To(Equal(pod)) + Expect(evt.ObjectNew).To(Equal(newPod)) + } + instance.OnUpdate(pod, newPod) + }) + + It("should used Predicates to filter UpdateEvents", func() { + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }}, + }) + instance.OnUpdate(pod, newPod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, + }) + instance.OnUpdate(pod, newPod) + Expect(set).To(BeTrue()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, + predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, + }) + instance.OnUpdate(pod, newPod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, + predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, + }) + instance.OnUpdate(pod, newPod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, + }) + instance.OnUpdate(pod, newPod) + Expect(set).To(BeTrue()) + }) + + It("should not call Update EventHandler if the object is not a runtime.Object", func() { + instance.OnUpdate(&metav1.ObjectMeta{}, &corev1.Pod{}) + instance.OnUpdate(&corev1.Pod{}, &metav1.ObjectMeta{}) + }) + + It("should not call Update EventHandler if an object is not 'that' object", func() { + instance.OnUpdate(&corev1.Secret{}, &corev1.Pod{}) + instance.OnUpdate(&corev1.Pod{}, &corev1.ConfigMap{}) + }) + + It("should create a DeleteEvent", func() { + funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(evt.Object).To(Equal(pod)) + } + instance.OnDelete(pod) + }) + + It("should used Predicates to filter DeleteEvents", func() { + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, + }) + instance.OnDelete(pod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, + }) + instance.OnDelete(pod) + Expect(set).To(BeTrue()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, + }) + instance.OnDelete(pod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, + }) + instance.OnDelete(pod) + Expect(set).To(BeFalse()) + + set = false + instance = internal.NewObjectEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, + predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, + }) + instance.OnDelete(pod) + Expect(set).To(BeTrue()) + }) + + It("should not call Delete EventHandler if the object is not a runtime.Object", func() { + instance.OnDelete(&metav1.ObjectMeta{}) + }) + + It("should not call Delete EventHandler if an object is not 'that' object", func() { + instance.OnDelete(&corev1.Secret{}) + }) + + It("should create a DeleteEvent from a tombstone", func() { + + tombstone := cache.DeletedFinalStateUnknown{ + Obj: pod, + } + funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(evt.Object).To(Equal(pod)) + Expect(evt.DeleteStateUnknown).Should(BeTrue()) + } + + instance.OnDelete(tombstone) + }) + }) +}) diff --git a/pkg/source/source.go b/pkg/source/source.go index 099c8d68fa..5c68753d08 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -59,7 +59,12 @@ type SyncingSource interface { // Kind creates a KindSource with the given cache provider. func Kind(cache cache.Cache, object client.Object) SyncingSource { - return &internal.Kind{Type: object, Cache: cache} + return &internal.Kind[client.Object]{Type: object, Cache: cache} +} + +// ObjectKind creates a typed KindSource with the given cache provider. +func ObjectKind[T client.Object](cache cache.Cache, object T) SyncingSource { + return &internal.Kind[T]{Type: object, Cache: cache} } var _ Source = &Channel{}