From 54f2afe7c9ff75c123f472469392c01203546d93 Mon Sep 17 00:00:00 2001 From: Danil Grigorev Date: Thu, 29 Feb 2024 08:39:11 +0100 Subject: [PATCH] Make event handler type aware Signed-off-by: Danil Grigorev --- pkg/builder/controller.go | 5 ++- pkg/internal/source/event_handler.go | 24 ++++++------ pkg/internal/source/internal_test.go | 58 ++++++++++++---------------- pkg/internal/source/kind.go | 17 ++++---- pkg/source/source.go | 6 +-- pkg/source/source_test.go | 3 +- 6 files changed, 54 insertions(+), 59 deletions(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 1a115f2f7b..fdc9b2c2d7 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -176,6 +176,9 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler // // STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead. // This method is only exposed for more advanced use cases, most users should use one of the higher level functions. +// +// Example: +// WatchesRawSource(source.Kind(cache, &corev1.Pod{}), eventHandler, opts...) // ensure that source propagates only valid Pod objects. func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { input := WatchesInput{src: src, eventHandler: eventHandler} for _, opt := range opts { @@ -313,7 +316,7 @@ func (blder *Builder) doWatch() error { } for _, w := range blder.watchesInput { // If the source of this watch is of type Kind, project it. - if srcKind, ok := w.src.(*internalsource.Kind); ok { + if srcKind, ok := w.src.(*internalsource.Kind[client.Object]); ok { typeForSrc, err := blder.project(srcKind.Type, w.objectProjection) if err != nil { return err diff --git a/pkg/internal/source/event_handler.go b/pkg/internal/source/event_handler.go index ae8404a1fa..5e309db391 100644 --- a/pkg/internal/source/event_handler.go +++ b/pkg/internal/source/event_handler.go @@ -33,8 +33,8 @@ import ( var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") // NewEventHandler creates a new EventHandler. -func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler { - return &EventHandler{ +func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler[T] { + return &EventHandler[T]{ ctx: ctx, handler: handler, queue: queue, @@ -43,7 +43,7 @@ func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, } // EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. -type EventHandler struct { +type EventHandler[T client.Object] struct { // ctx stores the context that created the event handler // that is used to propagate cancellation signals to each handler function. ctx context.Context @@ -55,7 +55,7 @@ type EventHandler struct { // HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs // TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 -func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs { +func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: e.OnAdd, UpdateFunc: e.OnUpdate, @@ -64,11 +64,11 @@ func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs { } // OnAdd creates CreateEvent and calls Create on EventHandler. -func (e *EventHandler) OnAdd(obj interface{}) { +func (e *EventHandler[T]) OnAdd(obj interface{}) { c := event.CreateEvent{} // Pull Object out of the object - if o, ok := obj.(client.Object); ok { + if o, ok := obj.(T); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", @@ -89,10 +89,10 @@ func (e *EventHandler) OnAdd(obj interface{}) { } // OnUpdate creates UpdateEvent and calls Update on EventHandler. -func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { +func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { u := event.UpdateEvent{} - if o, ok := oldObj.(client.Object); ok { + if o, ok := oldObj.(T); ok { u.ObjectOld = o } else { log.Error(nil, "OnUpdate missing ObjectOld", @@ -101,7 +101,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { } // Pull Object out of the object - if o, ok := newObj.(client.Object); ok { + if o, ok := newObj.(T); ok { u.ObjectNew = o } else { log.Error(nil, "OnUpdate missing ObjectNew", @@ -122,7 +122,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { } // OnDelete creates DeleteEvent and calls Delete on EventHandler. -func (e *EventHandler) OnDelete(obj interface{}) { +func (e *EventHandler[T]) OnDelete(obj interface{}) { d := event.DeleteEvent{} // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a @@ -131,7 +131,7 @@ func (e *EventHandler) OnDelete(obj interface{}) { // This should never happen if we aren't missing events, which we have concluded that we are not // and made decisions off of this belief. Maybe this shouldn't be here? var ok bool - if _, ok = obj.(client.Object); !ok { + if _, ok = obj.(T); !ok { // If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { @@ -149,7 +149,7 @@ func (e *EventHandler) OnDelete(obj interface{}) { } // Pull Object out of the object - if o, ok := obj.(client.Object); ok { + if o, ok := obj.(T); ok { d.Object = o } else { log.Error(nil, "OnDelete missing Object", diff --git a/pkg/internal/source/internal_test.go b/pkg/internal/source/internal_test.go index 0574f7180e..6c3d6c5ac5 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -37,7 +37,7 @@ import ( var _ = Describe("Internal", func() { var ctx = context.Background() - var instance *internal.EventHandler + var instance *internal.EventHandler[*corev1.Pod] var funcs, setfuncs *handler.Funcs var set bool BeforeEach(func() { @@ -74,7 +74,7 @@ var _ = Describe("Internal", func() { set = true }, } - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil) + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, funcs, nil) }) Describe("EventHandler", func() { @@ -99,7 +99,7 @@ var _ = Describe("Internal", func() { }) It("should used Predicates to filter CreateEvents", func() { - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, }) set = false @@ -107,14 +107,14 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) instance.OnAdd(pod) Expect(set).To(BeTrue()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, }) @@ -122,7 +122,7 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) @@ -130,7 +130,7 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) @@ -142,8 +142,8 @@ var _ = Describe("Internal", func() { instance.OnAdd(&metav1.ObjectMeta{}) }) - It("should not call Create EventHandler if the object does not have metadata", func() { - instance.OnAdd(FooRuntimeObject{}) + It("should not call Create EventHandler if an object is not 'that' object", func() { + instance.OnAdd(&corev1.Secret{}) }) It("should create an UpdateEvent", func() { @@ -157,21 +157,21 @@ var _ = Describe("Internal", func() { It("should used Predicates to filter UpdateEvents", func() { set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }}, }) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, }) instance.OnUpdate(pod, newPod) Expect(set).To(BeTrue()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, }) @@ -179,7 +179,7 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, }) @@ -187,7 +187,7 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) @@ -200,9 +200,9 @@ var _ = Describe("Internal", func() { instance.OnUpdate(&corev1.Pod{}, &metav1.ObjectMeta{}) }) - It("should not call Update EventHandler if the object does not have metadata", func() { - instance.OnUpdate(FooRuntimeObject{}, &corev1.Pod{}) - instance.OnUpdate(&corev1.Pod{}, FooRuntimeObject{}) + It("should not call Update EventHandler if an object is not 'that' object", func() { + instance.OnUpdate(&corev1.Secret{}, &corev1.Pod{}) + instance.OnUpdate(&corev1.Pod{}, &corev1.ConfigMap{}) }) It("should create a DeleteEvent", func() { @@ -215,21 +215,21 @@ var _ = Describe("Internal", func() { It("should used Predicates to filter DeleteEvents", func() { set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, }) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, }) instance.OnDelete(pod) Expect(set).To(BeTrue()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, }) @@ -237,7 +237,7 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, }) @@ -245,7 +245,7 @@ var _ = Describe("Internal", func() { Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, }) @@ -257,8 +257,8 @@ var _ = Describe("Internal", func() { instance.OnDelete(&metav1.ObjectMeta{}) }) - It("should not call Delete EventHandler if the object does not have metadata", func() { - instance.OnDelete(FooRuntimeObject{}) + It("should not call Delete EventHandler if an object is not 'that' object", func() { + instance.OnDelete(&corev1.Secret{}) }) It("should create a DeleteEvent from a tombstone", func() { @@ -274,16 +274,6 @@ var _ = Describe("Internal", func() { instance.OnDelete(tombstone) }) - - It("should ignore tombstone objects without meta", func() { - tombstone := cache.DeletedFinalStateUnknown{Obj: Foo{}} - instance.OnDelete(tombstone) - }) - It("should ignore objects without meta", func() { - instance.OnAdd(Foo{}) - instance.OnUpdate(Foo{}, Foo{}) - instance.OnDelete(Foo{}) - }) }) }) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index b3a8227125..738749bc5a 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -17,9 +18,9 @@ import ( ) // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Kind struct { +type Kind[T client.Object] struct { // Type is the type of object to watch. e.g. &v1.Pod{} - Type client.Object + Type T // Cache used to watch APIs Cache cache.Cache @@ -32,9 +33,9 @@ type Kind struct { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, +func (ks *Kind[T]) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - if ks.Type == nil { + if reflect.DeepEqual(ks.Type, *new(T)) { return fmt.Errorf("must create Kind with a non-nil object") } if ks.Cache == nil { @@ -79,7 +80,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) + _, err := i.AddEventHandler(NewEventHandler[T](ctx, queue, handler, prct).HandlerFuncs()) if err != nil { ks.started <- err return @@ -94,8 +95,8 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return nil } -func (ks *Kind) String() string { - if ks.Type != nil { +func (ks *Kind[T]) String() string { + if !reflect.DeepEqual(ks.Type, *new(T)) { return fmt.Sprintf("kind source: %T", ks.Type) } return "kind source: unknown type" @@ -103,7 +104,7 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. -func (ks *Kind) WaitForSync(ctx context.Context) error { +func (ks *Kind[T]) WaitForSync(ctx context.Context) error { select { case err := <-ks.started: return err diff --git a/pkg/source/source.go b/pkg/source/source.go index 099c8d68fa..e98d33ede0 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -58,8 +58,8 @@ type SyncingSource interface { } // Kind creates a KindSource with the given cache provider. -func Kind(cache cache.Cache, object client.Object) SyncingSource { - return &internal.Kind{Type: object, Cache: cache} +func Kind[T client.Object](cache cache.Cache, object T) SyncingSource { + return &internal.Kind[T]{Type: object, Cache: cache} } var _ Source = &Channel{} @@ -198,7 +198,7 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que return fmt.Errorf("must specify Informer.Informer") } - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) + _, err := is.Informer.AddEventHandler(internal.NewEventHandler[client.Object](ctx, queue, handler, prct).HandlerFuncs()) if err != nil { return err } diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 16c365e8a2..084f8a2535 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -25,6 +25,7 @@ import ( . "github.com/onsi/gomega" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -187,7 +188,7 @@ var _ = Describe("Source", func() { }) It("should return an error from Start if a type was not provided", func() { - instance := source.Kind(ic, nil) + instance := source.Kind[client.Object](ic, nil) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object"))