From 251979ffb47c7d9537256654bc83da5abe334369 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 14 Apr 2024 18:29:03 +0200 Subject: [PATCH 1/2] Event, source, handler, predicates: Use generics This change add generic versions of event, handler and predicates along the existing ones, prefixed with `Typed`. The existing ones are left in place under the same signature. The `source` constructors are changed to be generic. --- examples/builtins/main.go | 4 +- pkg/builder/controller.go | 53 ++--- pkg/builder/controller_test.go | 2 +- pkg/controller/controller_integration_test.go | 4 +- pkg/controller/controller_test.go | 2 +- pkg/controller/example_test.go | 6 +- pkg/event/event.go | 51 +++-- pkg/handler/enqueue.go | 41 +++- pkg/handler/enqueue_mapped.go | 38 +++- pkg/handler/enqueue_owner.go | 46 ++-- pkg/handler/eventhandler.go | 56 +++-- pkg/handler/example_test.go | 44 ++-- pkg/internal/controller/controller.go | 15 +- pkg/internal/controller/controller_test.go | 47 ++-- .../recorder/recorder_integration_test.go | 2 +- pkg/internal/source/event_handler.go | 32 +-- pkg/internal/source/internal_test.go | 3 +- pkg/internal/source/kind.go | 55 +++-- pkg/predicate/predicate.go | 201 +++++++++++++----- pkg/source/example_test.go | 10 +- pkg/source/source.go | 62 +++--- pkg/source/source_test.go | 83 ++++---- 22 files changed, 527 insertions(+), 330 deletions(-) diff --git a/examples/builtins/main.go b/examples/builtins/main.go index cc14c4e9f9..5a6e313f7b 100644 --- a/examples/builtins/main.go +++ b/examples/builtins/main.go @@ -59,14 +59,14 @@ func main() { } // Watch ReplicaSets and enqueue ReplicaSet object key - if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil { entryLog.Error(err, "unable to watch ReplicaSets") os.Exit(1) } // Watch Pods and enqueue owning ReplicaSet key if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, - handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil { + handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil { entryLog.Error(err, "unable to watch Pods") os.Exit(1) } diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index f99a22d0db..2c0063a837 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -30,7 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" - internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -56,6 +55,7 @@ const ( type Builder struct { forInput ForInput ownsInput []OwnsInput + rawSources []source.Source watchesInput []WatchesInput mgr manager.Manager globalPredicates []predicate.Predicate @@ -123,7 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { // WatchesInput represents the information set by Watches method. type WatchesInput struct { - src source.Source + obj client.Object + handler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection } @@ -132,15 +133,19 @@ type WatchesInput struct { // update events by *reconciling the object* with the given EventHandler. // // This is the equivalent of calling -// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...). +// WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)). func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { - input := WatchesInput{} + input := WatchesInput{ + obj: object, + handler: eventHandler, + } for _, opt := range opts { opt.ApplyToWatches(&input) } - src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...) - return blder.WatchesRawSource(src, opts...) + blder.watchesInput = append(blder.watchesInput, input) + + return blder } // WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata. @@ -180,13 +185,11 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler // // STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead. // This method is only exposed for more advanced use cases, most users should use one of the higher level functions. -func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder { - input := WatchesInput{src: src} - for _, opt := range opts { - opt.ApplyToWatches(&input) - } +// +// WatchesRawSource does not respect predicates configured through WithEventFilter. +func (blder *Builder) WatchesRawSource(src source.Source) *Builder { + blder.rawSources = append(blder.rawSources, src) - blder.watchesInput = append(blder.watchesInput, input) return blder } @@ -312,22 +315,22 @@ func (blder *Builder) doWatch() error { } // Do the watch requests - if len(blder.watchesInput) == 0 && blder.forInput.object == nil { - return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up") + if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 { + return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up") } - allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) for _, w := range blder.watchesInput { - // If the source of this watch is of type Kind, project it. - if srcKind, ok := w.src.(*internalsource.Kind); ok { - allPredicates := append(allPredicates, w.predicates...) - typeForSrc, err := blder.project(srcKind.Type, w.objectProjection) - if err != nil { - return err - } - srcKind.Type = typeForSrc - srcKind.Predicates = append(srcKind.Predicates, allPredicates...) + projected, err := blder.project(w.obj, w.objectProjection) + if err != nil { + return fmt.Errorf("failed to project for %T: %w", w.obj, err) + } + allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) + allPredicates = append(allPredicates, w.predicates...) + if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil { + return err } - if err := blder.ctrl.Watch(w.src); err != nil { + } + for _, src := range blder.rawSources { + if err := blder.ctrl.Watch(src); err != nil { return err } } diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index dd6ac679fd..4ff576edad 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -146,7 +146,7 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). Named("my_controller"). Build(noop) - Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up"))) + Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up"))) Expect(instance).To(BeNil()) }) diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 2e38f77a7d..50900de61b 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -66,12 +66,12 @@ var _ = Describe("controller", func() { By("Watching Resources") err = instance.Watch( source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}, - handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}), + handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}), ), ) Expect(err).NotTo(HaveOccurred()) - err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{})) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{})) Expect(err).NotTo(HaveOccurred()) err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 415f03a5b4..0454cb4b90 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() { ctx, cancel := context.WithCancel(context.Background()) watchChan := make(chan event.GenericEvent, 1) - watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}} + watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{}) watchChan <- event.GenericEvent{Object: &corev1.Pod{}} reconcileStarted := make(chan struct{}) diff --git a/pkg/controller/example_test.go b/pkg/controller/example_test.go index 85e9b9737b..aea5943450 100644 --- a/pkg/controller/example_test.go +++ b/pkg/controller/example_test.go @@ -71,7 +71,7 @@ func ExampleController() { } // Watch for Pod create / update / delete events and call Reconcile - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})) + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -108,7 +108,7 @@ func ExampleController_unstructured() { Version: "v1", }) // Watch for Pod create / update / delete events and call Reconcile - err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{})) + err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{})) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() { os.Exit(1) } - if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil { log.Error(err, "unable to watch pods") os.Exit(1) } diff --git a/pkg/event/event.go b/pkg/event/event.go index 271b3c00fb..e99c210072 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -18,38 +18,55 @@ package event import "sigs.k8s.io/controller-runtime/pkg/client" -// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated +// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated +// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler. +type CreateEvent = TypedCreateEvent[client.Object] + +// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated +// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler. +type UpdateEvent = TypedUpdateEvent[client.Object] + +// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated // by a source.Source and transformed into a reconcile.Request by an handler.EventHandler. -type CreateEvent struct { +type DeleteEvent = TypedDeleteEvent[client.Object] + +// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster). +// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an +// handler.EventHandler. +type GenericEvent = TypedGenericEvent[client.Object] + +// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated +// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler. +type TypedCreateEvent[T any] struct { // Object is the object from the event - Object client.Object + Object T } -// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated -// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler. -type UpdateEvent struct { +// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated +// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler. +type TypedUpdateEvent[T any] struct { // ObjectOld is the object from the event - ObjectOld client.Object + ObjectOld T // ObjectNew is the object from the event - ObjectNew client.Object + ObjectNew T } -// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated -// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler. -type DeleteEvent struct { +// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated +// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler. +type TypedDeleteEvent[T any] struct { // Object is the object from the event - Object client.Object + Object T // DeleteStateUnknown is true if the Delete event was missed but we identified the object // as having been deleted. DeleteStateUnknown bool } -// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster). -// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an -// handler.EventHandler. -type GenericEvent struct { +// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster). +// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an +// handler.TypedEventHandler. +type TypedGenericEvent[T any] struct { // Object is the object from the event - Object client.Object + Object T } diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index c72b2e1ebb..cbdb6e4d3d 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -18,9 +18,11 @@ package handler import ( "context" + "reflect" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -33,13 +35,18 @@ type empty struct{} var _ EventHandler = &EnqueueRequestForObject{} // EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event. -// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all +// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. -type EnqueueRequestForObject struct{} +type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object] + +// TypedEnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event. +// (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueRequestForObject is used by almost all +// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. +type TypedEnqueueRequestForObject[T client.Object] struct{} // Create implements EventHandler. -func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { +func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { + if isNil(evt.Object) { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } @@ -50,14 +57,14 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv } // Update implements EventHandler. -func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { switch { - case evt.ObjectNew != nil: + case !isNil(evt.ObjectNew): q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), }}) - case evt.ObjectOld != nil: + case !isNil(evt.ObjectOld): q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), @@ -68,8 +75,8 @@ func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEv } // Delete implements EventHandler. -func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { +func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { + if isNil(evt.Object) { enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return } @@ -80,8 +87,8 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv } // Generic implements EventHandler. -func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { +func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { + if isNil(evt.Object) { enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return } @@ -90,3 +97,15 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic Namespace: evt.Object.GetNamespace(), }}) } + +func isNil(arg any) bool { + if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr || + v.Kind() == reflect.Interface || + v.Kind() == reflect.Slice || + v.Kind() == reflect.Map || + v.Kind() == reflect.Chan || + v.Kind() == reflect.Func) && v.IsNil()) { + return true + } + return false +} diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index b55fdde6ba..6589ad4869 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -27,7 +27,11 @@ import ( // MapFunc is the signature required for enqueueing requests from a generic function. // This type is usually used with EnqueueRequestsFromMapFunc when registering an event handler. -type MapFunc func(context.Context, client.Object) []reconcile.Request +type MapFunc = TypedMapFunc[client.Object] + +// TypedMapFunc is the signature required for enqueueing requests from a generic function. +// This type is usually used with EnqueueRequestsFromTypedMapFunc when registering an event handler. +type TypedMapFunc[T any] func(context.Context, T) []reconcile.Request // EnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection // of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects @@ -40,44 +44,58 @@ type MapFunc func(context.Context, client.Object) []reconcile.Request // For UpdateEvents which contain both a new and old object, the transformation function is run on both // objects and both sets of Requests are enqueue. func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { - return &enqueueRequestsFromMapFunc{ + return TypedEnqueueRequestsFromMapFunc(fn) +} + +// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection +// of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects +// defined by some user specified transformation of the source Event. (e.g. trigger Reconciler for a set of objects +// in response to a cluster resize event caused by adding or deleting a Node) +// +// TypedEnqueueRequestsFromMapFunc is frequently used to fan-out updates from one object to one or more other +// objects of a differing type. +// +// For TypedUpdateEvents which contain both a new and old object, the transformation function is run on both +// objects and both sets of Requests are enqueue. +func TypedEnqueueRequestsFromMapFunc[T any](fn TypedMapFunc[T]) TypedEventHandler[T] { + return &enqueueRequestsFromMapFunc[T]{ toRequests: fn, } } -var _ EventHandler = &enqueueRequestsFromMapFunc{} +var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object]{} -type enqueueRequestsFromMapFunc struct { +type enqueueRequestsFromMapFunc[T any] struct { // Mapper transforms the argument into a slice of keys to be reconciled - toRequests MapFunc + toRequests TypedMapFunc[T] } // Create implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.mapAndEnqueue(ctx, q, evt.Object, reqs) } // Update implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs) e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs) } // Delete implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.mapAndEnqueue(ctx, q, evt.Object, reqs) } // Generic implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.mapAndEnqueue(ctx, q, evt.Object, reqs) } -func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) { +func (e *enqueueRequestsFromMapFunc[T]) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object T, reqs map[reconcile.Request]empty) { for _, req := range e.toRequests(ctx, object) { _, ok := reqs[req] if !ok { diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 02e7d756f8..49c05a8591 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -32,12 +32,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -var _ EventHandler = &enqueueRequestForOwner{} +var _ EventHandler = &enqueueRequestForOwner[client.Object]{} var log = logf.RuntimeLog.WithName("eventhandler").WithName("enqueueRequestForOwner") // OwnerOption modifies an EnqueueRequestForOwner EventHandler. -type OwnerOption func(e *enqueueRequestForOwner) +type OwnerOption func(e enqueueRequestForOwnerInterface) // EnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created // the object that was the source of the Event. @@ -48,7 +48,19 @@ type OwnerOption func(e *enqueueRequestForOwner) // // - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler { - e := &enqueueRequestForOwner{ + return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...) +} + +// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created +// the object that was the source of the Event. +// +// If a ReplicaSet creates Pods, users may reconcile the ReplicaSet in response to Pod Events using: +// +// - a source.Kind Source with Type of Pod. +// +// - a handler.typedEnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. +func TypedEnqueueRequestForOwner[T client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[T] { + e := &enqueueRequestForOwner[T]{ ownerType: ownerType, mapper: mapper, } @@ -63,12 +75,16 @@ func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, owne // OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true. func OnlyControllerOwner() OwnerOption { - return func(e *enqueueRequestForOwner) { - e.isController = true + return func(e enqueueRequestForOwnerInterface) { + e.setIsController(true) } } -type enqueueRequestForOwner struct { +type enqueueRequestForOwnerInterface interface { + setIsController(bool) +} + +type enqueueRequestForOwner[T client.Object] struct { // ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. ownerType runtime.Object @@ -82,8 +98,12 @@ type enqueueRequestForOwner struct { mapper meta.RESTMapper } +func (e *enqueueRequestForOwner[T]) setIsController(isController bool) { + e.isController = isController +} + // Create implements EventHandler. -func (e *enqueueRequestForOwner) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -92,7 +112,7 @@ func (e *enqueueRequestForOwner) Create(ctx context.Context, evt event.CreateEve } // Update implements EventHandler. -func (e *enqueueRequestForOwner) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.ObjectOld, reqs) e.getOwnerReconcileRequest(evt.ObjectNew, reqs) @@ -102,7 +122,7 @@ func (e *enqueueRequestForOwner) Update(ctx context.Context, evt event.UpdateEve } // Delete implements EventHandler. -func (e *enqueueRequestForOwner) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -111,7 +131,7 @@ func (e *enqueueRequestForOwner) Delete(ctx context.Context, evt event.DeleteEve } // Generic implements EventHandler. -func (e *enqueueRequestForOwner) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -121,7 +141,7 @@ func (e *enqueueRequestForOwner) Generic(ctx context.Context, evt event.GenericE // parseOwnerTypeGroupKind parses the OwnerType into a Group and Kind and caches the result. Returns false // if the OwnerType could not be parsed using the scheme. -func (e *enqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { +func (e *enqueueRequestForOwner[T]) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { // Get the kinds of the type kinds, _, err := scheme.ObjectKinds(e.ownerType) if err != nil { @@ -141,7 +161,7 @@ func (e *enqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) // getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile // owners of object that match e.OwnerType. -func (e *enqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { +func (e *enqueueRequestForOwner[T]) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { // Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested // by the user for _, ref := range e.getOwnersReferences(object) { @@ -181,7 +201,7 @@ func (e *enqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, // getOwnersReferences returns the OwnerReferences for an object as specified by the enqueueRequestForOwner // - if IsController is true: only take the Controller OwnerReference (if found) // - if IsController is false: take all OwnerReferences. -func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { +func (e *enqueueRequestForOwner[T]) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { if object == nil { return nil } diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index ff2f3e80b2..5ff49dcd4b 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -20,12 +20,13 @@ import ( "context" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" ) // EventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). EventHandlers map an Event // for one object to trigger Reconciles for either the same object or different objects - e.g. if there is an -// Event for object with type Foo (using source.KindSource) then reconcile one or more object(s) with type Bar. +// Event for object with type Foo (using source.Kind) then reconcile one or more object(s) with type Bar. // // Identical reconcile.Requests will be batched together through the queuing mechanism before reconcile is called. // @@ -41,65 +42,88 @@ import ( // // Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface. // Most users shouldn't need to implement their own EventHandler. -type EventHandler interface { +type EventHandler TypedEventHandler[client.Object] + +// TypedEventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). TypedEventHandlers map an Event +// for one object to trigger Reconciles for either the same object or different objects - e.g. if there is an +// Event for object with type Foo (using source.Kind) then reconcile one or more object(s) with type Bar. +// +// Identical reconcile.Requests will be batched together through the queuing mechanism before reconcile is called. +// +// * Use TypedEnqueueRequestForObject to reconcile the object the event is for +// - do this for events for the type the Controller Reconciles. (e.g. Deployment for a Deployment Controller) +// +// * Use TypedEnqueueRequestForOwner to reconcile the owner of the object the event is for +// - do this for events for the types the Controller creates. (e.g. ReplicaSets created by a Deployment Controller) +// +// * Use TypedEnqueueRequestsFromMapFunc to transform an event for an object to a reconcile of an object +// of a different type - do this for events for types the Controller may be interested in, but doesn't create. +// (e.g. If Foo responds to cluster size events, map Node events to Foo objects.) +// +// Unless you are implementing your own TypedEventHandler, you can ignore the functions on the TypedEventHandler interface. +// Most users shouldn't need to implement their own TypedEventHandler. +type TypedEventHandler[T any] interface { // Create is called in response to a create event - e.g. Pod Creation. - Create(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) + Create(context.Context, event.TypedCreateEvent[T], workqueue.RateLimitingInterface) // Update is called in response to an update event - e.g. Pod Updated. - Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) + Update(context.Context, event.TypedUpdateEvent[T], workqueue.RateLimitingInterface) // Delete is called in response to a delete event - e.g. Pod Deleted. - Delete(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) + Delete(context.Context, event.TypedDeleteEvent[T], workqueue.RateLimitingInterface) // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request - e.g. reconcile Autoscaling, or a Webhook. - Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) + Generic(context.Context, event.TypedGenericEvent[T], workqueue.RateLimitingInterface) } var _ EventHandler = Funcs{} -// Funcs implements EventHandler. -type Funcs struct { +// Funcs implements eventhandler. +type Funcs = TypedFuncs[client.Object] + +// TypedFuncs implements eventhandler. +type TypedFuncs[T any] struct { // Create is called in response to an add event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - CreateFunc func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) + CreateFunc func(context.Context, event.TypedCreateEvent[T], workqueue.RateLimitingInterface) // Update is called in response to an update event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - UpdateFunc func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) + UpdateFunc func(context.Context, event.TypedUpdateEvent[T], workqueue.RateLimitingInterface) // Delete is called in response to a delete event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - DeleteFunc func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) + DeleteFunc func(context.Context, event.TypedDeleteEvent[T], workqueue.RateLimitingInterface) // GenericFunc is called in response to a generic event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - GenericFunc func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) + GenericFunc func(context.Context, event.TypedGenericEvent[T], workqueue.RateLimitingInterface) } // Create implements EventHandler. -func (h Funcs) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { +func (h TypedFuncs[T]) Create(ctx context.Context, e event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { if h.CreateFunc != nil { h.CreateFunc(ctx, e, q) } } // Delete implements EventHandler. -func (h Funcs) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (h TypedFuncs[T]) Delete(ctx context.Context, e event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { if h.DeleteFunc != nil { h.DeleteFunc(ctx, e, q) } } // Update implements EventHandler. -func (h Funcs) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (h TypedFuncs[T]) Update(ctx context.Context, e event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { if h.UpdateFunc != nil { h.UpdateFunc(ctx, e, q) } } // Generic implements EventHandler. -func (h Funcs) Generic(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { +func (h TypedFuncs[T]) Generic(ctx context.Context, e event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { if h.GenericFunc != nil { h.GenericFunc(ctx, e, q) } diff --git a/pkg/handler/example_test.go b/pkg/handler/example_test.go index 3c4dbd9f50..2971b60cca 100644 --- a/pkg/handler/example_test.go +++ b/pkg/handler/example_test.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -42,7 +41,7 @@ var ( func ExampleEnqueueRequestForObject() { // controller is a controller.controller err := c.Watch( - source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}), + source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}), ) if err != nil { // handle it @@ -54,8 +53,9 @@ func ExampleEnqueueRequestForObject() { func ExampleEnqueueRequestForOwner() { // controller is a controller.controller err := c.Watch( - source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, - handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), + source.Kind(mgr.GetCache(), + &appsv1.ReplicaSet{}, + handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), ), ) if err != nil { @@ -69,15 +69,15 @@ func ExampleEnqueueRequestsFromMapFunc() { // controller is a controller.controller err := c.Watch( source.Kind(mgr.GetCache(), &appsv1.Deployment{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { + handler.TypedEnqueueRequestsFromMapFunc[*appsv1.Deployment](func(ctx context.Context, a *appsv1.Deployment) []reconcile.Request { return []reconcile.Request{ {NamespacedName: types.NamespacedName{ - Name: a.GetName() + "-1", - Namespace: a.GetNamespace(), + Name: a.Name + "-1", + Namespace: a.Namespace, }}, {NamespacedName: types.NamespacedName{ - Name: a.GetName() + "-2", - Namespace: a.GetNamespace(), + Name: a.Name + "-2", + Namespace: a.Namespace, }}, } }), @@ -93,29 +93,29 @@ func ExampleFuncs() { // controller is a controller.controller err := c.Watch( source.Kind(mgr.GetCache(), &corev1.Pod{}, - handler.Funcs{ - CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + handler.TypedFuncs[*corev1.Pod]{ + CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.Object.GetName(), - Namespace: e.Object.GetNamespace(), + Name: e.Object.Name, + Namespace: e.Object.Namespace, }}) }, - UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.ObjectNew.GetName(), - Namespace: e.ObjectNew.GetNamespace(), + Name: e.ObjectNew.Name, + Namespace: e.ObjectNew.Namespace, }}) }, - DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.Object.GetName(), - Namespace: e.Object.GetNamespace(), + Name: e.Object.Name, + Namespace: e.Object.Namespace, }}) }, - GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, e event.TypedGenericEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.Object.GetName(), - Namespace: e.Object.GetNamespace(), + Name: e.Object.Name, + Namespace: e.Object.Namespace, }}) }, }, diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index f888494a84..9c709404b5 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -79,7 +79,7 @@ type Controller struct { CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. - startWatches []watchDescription + startWatches []source.Source // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. @@ -94,11 +94,6 @@ type Controller struct { LeaderElected *bool } -// watchDescription contains all the information necessary to start a watch. -type watchDescription struct { - src source.Source -} - // Reconcile implements reconcile.Reconciler. func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { defer func() { @@ -128,7 +123,7 @@ func (c *Controller) Watch(src source.Source) error { // // These watches are going to be held on the controller struct until the manager or user calls Start(...). if !c.Started { - c.startWatches = append(c.startWatches, watchDescription{src: src}) + c.startWatches = append(c.startWatches, src) return nil } @@ -175,9 +170,9 @@ func (c *Controller) Start(ctx context.Context) error { // caches to sync so that they have a chance to register their intendeded // caches. for _, watch := range c.startWatches { - c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src)) + c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch)) - if err := watch.src.Start(ctx, c.Queue); err != nil { + if err := watch.Start(ctx, c.Queue); err != nil { return err } } @@ -186,7 +181,7 @@ func (c *Controller) Start(ctx context.Context) error { c.LogConstructor(nil).Info("Starting Controller") for _, watch := range c.startWatches { - syncingSource, ok := watch.src.(source.SyncingSource) + syncingSource, ok := watch.(source.SyncingSource) if !ok { continue } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index a10c9dcd3b..2e1842d907 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -126,9 +126,9 @@ var _ = Describe("controller", func() { Describe("Start", func() { It("should return an error if there is an error waiting for the informers", func() { f := false - ctrl.startWatches = []watchDescription{{ - src: source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, nil), - }} + ctrl.startWatches = []source.Source{ + source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}), + } ctrl.Name = "foo" ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -144,9 +144,9 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} - ctrl.startWatches = []watchDescription{{ - src: source.Kind(c, &appsv1.Deployment{}, nil), - }} + ctrl.startWatches = []source.Source{ + source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}), + } ctrl.Name = "testcontroller" err = ctrl.Start(context.TODO()) @@ -161,12 +161,12 @@ var _ = Describe("controller", func() { c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} - ctrl.startWatches = []watchDescription{{ - src: &singnallingSourceWrapper{ - SyncingSource: source.Kind(c, &appsv1.Deployment{}, nil), + ctrl.startWatches = []source.Source{ + &singnallingSourceWrapper{ + SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}), cacheSyncDone: sourceSynced, }, - }} + } ctrl.Name = "testcontroller" ctx, cancel := context.WithCancel(context.TODO()) @@ -189,12 +189,12 @@ var _ = Describe("controller", func() { sourceSynced := make(chan struct{}) c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) - ctrl.startWatches = []watchDescription{{ - src: &singnallingSourceWrapper{ - SyncingSource: source.Kind(c, &appsv1.Deployment{}, nil), + ctrl.startWatches = []source.Source{ + &singnallingSourceWrapper{ + SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}), cacheSyncDone: sourceSynced, }, - }} + } go func() { defer GinkgoRecover() @@ -226,23 +226,20 @@ var _ = Describe("controller", func() { Object: p, } - ins := &source.Channel{ - Source: ch, - Handler: handler.Funcs{ + ins := source.Channel( + ch, + handler.Funcs{ GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { defer GinkgoRecover() close(processed) }, }, - } - ins.DestBufferSize = 1 + ) // send the event to the channel ch <- evt - ctrl.startWatches = []watchDescription{{ - src: ins, - }} + ctrl.startWatches = []source.Source{ins} go func() { defer GinkgoRecover() @@ -255,10 +252,8 @@ var _ = Describe("controller", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ins := &source.Channel{} - ctrl.startWatches = []watchDescription{{ - src: ins, - }} + ins := source.Channel[string](nil, nil) + ctrl.startWatches = []source.Source{ins} e := ctrl.Start(ctx) Expect(e).To(HaveOccurred()) diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index f9a20737fc..48097872c5 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -56,7 +56,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{})) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{})) Expect(err).NotTo(HaveOccurred()) By("Starting the Manager") diff --git a/pkg/internal/source/event_handler.go b/pkg/internal/source/event_handler.go index ae8404a1fa..8651ea453e 100644 --- a/pkg/internal/source/event_handler.go +++ b/pkg/internal/source/event_handler.go @@ -33,8 +33,8 @@ import ( var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") // NewEventHandler creates a new EventHandler. -func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler { - return &EventHandler{ +func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.TypedEventHandler[T], predicates []predicate.TypedPredicate[T]) *EventHandler[T] { + return &EventHandler[T]{ ctx: ctx, handler: handler, queue: queue, @@ -43,19 +43,19 @@ func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, } // EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. -type EventHandler struct { +type EventHandler[T client.Object] struct { // ctx stores the context that created the event handler // that is used to propagate cancellation signals to each handler function. ctx context.Context - handler handler.EventHandler + handler handler.TypedEventHandler[T] queue workqueue.RateLimitingInterface - predicates []predicate.Predicate + predicates []predicate.TypedPredicate[T] } // HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs // TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 -func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs { +func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: e.OnAdd, UpdateFunc: e.OnUpdate, @@ -64,11 +64,11 @@ func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs { } // OnAdd creates CreateEvent and calls Create on EventHandler. -func (e *EventHandler) OnAdd(obj interface{}) { - c := event.CreateEvent{} +func (e *EventHandler[T]) OnAdd(obj interface{}) { + c := event.TypedCreateEvent[T]{} // Pull Object out of the object - if o, ok := obj.(client.Object); ok { + if o, ok := obj.(T); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", @@ -89,10 +89,10 @@ func (e *EventHandler) OnAdd(obj interface{}) { } // OnUpdate creates UpdateEvent and calls Update on EventHandler. -func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { - u := event.UpdateEvent{} +func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { + u := event.TypedUpdateEvent[T]{} - if o, ok := oldObj.(client.Object); ok { + if o, ok := oldObj.(T); ok { u.ObjectOld = o } else { log.Error(nil, "OnUpdate missing ObjectOld", @@ -101,7 +101,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { } // Pull Object out of the object - if o, ok := newObj.(client.Object); ok { + if o, ok := newObj.(T); ok { u.ObjectNew = o } else { log.Error(nil, "OnUpdate missing ObjectNew", @@ -122,8 +122,8 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { } // OnDelete creates DeleteEvent and calls Delete on EventHandler. -func (e *EventHandler) OnDelete(obj interface{}) { - d := event.DeleteEvent{} +func (e *EventHandler[T]) OnDelete(obj interface{}) { + d := event.TypedDeleteEvent[T]{} // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a // DeleteFinalStateUnknown struct, so the object needs to be pulled out. @@ -149,7 +149,7 @@ func (e *EventHandler) OnDelete(obj interface{}) { } // Pull Object out of the object - if o, ok := obj.(client.Object); ok { + if o, ok := obj.(T); ok { d.Object = o } else { log.Error(nil, "OnDelete missing Object", diff --git a/pkg/internal/source/internal_test.go b/pkg/internal/source/internal_test.go index 0574f7180e..f71be58424 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/gomega" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" @@ -37,7 +38,7 @@ import ( var _ = Describe("Internal", func() { var ctx = context.Background() - var instance *internal.EventHandler + var instance *internal.EventHandler[client.Object] var funcs, setfuncs *handler.Funcs var set bool BeforeEach(func() { diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index c56f423cf1..03431d1d24 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -4,12 +4,14 @@ import ( "context" "errors" "fmt" + "reflect" "time" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -17,37 +19,40 @@ import ( ) // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Kind struct { +type Kind[T client.Object] struct { // Type is the type of object to watch. e.g. &v1.Pod{} - Type client.Object + Type T // Cache used to watch APIs Cache cache.Cache - Handler handler.EventHandler + Handler handler.TypedEventHandler[T] - Predicates []predicate.Predicate + Predicates []predicate.TypedPredicate[T] - // started may contain an error if one was encountered during startup. If its closed and does not + // startedErr may contain an error if one was encountered during startup. If its closed and does not // contain an error, startup and syncing finished. - started chan error + startedErr chan error startCancel func() } // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { - if ks.Type == nil { +func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { + if isNil(ks.Type) { return fmt.Errorf("must create Kind with a non-nil object") } - if ks.Cache == nil { + if isNil(ks.Cache) { return fmt.Errorf("must create Kind with a non-nil cache") } + if isNil(ks.Handler) { + return errors.New("must create Kind with non-nil handler") + } // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). ctx, ks.startCancel = context.WithCancel(ctx) - ks.started = make(chan error) + ks.startedErr = make(chan error) go func() { var ( i cache.Informer @@ -75,30 +80,30 @@ func (ks *Kind) Start(ctx context.Context, queue workqueue.RateLimitingInterface return true, nil }); err != nil { if lastErr != nil { - ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr) + ks.startedErr <- fmt.Errorf("failed to get informer from cache: %w", lastErr) return } - ks.started <- err + ks.startedErr <- err return } _, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs()) if err != nil { - ks.started <- err + ks.startedErr <- err return } if !ks.Cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here - ks.started <- errors.New("cache did not sync") + ks.startedErr <- errors.New("cache did not sync") } - close(ks.started) + close(ks.startedErr) }() return nil } -func (ks *Kind) String() string { - if ks.Type != nil { +func (ks *Kind[T]) String() string { + if isNil(ks.Type) { return fmt.Sprintf("kind source: %T", ks.Type) } return "kind source: unknown type" @@ -106,9 +111,9 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. -func (ks *Kind) WaitForSync(ctx context.Context) error { +func (ks *Kind[T]) WaitForSync(ctx context.Context) error { select { - case err := <-ks.started: + case err := <-ks.startedErr: return err case <-ctx.Done(): ks.startCancel() @@ -118,3 +123,15 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { return fmt.Errorf("timed out waiting for cache to be synced for Kind %T", ks.Type) } } + +func isNil(arg any) bool { + if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr || + v.Kind() == reflect.Interface || + v.Kind() == reflect.Slice || + v.Kind() == reflect.Map || + v.Kind() == reflect.Chan || + v.Kind() == reflect.Func) && v.IsNil()) { + return true + } + return false +} diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 3200313089..404c2b0f7a 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -18,6 +18,7 @@ package predicate import ( "maps" + "reflect" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -29,45 +30,51 @@ import ( var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters") // Predicate filters events before enqueuing the keys. -type Predicate interface { +type Predicate = TypedPredicate[client.Object] + +// TypedPredicate filters events before enqueuing the keys. +type TypedPredicate[T any] interface { // Create returns true if the Create event should be processed - Create(event.CreateEvent) bool + Create(event.TypedCreateEvent[T]) bool // Delete returns true if the Delete event should be processed - Delete(event.DeleteEvent) bool + Delete(event.TypedDeleteEvent[T]) bool // Update returns true if the Update event should be processed - Update(event.UpdateEvent) bool + Update(event.TypedUpdateEvent[T]) bool // Generic returns true if the Generic event should be processed - Generic(event.GenericEvent) bool + Generic(event.TypedGenericEvent[T]) bool } var _ Predicate = Funcs{} var _ Predicate = ResourceVersionChangedPredicate{} var _ Predicate = GenerationChangedPredicate{} var _ Predicate = AnnotationChangedPredicate{} -var _ Predicate = or{} -var _ Predicate = and{} -var _ Predicate = not{} +var _ Predicate = or[client.Object]{} +var _ Predicate = and[client.Object]{} +var _ Predicate = not[client.Object]{} // Funcs is a function that implements Predicate. -type Funcs struct { +type Funcs = TypedFuncs[client.Object] + +// TypedFuncs is a function that implements TypedPredicate. +type TypedFuncs[T any] struct { // Create returns true if the Create event should be processed - CreateFunc func(event.CreateEvent) bool + CreateFunc func(event.TypedCreateEvent[T]) bool // Delete returns true if the Delete event should be processed - DeleteFunc func(event.DeleteEvent) bool + DeleteFunc func(event.TypedDeleteEvent[T]) bool // Update returns true if the Update event should be processed - UpdateFunc func(event.UpdateEvent) bool + UpdateFunc func(event.TypedUpdateEvent[T]) bool // Generic returns true if the Generic event should be processed - GenericFunc func(event.GenericEvent) bool + GenericFunc func(event.TypedGenericEvent[T]) bool } // Create implements Predicate. -func (p Funcs) Create(e event.CreateEvent) bool { +func (p TypedFuncs[T]) Create(e event.TypedCreateEvent[T]) bool { if p.CreateFunc != nil { return p.CreateFunc(e) } @@ -75,7 +82,7 @@ func (p Funcs) Create(e event.CreateEvent) bool { } // Delete implements Predicate. -func (p Funcs) Delete(e event.DeleteEvent) bool { +func (p TypedFuncs[T]) Delete(e event.TypedDeleteEvent[T]) bool { if p.DeleteFunc != nil { return p.DeleteFunc(e) } @@ -83,7 +90,7 @@ func (p Funcs) Delete(e event.DeleteEvent) bool { } // Update implements Predicate. -func (p Funcs) Update(e event.UpdateEvent) bool { +func (p TypedFuncs[T]) Update(e event.TypedUpdateEvent[T]) bool { if p.UpdateFunc != nil { return p.UpdateFunc(e) } @@ -91,7 +98,7 @@ func (p Funcs) Update(e event.UpdateEvent) bool { } // Generic implements Predicate. -func (p Funcs) Generic(e event.GenericEvent) bool { +func (p TypedFuncs[T]) Generic(e event.TypedGenericEvent[T]) bool { if p.GenericFunc != nil { return p.GenericFunc(e) } @@ -118,6 +125,26 @@ func NewPredicateFuncs(filter func(object client.Object) bool) Funcs { } } +// NewTypedPredicateFuncs returns a predicate funcs that applies the given filter function +// on CREATE, UPDATE, DELETE and GENERIC events. For UPDATE events, the filter is applied +// to the new object. +func NewTypedPredicateFuncs[T any](filter func(object T) bool) TypedFuncs[T] { + return TypedFuncs[T]{ + CreateFunc: func(e event.TypedCreateEvent[T]) bool { + return filter(e.Object) + }, + UpdateFunc: func(e event.TypedUpdateEvent[T]) bool { + return filter(e.ObjectNew) + }, + DeleteFunc: func(e event.TypedDeleteEvent[T]) bool { + return filter(e.Object) + }, + GenericFunc: func(e event.TypedGenericEvent[T]) bool { + return filter(e.Object) + }, + } +} + // ResourceVersionChangedPredicate implements a default update predicate function on resource version change. type ResourceVersionChangedPredicate struct { Funcs @@ -153,17 +180,35 @@ func (ResourceVersionChangedPredicate) Update(e event.UpdateEvent) bool { // // * With this predicate, any update events with writes only to the status field will not be reconciled. // So in the event that the status block is overwritten or wiped by someone else the controller will not self-correct to restore the correct status. -type GenerationChangedPredicate struct { - Funcs +type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object] + +// TypedGenerationChangedPredicate implements a default update predicate function on Generation change. +// +// This predicate will skip update events that have no change in the object's metadata.generation field. +// The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object. +// This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed. +// +// For CustomResource objects the Generation is only incremented when the status subresource is enabled. +// +// Caveats: +// +// * The assumption that the Generation is incremented only on writing to the spec does not hold for all APIs. +// E.g For Deployment objects the Generation is also incremented on writes to the metadata.annotations field. +// For object types other than CustomResources be sure to verify which fields will trigger a Generation increment when they are written to. +// +// * With this predicate, any update events with writes only to the status field will not be reconciled. +// So in the event that the status block is overwritten or wiped by someone else the controller will not self-correct to restore the correct status. +type TypedGenerationChangedPredicate[T metav1.Object] struct { + TypedFuncs[T] } // Update implements default UpdateEvent filter for validating generation change. -func (GenerationChangedPredicate) Update(e event.UpdateEvent) bool { - if e.ObjectOld == nil { +func (TypedGenerationChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { + if isNil(e.ObjectOld) { log.Error(nil, "Update event has no old object to update", "event", e) return false } - if e.ObjectNew == nil { + if isNil(e.ObjectNew) { log.Error(nil, "Update event has no new object for update", "event", e) return false } @@ -183,17 +228,20 @@ func (GenerationChangedPredicate) Update(e event.UpdateEvent) bool { // // This is mostly useful for controllers that needs to trigger both when the resource's generation is incremented // (i.e., when the resource' .spec changes), or an annotation changes (e.g., for a staging/alpha API). -type AnnotationChangedPredicate struct { - Funcs +type AnnotationChangedPredicate = TypedAnnotationChangedPredicate[client.Object] + +// TypedAnnotationChangedPredicate implements a default update predicate function on annotation change. +type TypedAnnotationChangedPredicate[T metav1.Object] struct { + TypedFuncs[T] } // Update implements default UpdateEvent filter for validating annotation change. -func (AnnotationChangedPredicate) Update(e event.UpdateEvent) bool { - if e.ObjectOld == nil { +func (TypedAnnotationChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { + if isNil(e.ObjectOld) { log.Error(nil, "Update event has no old object to update", "event", e) return false } - if e.ObjectNew == nil { + if isNil(e.ObjectNew) { log.Error(nil, "Update event has no new object for update", "event", e) return false } @@ -214,17 +262,20 @@ func (AnnotationChangedPredicate) Update(e event.UpdateEvent) bool { // // This will be helpful when object's labels is carrying some extra specification information beyond object's spec, // and the controller will be triggered if any valid spec change (not only in spec, but also in labels) happens. -type LabelChangedPredicate struct { - Funcs +type LabelChangedPredicate = TypedLabelChangedPredicate[client.Object] + +// TypedLabelChangedPredicate implements a default update predicate function on label change. +type TypedLabelChangedPredicate[T metav1.Object] struct { + TypedFuncs[T] } // Update implements default UpdateEvent filter for checking label change. -func (LabelChangedPredicate) Update(e event.UpdateEvent) bool { - if e.ObjectOld == nil { +func (TypedLabelChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { + if isNil(e.ObjectOld) { log.Error(nil, "Update event has no old object to update", "event", e) return false } - if e.ObjectNew == nil { + if isNil(e.ObjectNew) { log.Error(nil, "Update event has no new object for update", "event", e) return false } @@ -233,15 +284,15 @@ func (LabelChangedPredicate) Update(e event.UpdateEvent) bool { } // And returns a composite predicate that implements a logical AND of the predicates passed to it. -func And(predicates ...Predicate) Predicate { - return and{predicates} +func And[T any](predicates ...TypedPredicate[T]) TypedPredicate[T] { + return and[T]{predicates} } -type and struct { - predicates []Predicate +type and[T any] struct { + predicates []TypedPredicate[T] } -func (a and) Create(e event.CreateEvent) bool { +func (a and[T]) Create(e event.TypedCreateEvent[T]) bool { for _, p := range a.predicates { if !p.Create(e) { return false @@ -250,7 +301,7 @@ func (a and) Create(e event.CreateEvent) bool { return true } -func (a and) Update(e event.UpdateEvent) bool { +func (a and[T]) Update(e event.TypedUpdateEvent[T]) bool { for _, p := range a.predicates { if !p.Update(e) { return false @@ -259,7 +310,7 @@ func (a and) Update(e event.UpdateEvent) bool { return true } -func (a and) Delete(e event.DeleteEvent) bool { +func (a and[T]) Delete(e event.TypedDeleteEvent[T]) bool { for _, p := range a.predicates { if !p.Delete(e) { return false @@ -268,7 +319,7 @@ func (a and) Delete(e event.DeleteEvent) bool { return true } -func (a and) Generic(e event.GenericEvent) bool { +func (a and[T]) Generic(e event.TypedGenericEvent[T]) bool { for _, p := range a.predicates { if !p.Generic(e) { return false @@ -278,15 +329,15 @@ func (a and) Generic(e event.GenericEvent) bool { } // Or returns a composite predicate that implements a logical OR of the predicates passed to it. -func Or(predicates ...Predicate) Predicate { - return or{predicates} +func Or[T any](predicates ...TypedPredicate[T]) TypedPredicate[T] { + return or[T]{predicates} } -type or struct { - predicates []Predicate +type or[T any] struct { + predicates []TypedPredicate[T] } -func (o or) Create(e event.CreateEvent) bool { +func (o or[T]) Create(e event.TypedCreateEvent[T]) bool { for _, p := range o.predicates { if p.Create(e) { return true @@ -295,7 +346,7 @@ func (o or) Create(e event.CreateEvent) bool { return false } -func (o or) Update(e event.UpdateEvent) bool { +func (o or[T]) Update(e event.TypedUpdateEvent[T]) bool { for _, p := range o.predicates { if p.Update(e) { return true @@ -304,7 +355,7 @@ func (o or) Update(e event.UpdateEvent) bool { return false } -func (o or) Delete(e event.DeleteEvent) bool { +func (o or[T]) Delete(e event.TypedDeleteEvent[T]) bool { for _, p := range o.predicates { if p.Delete(e) { return true @@ -313,7 +364,7 @@ func (o or) Delete(e event.DeleteEvent) bool { return false } -func (o or) Generic(e event.GenericEvent) bool { +func (o or[T]) Generic(e event.TypedGenericEvent[T]) bool { for _, p := range o.predicates { if p.Generic(e) { return true @@ -323,27 +374,27 @@ func (o or) Generic(e event.GenericEvent) bool { } // Not returns a predicate that implements a logical NOT of the predicate passed to it. -func Not(predicate Predicate) Predicate { - return not{predicate} +func Not[T any](predicate TypedPredicate[T]) TypedPredicate[T] { + return not[T]{predicate} } -type not struct { - predicate Predicate +type not[T any] struct { + predicate TypedPredicate[T] } -func (n not) Create(e event.CreateEvent) bool { +func (n not[T]) Create(e event.TypedCreateEvent[T]) bool { return !n.predicate.Create(e) } -func (n not) Update(e event.UpdateEvent) bool { +func (n not[T]) Update(e event.TypedUpdateEvent[T]) bool { return !n.predicate.Update(e) } -func (n not) Delete(e event.DeleteEvent) bool { +func (n not[T]) Delete(e event.TypedDeleteEvent[T]) bool { return !n.predicate.Delete(e) } -func (n not) Generic(e event.GenericEvent) bool { +func (n not[T]) Generic(e event.TypedGenericEvent[T]) bool { return !n.predicate.Generic(e) } @@ -358,3 +409,41 @@ func LabelSelectorPredicate(s metav1.LabelSelector) (Predicate, error) { return selector.Matches(labels.Set(o.GetLabels())) }), nil } + +func isNil(arg any) bool { + if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr || + v.Kind() == reflect.Interface || + v.Kind() == reflect.Slice || + v.Kind() == reflect.Map || + v.Kind() == reflect.Chan || + v.Kind() == reflect.Func) && v.IsNil()) { + return true + } + return false +} + +// ToTyped converts a Predicate to a TypedPredicate. +func ToTyped[T client.Object](p Predicate) TypedPredicate[T] { + return TypedFuncs[T]{ + CreateFunc: func(e event.TypedCreateEvent[T]) bool { + return p.Create(event.TypedCreateEvent[client.Object]{Object: e.Object}) + }, + UpdateFunc: func(e event.TypedUpdateEvent[T]) bool { + return p.Update(event.TypedUpdateEvent[client.Object]{ + ObjectOld: e.ObjectOld, + ObjectNew: e.ObjectNew, + }) + }, + DeleteFunc: func(e event.TypedDeleteEvent[T]) bool { + return p.Delete(event.TypedDeleteEvent[client.Object]{ + Object: e.Object, + DeleteStateUnknown: e.DeleteStateUnknown, + }) + }, + GenericFunc: func(e event.TypedGenericEvent[T]) bool { + return p.Generic(event.TypedGenericEvent[client.Object]{ + Object: e.Object, + }) + }, + } +} diff --git a/pkg/source/example_test.go b/pkg/source/example_test.go index 7940a225d1..b596ff0a0a 100644 --- a/pkg/source/example_test.go +++ b/pkg/source/example_test.go @@ -31,7 +31,7 @@ var ctrl controller.Controller // This example Watches for Pod Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request // with the Name and Namespace of the Pod. func ExampleKind() { - err := ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})) + err := ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})) if err != nil { // handle it } @@ -43,10 +43,10 @@ func ExampleChannel() { events := make(chan event.GenericEvent) err := ctrl.Watch( - &source.Channel{ - Source: events, - Handler: &handler.EnqueueRequestForObject{}, - }, + source.Channel( + events, + &handler.EnqueueRequestForObject{}, + ), ) if err != nil { // handle it diff --git a/pkg/source/source.go b/pkg/source/source.go index e36450d88d..a88298c5ee 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -32,11 +32,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) -const ( - // defaultBufferSize is the default number of event notifications that can be buffered. - defaultBufferSize = 1024 -) - // Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) // which should be processed by event.EventHandlers to enqueue reconcile.Requests. // @@ -59,8 +54,8 @@ type SyncingSource interface { } // Kind creates a KindSource with the given cache provider. -func Kind(cache cache.Cache, object client.Object, handler handler.EventHandler, predicates ...predicate.Predicate) SyncingSource { - return &internal.Kind{ +func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource { + return &internal.Kind[T]{ Type: object, Cache: cache, Handler: handler, @@ -68,56 +63,55 @@ func Kind(cache cache.Cache, object client.Object, handler handler.EventHandler, } } -var _ Source = &Channel{} +var _ Source = &channel[string]{} // Channel is used to provide a source of events originating outside the cluster // (e.g. GitHub Webhook callback). Channel requires the user to wire the external // source (e.g. http handler) to write GenericEvents to the underlying channel. -type Channel struct { +func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) Source { + return &channel[T]{ + source: source, + handler: handler, + predicates: predicates, + } +} + +type channel[T any] struct { // once ensures the event distribution goroutine will be performed only once once sync.Once - // Source is the source channel to fetch GenericEvents - Source <-chan event.GenericEvent + // source is the source channel to fetch GenericEvents + source <-chan event.TypedGenericEvent[T] - Handler handler.EventHandler + handler handler.TypedEventHandler[T] - Predicates []predicate.Predicate + predicates []predicate.TypedPredicate[T] // dest is the destination channels of the added event handlers - dest []chan event.GenericEvent - - // DestBufferSize is the specified buffer size of dest channels. - // Default to 1024 if not specified. - DestBufferSize int + dest []chan event.TypedGenericEvent[T] // destLock is to ensure the destination channels are safely added/removed destLock sync.Mutex } -func (cs *Channel) String() string { +func (cs *channel[T]) String() string { return fmt.Sprintf("channel source: %p", cs) } // Start implements Source and should only be called by the Controller. -func (cs *Channel) Start( +func (cs *channel[T]) Start( ctx context.Context, queue workqueue.RateLimitingInterface, ) error { // Source should have been specified by the user. - if cs.Source == nil { + if cs.source == nil { return fmt.Errorf("must specify Channel.Source") } - if cs.Handler == nil { + if cs.handler == nil { return errors.New("must specify Channel.Handler") } - // use default value if DestBufferSize not specified - if cs.DestBufferSize == 0 { - cs.DestBufferSize = defaultBufferSize - } - - dst := make(chan event.GenericEvent, cs.DestBufferSize) + dst := make(chan event.TypedGenericEvent[T], 1024) cs.destLock.Lock() cs.dest = append(cs.dest, dst) @@ -131,7 +125,7 @@ func (cs *Channel) Start( go func() { for evt := range dst { shouldHandle := true - for _, p := range cs.Predicates { + for _, p := range cs.predicates { if !p.Generic(evt) { shouldHandle = false break @@ -142,7 +136,7 @@ func (cs *Channel) Start( func() { ctx, cancel := context.WithCancel(ctx) defer cancel() - cs.Handler.Generic(ctx, evt, queue) + cs.handler.Generic(ctx, evt, queue) }() } } @@ -151,7 +145,7 @@ func (cs *Channel) Start( return nil } -func (cs *Channel) doStop() { +func (cs *channel[T]) doStop() { cs.destLock.Lock() defer cs.destLock.Unlock() @@ -160,7 +154,7 @@ func (cs *Channel) doStop() { } } -func (cs *Channel) distribute(evt event.GenericEvent) { +func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) { cs.destLock.Lock() defer cs.destLock.Unlock() @@ -174,14 +168,14 @@ func (cs *Channel) distribute(evt event.GenericEvent) { } } -func (cs *Channel) syncLoop(ctx context.Context) { +func (cs *channel[T]) syncLoop(ctx context.Context) { for { select { case <-ctx.Done(): // Close destination channels cs.doStop() return - case evt, stillOpen := <-cs.Source: + case evt, stillOpen := <-cs.source: if !stillOpen { // if the source channel is closed, we're never gonna get // anything more on it, so stop & bail diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 84b0624afa..c781b45e3a 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -25,6 +25,7 @@ import ( . "github.com/onsi/gomega" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -65,22 +66,22 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}, handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{ + CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt.Object).To(Equal(p)) close(c) }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -102,12 +103,12 @@ var _ = Describe("Source", func() { ic := &informertest.FakeInformers{} q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}, handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{ + CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.TypedUpdateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.ObjectOld).To(Equal(p)) @@ -116,11 +117,11 @@ var _ = Describe("Source", func() { close(c) }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -147,22 +148,22 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{ + CreateFunc: func(context.Context, event.TypedCreateEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.TypedDeleteEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.Object).To(Equal(p)) close(c) }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -187,15 +188,21 @@ var _ = Describe("Source", func() { }) It("should return an error from Start if a type was not provided", func() { - instance := source.Kind(ic, nil, nil) + instance := source.Kind[client.Object](ic, nil, nil) err := instance.Start(ctx, nil) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object")) }) + It("should return an error from Start if a handler was not provided", func() { + instance := source.Kind(ic, &corev1.Pod{}, nil) + err := instance.Start(ctx, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("must create Kind with non-nil handler")) + }) It("should return an error if syncing fails", func() { f := false - instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, nil) + instance := source.Kind[client.Object](&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.EnqueueRequestForObject{}) Expect(instance.Start(context.Background(), nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) @@ -211,7 +218,7 @@ var _ = Describe("Source", func() { ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - instance := source.Kind(ic, &corev1.Pod{}, handler.Funcs{}) + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{}) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) Eventually(instance.WaitForSync).WithArguments(context.Background()).Should(HaveOccurred()) @@ -220,7 +227,7 @@ var _ = Describe("Source", func() { It("should return an error if syncing fails", func() { f := false - instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, nil) + instance := source.Kind[client.Object](&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.EnqueueRequestForObject{}) Expect(instance.Start(context.Background(), nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) @@ -287,9 +294,9 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{ - Source: ch, - Handler: handler.Funcs{ + instance := source.Channel( + ch, + handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -311,8 +318,8 @@ var _ = Describe("Source", func() { close(c) }, }, - Predicates: []predicate.Predicate{prct}, - } + prct, + ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -329,9 +336,9 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{ - Source: ch, - Handler: handler.Funcs{ + instance := source.Channel( + ch, + handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -357,8 +364,7 @@ var _ = Describe("Source", func() { } }, }, - } - instance.DestBufferSize = 1 + ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -388,9 +394,9 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{ - Source: ch, - Handler: handler.Funcs{ + instance := source.Channel( + ch, + handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -409,8 +415,7 @@ var _ = Describe("Source", func() { close(processed) }, }, - } - instance.DestBufferSize = 1 + ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -432,9 +437,9 @@ var _ = Describe("Source", func() { By("feeding that channel to a channel source") processed := make(chan struct{}) defer close(processed) - src := &source.Channel{ - Source: ch, - Handler: handler.Funcs{ + src := source.Channel( + ch, + handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -453,7 +458,7 @@ var _ = Describe("Source", func() { processed <- struct{}{} }, }, - } + ) err := src.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -464,7 +469,7 @@ var _ = Describe("Source", func() { }) It("should get error if no source specified", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{ /*no source specified*/ } + instance := source.Channel[string](nil, nil /*no source specified*/) err := instance.Start(ctx, q) Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source"))) }) From dcb8c7525405b04ce10d407b12e66649f833782c Mon Sep 17 00:00:00 2001 From: Tim Ramlot <42113979+inteon@users.noreply.github.com> Date: Mon, 22 Apr 2024 09:47:39 +0200 Subject: [PATCH 2/2] instead of passing both predicates and handler to a source, wrap the handler with a predicates filter and pass the new handler to the source Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com> --- pkg/builder/controller.go | 6 +- pkg/handler/predicates.go | 63 ++++++++++++++++++ pkg/internal/source/event_handler.go | 34 ++-------- pkg/internal/source/internal_test.go | 62 +++++++++--------- pkg/internal/source/kind.go | 5 +- pkg/source/source.go | 93 ++++++++++++++++----------- pkg/source/source_integration_test.go | 24 +++---- pkg/source/source_test.go | 46 ++++++------- 8 files changed, 197 insertions(+), 136 deletions(-) create mode 100644 pkg/handler/predicates.go diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 2c0063a837..045e050f85 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -282,7 +282,7 @@ func (blder *Builder) doWatch() error { hdler := &handler.EnqueueRequestForObject{} allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, blder.forInput.predicates...) - src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...) + src := source.Kind(blder.mgr.GetCache(), obj, handler.WithPredicates(hdler, allPredicates...)) if err := blder.ctrl.Watch(src); err != nil { return err } @@ -308,7 +308,7 @@ func (blder *Builder) doWatch() error { ) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) - src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...) + src := source.Kind(blder.mgr.GetCache(), obj, handler.WithPredicates(hdler, allPredicates...)) if err := blder.ctrl.Watch(src); err != nil { return err } @@ -325,7 +325,7 @@ func (blder *Builder) doWatch() error { } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) - if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil { + if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, handler.WithPredicates(w.handler, allPredicates...))); err != nil { return err } } diff --git a/pkg/handler/predicates.go b/pkg/handler/predicates.go new file mode 100644 index 0000000000..d632a8bb8b --- /dev/null +++ b/pkg/handler/predicates.go @@ -0,0 +1,63 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "context" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// WithPredicates returns an EventHandler that only calls the underlying handler if all the given predicates return true. +func WithPredicates[T any](handler TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) TypedEventHandler[T] { + return &TypedFuncs[T]{ + CreateFunc: func(ctx context.Context, createEvent event.TypedCreateEvent[T], queue workqueue.RateLimitingInterface) { + for _, predicate := range predicates { + if !predicate.Create(createEvent) { + return + } + } + handler.Create(ctx, createEvent, queue) + }, + UpdateFunc: func(ctx context.Context, updateEvent event.TypedUpdateEvent[T], queue workqueue.RateLimitingInterface) { + for _, predicate := range predicates { + if !predicate.Update(updateEvent) { + return + } + } + handler.Update(ctx, updateEvent, queue) + }, + DeleteFunc: func(ctx context.Context, deleteEvent event.TypedDeleteEvent[T], queue workqueue.RateLimitingInterface) { + for _, predicate := range predicates { + if !predicate.Delete(deleteEvent) { + return + } + } + handler.Delete(ctx, deleteEvent, queue) + }, + GenericFunc: func(ctx context.Context, genericEvent event.TypedGenericEvent[T], queue workqueue.RateLimitingInterface) { + for _, predicate := range predicates { + if !predicate.Generic(genericEvent) { + return + } + } + handler.Generic(ctx, genericEvent, queue) + }, + } +} diff --git a/pkg/internal/source/event_handler.go b/pkg/internal/source/event_handler.go index 8651ea453e..c84552a87d 100644 --- a/pkg/internal/source/event_handler.go +++ b/pkg/internal/source/event_handler.go @@ -26,19 +26,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" - - "sigs.k8s.io/controller-runtime/pkg/predicate" ) var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") // NewEventHandler creates a new EventHandler. -func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.TypedEventHandler[T], predicates []predicate.TypedPredicate[T]) *EventHandler[T] { +func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.TypedEventHandler[T]) *EventHandler[T] { return &EventHandler[T]{ - ctx: ctx, - handler: handler, - queue: queue, - predicates: predicates, + ctx: ctx, + handler: handler, + queue: queue, } } @@ -48,9 +45,8 @@ type EventHandler[T client.Object] struct { // that is used to propagate cancellation signals to each handler function. ctx context.Context - handler handler.TypedEventHandler[T] - queue workqueue.RateLimitingInterface - predicates []predicate.TypedPredicate[T] + handler handler.TypedEventHandler[T] + queue workqueue.RateLimitingInterface } // HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs @@ -76,12 +72,6 @@ func (e *EventHandler[T]) OnAdd(obj interface{}) { return } - for _, p := range e.predicates { - if !p.Create(c) { - return - } - } - // Invoke create handler ctx, cancel := context.WithCancel(e.ctx) defer cancel() @@ -109,12 +99,6 @@ func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { return } - for _, p := range e.predicates { - if !p.Update(u) { - return - } - } - // Invoke update handler ctx, cancel := context.WithCancel(e.ctx) defer cancel() @@ -157,12 +141,6 @@ func (e *EventHandler[T]) OnDelete(obj interface{}) { return } - for _, p := range e.predicates { - if !p.Delete(d) { - return - } - } - // Invoke delete handler ctx, cancel := context.WithCancel(e.ctx) defer cancel() diff --git a/pkg/internal/source/internal_test.go b/pkg/internal/source/internal_test.go index f71be58424..a371b4d977 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -75,7 +75,7 @@ var _ = Describe("Internal", func() { set = true }, } - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil) + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs) }) Describe("EventHandler", func() { @@ -100,41 +100,41 @@ var _ = Describe("Internal", func() { }) It("should used Predicates to filter CreateEvents", func() { - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, - }) + )) set = false instance.OnAdd(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - }) + )) instance.OnAdd(pod) Expect(set).To(BeTrue()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, - }) + )) instance.OnAdd(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - }) + )) instance.OnAdd(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - }) + )) instance.OnAdd(pod) Expect(set).To(BeTrue()) }) @@ -158,40 +158,40 @@ var _ = Describe("Internal", func() { It("should used Predicates to filter UpdateEvents", func() { set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }}, - }) + )) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, - }) + )) instance.OnUpdate(pod, newPod) Expect(set).To(BeTrue()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, - }) + )) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, - }) + )) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - }) + )) instance.OnUpdate(pod, newPod) Expect(set).To(BeTrue()) }) @@ -216,40 +216,40 @@ var _ = Describe("Internal", func() { It("should used Predicates to filter DeleteEvents", func() { set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, - }) + )) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, - }) + )) instance.OnDelete(pod) Expect(set).To(BeTrue()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, - }) + )) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, - }) + )) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, handler.WithPredicates(setfuncs, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, - }) + )) instance.OnDelete(pod) Expect(set).To(BeTrue()) }) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 03431d1d24..ed0c63dc06 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -15,7 +15,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" ) // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). @@ -28,8 +27,6 @@ type Kind[T client.Object] struct { Handler handler.TypedEventHandler[T] - Predicates []predicate.TypedPredicate[T] - // startedErr may contain an error if one was encountered during startup. If its closed and does not // contain an error, startup and syncing finished. startedErr chan error @@ -87,7 +84,7 @@ func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterf return } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs()) + _, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler).HandlerFuncs()) if err != nil { ks.startedErr <- err return diff --git a/pkg/source/source.go b/pkg/source/source.go index a88298c5ee..b57b8c0e41 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -29,7 +29,6 @@ import ( internal "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/predicate" ) // Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) @@ -54,25 +53,53 @@ type SyncingSource interface { } // Kind creates a KindSource with the given cache provider. -func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource { +func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T]) SyncingSource { return &internal.Kind[T]{ - Type: object, - Cache: cache, - Handler: handler, - Predicates: predicates, + Type: object, + Cache: cache, + Handler: handler, } } var _ Source = &channel[string]{} +type channelOptions struct { + DestBufferSize int +} + +// ChannelOption is a functional option for configuring a Channel source. +type ChannelOption func(*channelOptions) + +// WithDestBufferSize specifies the buffer size of dest channels. +func WithDestBufferSize(destBufferSize int) ChannelOption { + return func(o *channelOptions) { + if destBufferSize <= 0 { + return // ignore invalid buffer size + } + + o.DestBufferSize = destBufferSize + } +} + // Channel is used to provide a source of events originating outside the cluster // (e.g. GitHub Webhook callback). Channel requires the user to wire the external // source (e.g. http handler) to write GenericEvents to the underlying channel. -func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) Source { +func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], options ...ChannelOption) Source { + opts := channelOptions{ + // 1024 is the default number of event notifications that can be buffered. + DestBufferSize: 1024, + } + for _, o := range options { + if o == nil { + continue // ignore nil options + } + o(&opts) + } + return &channel[T]{ - source: source, - handler: handler, - predicates: predicates, + options: opts, + source: source, + handler: handler, } } @@ -85,7 +112,7 @@ type channel[T any] struct { handler handler.TypedEventHandler[T] - predicates []predicate.TypedPredicate[T] + options channelOptions // dest is the destination channels of the added event handlers dest []chan event.TypedGenericEvent[T] @@ -111,7 +138,7 @@ func (cs *channel[T]) Start( return errors.New("must specify Channel.Handler") } - dst := make(chan event.TypedGenericEvent[T], 1024) + dst := make(chan event.TypedGenericEvent[T], cs.options.DestBufferSize) cs.destLock.Lock() cs.dest = append(cs.dest, dst) @@ -124,21 +151,11 @@ func (cs *channel[T]) Start( go func() { for evt := range dst { - shouldHandle := true - for _, p := range cs.predicates { - if !p.Generic(evt) { - shouldHandle = false - break - } - } - - if shouldHandle { - func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - cs.handler.Generic(ctx, evt, queue) - }() - } + func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + cs.handler.Generic(ctx, evt, queue) + }() } }() @@ -187,19 +204,23 @@ func (cs *channel[T]) syncLoop(ctx context.Context) { } } -// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Informer struct { +// informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type informer struct { // Informer is the controller-runtime Informer - Informer cache.Informer - Handler handler.EventHandler - Predicates []predicate.Predicate + Informer cache.Informer + Handler handler.EventHandler } -var _ Source = &Informer{} +var _ Source = &informer{} + +// Informer creates an InformerSource with the given cache provider. +func Informer(inf cache.Informer, eventhandler handler.EventHandler) Source { + return &informer{Informer: inf, Handler: eventhandler} +} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { +func (is *informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { // Informer should have been specified by the user. if is.Informer == nil { return fmt.Errorf("must specify Informer.Informer") @@ -208,14 +229,14 @@ func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInter return errors.New("must specify Informer.Handler") } - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs()) + _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler).HandlerFuncs()) if err != nil { return err } return nil } -func (is *Informer) String() string { +func (is *informer) String() string { return fmt.Sprintf("informer source: %p", is.Informer) } diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index f6b2948874..5688e17b8a 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -238,9 +238,9 @@ var _ = Describe("Source", func() { c := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{ - Informer: depInformer, - Handler: handler.Funcs{ + instance := source.Informer( + depInformer, + handler.Funcs{ CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() var err error @@ -264,7 +264,7 @@ var _ = Describe("Source", func() { Fail("Unexpected GenericEvent") }, }, - } + ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -282,9 +282,9 @@ var _ = Describe("Source", func() { rs2.SetLabels(map[string]string{"biz": "baz"}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{ - Informer: depInformer, - Handler: handler.Funcs{ + instance := source.Informer( + depInformer, + handler.Funcs{ CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { }, UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { @@ -309,7 +309,7 @@ var _ = Describe("Source", func() { Fail("Unexpected GenericEvent") }, }, - } + ) err = instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -322,9 +322,9 @@ var _ = Describe("Source", func() { c := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{ - Informer: depInformer, - Handler: handler.Funcs{ + instance := source.Informer( + depInformer, + handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { }, UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { @@ -340,7 +340,7 @@ var _ = Describe("Source", func() { Fail("Unexpected GenericEvent") }, }, - } + ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index c781b45e3a..aacb8ec2a5 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -296,29 +296,31 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := source.Channel( ch, - handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected CreateEvent") - }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected UpdateEvent") + handler.WithPredicates( + handler.Funcs{ + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected CreateEvent") + }, + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected UpdateEvent") + }, + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + // The empty event should have been filtered out by the predicates, + // and will not be passed to the handler. + Expect(q2).To(BeIdenticalTo(q)) + Expect(evt.Object).To(Equal(p)) + close(c) + }, }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected DeleteEvent") - }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - // The empty event should have been filtered out by the predicates, - // and will not be passed to the handler. - Expect(q2).To(BeIdenticalTo(q)) - Expect(evt.Object).To(Equal(p)) - close(c) - }, - }, - prct, + prct, + ), ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred())