diff --git a/.golangci.yml b/.golangci.yml index ed692daa31..86110ffefa 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -80,6 +80,8 @@ issues: - Subprocess launch(ed with variable|ing should be audited) - (G204|G104|G307) - "ST1000: at least one file in a package should have a package comment" + - "SA1019: \"sigs.k8s.io/controller-runtime/pkg/runtime/inject\"" + - "SA1019: inject.*" exclude-rules: - linters: - gosec diff --git a/examples/builtins/main.go b/examples/builtins/main.go index ff1f0dfa3b..cddaae24bb 100644 --- a/examples/builtins/main.go +++ b/examples/builtins/main.go @@ -59,14 +59,14 @@ func main() { } // Watch ReplicaSets and enqueue ReplicaSet object key - if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil { entryLog.Error(err, "unable to watch ReplicaSets") os.Exit(1) } // Watch Pods and enqueue owning ReplicaSet key - if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, - &handler.EnqueueRequestForOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil { entryLog.Error(err, "unable to watch Pods") os.Exit(1) } diff --git a/hack/test-all.sh b/hack/test-all.sh index 202dd89492..34d841cfd0 100755 --- a/hack/test-all.sh +++ b/hack/test-all.sh @@ -25,7 +25,7 @@ if [[ -n ${ARTIFACTS:-} ]]; then fi result=0 -go test -race ${P_FLAG} ${MOD_OPT} ./... ${GINKGO_ARGS} || result=$? +go test -v -race ${P_FLAG} ${MOD_OPT} ./... --ginkgo.fail-fast ${GINKGO_ARGS} || result=$? if [[ -n ${ARTIFACTS:-} ]]; then mkdir -p ${ARTIFACTS} diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 03f9633a74..46ed8248a4 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -217,11 +218,11 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client. func (blder *Builder) doWatch() error { // Reconcile type if blder.forInput.object != nil { - typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) + obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } - src := &source.Kind{Type: typeForSrc} + src := source.Kind(blder.mgr.GetCache(), obj) hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { @@ -234,15 +235,16 @@ func (blder *Builder) doWatch() error { return errors.New("Owns() can only be used together with For()") } for _, own := range blder.ownsInput { - typeForSrc, err := blder.project(own.object, own.objectProjection) + obj, err := blder.project(own.object, own.objectProjection) if err != nil { return err } - src := &source.Kind{Type: typeForSrc} - hdler := &handler.EnqueueRequestForOwner{ - OwnerType: blder.forInput.object, - IsController: true, - } + src := source.Kind(blder.mgr.GetCache(), obj) + hdler := handler.EnqueueRequestForOwner( + blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(), + blder.forInput.object, + handler.OnlyControllerOwner(), + ) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { @@ -258,8 +260,8 @@ func (blder *Builder) doWatch() error { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) - // If the source of this watch is of type *source.Kind, project it. - if srckind, ok := w.src.(*source.Kind); ok { + // If the source of this watch is of type Kind, project it. + if srckind, ok := w.src.(*internalsource.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 1e5849deb1..c37f1d36bc 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -118,7 +118,7 @@ var _ = Describe("application", func() { Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). - Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}). + Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}). Build(noop) Expect(err).To(MatchError(ContainSubstring("one of For() or Named() must be called"))) Expect(instance).To(BeNil()) @@ -157,7 +157,7 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). Named("my_controller"). - Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}). + Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}). Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) @@ -369,8 +369,9 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(m). For(&appsv1.Deployment{}). Watches( // Equivalent of Owns - &source.Kind{Type: &appsv1.ReplicaSet{}}, - &handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true}) + source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), + ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -384,10 +385,11 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(m). Named("Deployment"). Watches( // Equivalent of For - &source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}). + source.Kind(m.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}). Watches( // Equivalent of Owns - &source.Kind{Type: &appsv1.ReplicaSet{}}, - &handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true}) + source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), + ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -481,7 +483,7 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(mgr). For(&appsv1.Deployment{}, OnlyMetadata). Owns(&appsv1.ReplicaSet{}, OnlyMetadata). - Watches(&source.Kind{Type: &appsv1.StatefulSet{}}, + Watches(source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}), handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { defer GinkgoRecover() diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 905296cd35..eb0e68a095 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -37,14 +37,12 @@ import ( // Cluster provides various methods to interact with a cluster. type Cluster interface { - // SetFields will set any dependencies on an object for which the object has implemented the inject - // interface - e.g. inject.Client. - // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. - SetFields(interface{}) error - // GetConfig returns an initialized Config GetConfig() *rest.Config + // GetCache returns a cache.Cache + GetCache() cache.Cache + // GetScheme returns an initialized Scheme GetScheme() *runtime.Scheme @@ -57,9 +55,6 @@ type Cluster interface { // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer - // GetCache returns a cache.Cache - GetCache() cache.Cache - // GetEventRecorderFor returns a new EventRecorder for the provided name GetEventRecorderFor(name string) record.EventRecorder diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 0da179d3f0..ba127d8fe9 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -29,11 +29,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" - logf "sigs.k8s.io/controller-runtime/pkg/internal/log" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) var _ = Describe("cluster.Cluster", func() { @@ -111,78 +108,6 @@ var _ = Describe("cluster.Cluster", func() { }) }) - Describe("SetFields", func() { - It("should inject field values", func() { - c, err := New(cfg, func(o *Options) { - o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) { - return &informertest.FakeInformers{}, nil - } - }) - Expect(err).NotTo(HaveOccurred()) - - By("Injecting the dependencies") - err = c.SetFields(&injectable{ - scheme: func(scheme *runtime.Scheme) error { - defer GinkgoRecover() - Expect(scheme).To(Equal(c.GetScheme())) - return nil - }, - config: func(config *rest.Config) error { - defer GinkgoRecover() - Expect(config).To(Equal(c.GetConfig())) - return nil - }, - client: func(client client.Client) error { - defer GinkgoRecover() - Expect(client).To(Equal(c.GetClient())) - return nil - }, - cache: func(cache cache.Cache) error { - defer GinkgoRecover() - Expect(cache).To(Equal(c.GetCache())) - return nil - }, - log: func(logger logr.Logger) error { - defer GinkgoRecover() - Expect(logger).To(Equal(logf.RuntimeLog.WithName("cluster"))) - return nil - }, - }) - Expect(err).NotTo(HaveOccurred()) - - By("Returning an error if dependency injection fails") - - expected := fmt.Errorf("expected error") - err = c.SetFields(&injectable{ - client: func(client client.Client) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = c.SetFields(&injectable{ - scheme: func(scheme *runtime.Scheme) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = c.SetFields(&injectable{ - config: func(config *rest.Config) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = c.SetFields(&injectable{ - cache: func(c cache.Cache) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - }) - }) - It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() @@ -242,56 +167,3 @@ var _ = Describe("cluster.Cluster", func() { Expect(c.GetAPIReader()).NotTo(BeNil()) }) }) - -var _ inject.Cache = &injectable{} -var _ inject.Client = &injectable{} -var _ inject.Scheme = &injectable{} -var _ inject.Config = &injectable{} -var _ inject.Logger = &injectable{} - -type injectable struct { - scheme func(scheme *runtime.Scheme) error - client func(client.Client) error - config func(config *rest.Config) error - cache func(cache.Cache) error - log func(logger logr.Logger) error -} - -func (i *injectable) InjectCache(c cache.Cache) error { - if i.cache == nil { - return nil - } - return i.cache(c) -} - -func (i *injectable) InjectConfig(config *rest.Config) error { - if i.config == nil { - return nil - } - return i.config(config) -} - -func (i *injectable) InjectClient(c client.Client) error { - if i.client == nil { - return nil - } - return i.client(c) -} - -func (i *injectable) InjectScheme(scheme *runtime.Scheme) error { - if i.scheme == nil { - return nil - } - return i.scheme(scheme) -} - -func (i *injectable) InjectLogger(log logr.Logger) error { - if i.log == nil { - return nil - } - return i.log(log) -} - -func (i *injectable) Start(<-chan struct{}) error { - return nil -} diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index 125e1d144e..a84e4526d6 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -28,7 +28,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) type cluster struct { @@ -64,28 +63,6 @@ type cluster struct { logger logr.Logger } -func (c *cluster) SetFields(i interface{}) error { - if _, err := inject.ConfigInto(c.config, i); err != nil { - return err - } - if _, err := inject.ClientInto(c.client, i); err != nil { - return err - } - if _, err := inject.APIReaderInto(c.apiReader, i); err != nil { - return err - } - if _, err := inject.SchemeInto(c.scheme, i); err != nil { - return err - } - if _, err := inject.CacheInto(c.cache, i); err != nil { - return err - } - if _, err := inject.MapperInto(c.mapper, i); err != nil { - return err - } - return nil -} - func (c *cluster) GetConfig() *rest.Config { return c.config } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8e0a9a91de..bd29ba9a57 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -139,11 +139,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.RateLimiter = workqueue.DefaultControllerRateLimiter() } - // Inject dependencies into Reconciler - if err := mgr.SetFields(options.Reconciler); err != nil { - return nil, err - } - if options.RecoverPanic == nil { options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic } @@ -156,7 +151,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, CacheSyncTimeout: options.CacheSyncTimeout, - SetFields: mgr.SetFields, Name: name, LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 3ddd2ccf60..48facf1e94 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -64,12 +64,13 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - err = instance.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.Deployment{}, - }) + err = instance.Watch( + source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}), + ) Expect(err).NotTo(HaveOccurred()) - err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 66e11b7d50..78c6ffebcc 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -18,7 +18,6 @@ package controller_test import ( "context" - "fmt" "time" . "github.com/onsi/ginkgo/v2" @@ -27,7 +26,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" @@ -35,7 +33,6 @@ import ( internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -62,16 +59,6 @@ var _ = Describe("controller.Controller", func() { Expect(err.Error()).To(ContainSubstring("must specify Reconciler")) }) - It("NewController should return an error if injecting Reconciler fails", func() { - m, err := manager.New(cfg, manager.Options{}) - Expect(err).NotTo(HaveOccurred()) - - c, err := controller.New("foo", m, controller.Options{Reconciler: &failRec{}}) - Expect(c).To(BeNil()) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("expected error")) - }) - It("should not return an error if two controllers are registered with different names", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) @@ -88,6 +75,7 @@ var _ = Describe("controller.Controller", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() + ctx, cancel := context.WithCancel(context.Background()) watchChan := make(chan event.GenericEvent, 1) watch := &source.Channel{Source: watchChan} watchChan <- event.GenericEvent{Object: &corev1.Pod{}} @@ -114,7 +102,6 @@ var _ = Describe("controller.Controller", func() { Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed()) Expect(err).NotTo(HaveOccurred()) - ctx, cancel := context.WithCancel(context.Background()) go func() { defer GinkgoRecover() Expect(m.Start(ctx)).To(Succeed()) @@ -224,16 +211,3 @@ var _ = Describe("controller.Controller", func() { }) }) }) - -var _ reconcile.Reconciler = &failRec{} -var _ inject.Client = &failRec{} - -type failRec struct{} - -func (*failRec) Reconcile(context.Context, reconcile.Request) (reconcile.Result, error) { - return reconcile.Result{}, nil -} - -func (*failRec) InjectClient(client.Client) error { - return fmt.Errorf("expected error") -} diff --git a/pkg/controller/example_test.go b/pkg/controller/example_test.go index 3d8e399703..d4fa1aef0b 100644 --- a/pkg/controller/example_test.go +++ b/pkg/controller/example_test.go @@ -71,7 +71,7 @@ func ExampleController() { } // Watch for Pod create / update / delete events and call Reconcile - err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}) + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -108,7 +108,7 @@ func ExampleController_unstructured() { Version: "v1", }) // Watch for Pod create / update / delete events and call Reconcile - err = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}) + err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{}) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() { os.Exit(1) } - if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil { log.Error(err, "unable to watch pods") os.Exit(1) } diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index 17401b1fdb..bdbc24bfa6 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -21,7 +21,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) // MapFunc is the signature required for enqueueing requests from a generic function. @@ -85,13 +84,3 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInter } } } - -// EnqueueRequestsFromMapFunc can inject fields into the mapper. - -// InjectFunc implements inject.Injector. -func (e *enqueueRequestsFromMapFunc) InjectFunc(f inject.Func) error { - if f == nil { - return nil - } - return f(e.toRequests) -} diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 63699893fc..a7cf6f2c4b 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -25,15 +25,18 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) -var _ EventHandler = &EnqueueRequestForOwner{} +var _ EventHandler = &enqueueRequestForOwner{} -var log = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueRequestForOwner") +var log = logf.RuntimeLog.WithName("eventhandler").WithName("enqueueRequestForOwner") + +// OwnerOption modifies an EnqueueRequestForOwner EventHandler. +type OwnerOption func(e *enqueueRequestForOwner) // EnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created // the object that was the source of the Event. @@ -42,13 +45,34 @@ var log = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueRequestForOw // // - a source.Kind Source with Type of Pod. // -// - a handler.EnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and IsController set to true. -type EnqueueRequestForOwner struct { - // OwnerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. - OwnerType runtime.Object +// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. +func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler { + e := &enqueueRequestForOwner{ + ownerType: ownerType, + mapper: mapper, + } + if err := e.parseOwnerTypeGroupKind(scheme); err != nil { + panic(err) + } + for _, opt := range opts { + opt(e) + } + return e +} + +// OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true. +func OnlyControllerOwner() OwnerOption { + return func(e *enqueueRequestForOwner) { + e.isController = true + } +} - // IsController if set will only look at the first OwnerReference with Controller: true. - IsController bool +type enqueueRequestForOwner struct { + // ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. + ownerType runtime.Object + + // isController if set will only look at the first OwnerReference with Controller: true. + isController bool // groupKind is the cached Group and Kind from OwnerType groupKind schema.GroupKind @@ -58,7 +82,7 @@ type EnqueueRequestForOwner struct { } // Create implements EventHandler. -func (e *EnqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -67,7 +91,7 @@ func (e *EnqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateL } // Update implements EventHandler. -func (e *EnqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.ObjectOld, reqs) e.getOwnerReconcileRequest(evt.ObjectNew, reqs) @@ -77,7 +101,7 @@ func (e *EnqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateL } // Delete implements EventHandler. -func (e *EnqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -86,7 +110,7 @@ func (e *EnqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateL } // Generic implements EventHandler. -func (e *EnqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -96,17 +120,17 @@ func (e *EnqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.Rat // parseOwnerTypeGroupKind parses the OwnerType into a Group and Kind and caches the result. Returns false // if the OwnerType could not be parsed using the scheme. -func (e *EnqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { +func (e *enqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { // Get the kinds of the type - kinds, _, err := scheme.ObjectKinds(e.OwnerType) + kinds, _, err := scheme.ObjectKinds(e.ownerType) if err != nil { - log.Error(err, "Could not get ObjectKinds for OwnerType", "owner type", fmt.Sprintf("%T", e.OwnerType)) + log.Error(err, "Could not get ObjectKinds for OwnerType", "owner type", fmt.Sprintf("%T", e.ownerType)) return err } // Expect only 1 kind. If there is more than one kind this is probably an edge case such as ListOptions. if len(kinds) != 1 { - err := fmt.Errorf("expected exactly 1 kind for OwnerType %T, but found %s kinds", e.OwnerType, kinds) - log.Error(nil, "expected exactly 1 kind for OwnerType", "owner type", fmt.Sprintf("%T", e.OwnerType), "kinds", kinds) + err := fmt.Errorf("expected exactly 1 kind for OwnerType %T, but found %s kinds", e.ownerType, kinds) + log.Error(nil, "expected exactly 1 kind for OwnerType", "owner type", fmt.Sprintf("%T", e.ownerType), "kinds", kinds) return err } // Cache the Group and Kind for the OwnerType @@ -116,7 +140,7 @@ func (e *EnqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) // getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile // owners of object that match e.OwnerType. -func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { +func (e *enqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { // Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested // by the user for _, ref := range e.getOwnersReferences(object) { @@ -138,7 +162,7 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, Name: ref.Name, }} - // if owner is not namespaced then we should set the namespace to the empty + // if owner is not namespaced then we should not set the namespace mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version) if err != nil { log.Error(err, "Could not retrieve rest mapping", "kind", e.groupKind) @@ -153,16 +177,16 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, } } -// getOwnersReferences returns the OwnerReferences for an object as specified by the EnqueueRequestForOwner +// getOwnersReferences returns the OwnerReferences for an object as specified by the enqueueRequestForOwner // - if IsController is true: only take the Controller OwnerReference (if found) // - if IsController is false: take all OwnerReferences. -func (e *EnqueueRequestForOwner) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { +func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { if object == nil { return nil } // If not filtered as Controller only, then use all the OwnerReferences - if !e.IsController { + if !e.isController { return object.GetOwnerReferences() } // If filtered to a Controller, only take the Controller OwnerReference @@ -172,18 +196,3 @@ func (e *EnqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met // No Controller OwnerReference found return nil } - -var _ inject.Scheme = &EnqueueRequestForOwner{} - -// InjectScheme is called by the Controller to provide a singleton scheme to the EnqueueRequestForOwner. -func (e *EnqueueRequestForOwner) InjectScheme(s *runtime.Scheme) error { - return e.parseOwnerTypeGroupKind(s) -} - -var _ inject.Mapper = &EnqueueRequestForOwner{} - -// InjectMapper is called by the Controller to provide the rest mapper used by the manager. -func (e *EnqueueRequestForOwner) InjectMapper(m meta.RESTMapper) error { - e.mapper = m - return nil -} diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index d8b2211869..6eeb26854d 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" @@ -40,7 +41,6 @@ var _ = Describe("Eventhandler", func() { var instance handler.EnqueueRequestForObject var pod *corev1.Pod var mapper meta.RESTMapper - t := true BeforeEach(func() { q = controllertest.Queue{Interface: workqueue.New()} pod = &corev1.Pod{ @@ -303,11 +303,7 @@ var _ = Describe("Eventhandler", func() { Describe("EnqueueRequestForOwner", func() { It("should enqueue a Request with the Owner of the object in the CreateEvent.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -328,11 +324,7 @@ var _ = Describe("Eventhandler", func() { }) It("should enqueue a Request with the Owner of the object in the DeleteEvent.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -357,11 +349,7 @@ var _ = Describe("Eventhandler", func() { newPod.Name = pod.Name + "2" newPod.Namespace = pod.Namespace + "2" - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -398,11 +386,7 @@ var _ = Describe("Eventhandler", func() { newPod := pod.DeepCopy() newPod.Name = pod.Name + "2" - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -431,12 +415,7 @@ var _ = Describe("Eventhandler", func() { }) It("should enqueue a Request with the Owner of the object in the GenericEvent.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo-parent", @@ -456,12 +435,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue a Request if there are no owners matching Group and Kind.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { // Wrong group Name: "foo1-parent", @@ -483,11 +457,7 @@ var _ = Describe("Eventhandler", func() { It("should enqueue a Request if there are owners matching Group "+ "and Kind with a different version.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &autoscalingv1.HorizontalPodAutoscaler{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &autoscalingv1.HorizontalPodAutoscaler{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo-parent", @@ -507,11 +477,7 @@ var _ = Describe("Eventhandler", func() { }) It("should enqueue a Request for a owner that is cluster scoped", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &corev1.Node{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &corev1.Node{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "node-1", @@ -532,11 +498,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue a Request if there are no owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) evt := event.CreateEvent{ Object: pod, } @@ -547,12 +509,7 @@ var _ = Describe("Eventhandler", func() { Context("with the Controller field set to true", func() { It("should enqueue reconcile.Requests for only the first the Controller if there are "+ "multiple Controller owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -563,7 +520,7 @@ var _ = Describe("Eventhandler", func() { Name: "foo2-parent", Kind: "ReplicaSet", APIVersion: "apps/v1", - Controller: &t, + Controller: pointer.Bool(true), }, { Name: "foo3-parent", @@ -574,7 +531,7 @@ var _ = Describe("Eventhandler", func() { Name: "foo4-parent", Kind: "ReplicaSet", APIVersion: "apps/v1", - Controller: &t, + Controller: pointer.Bool(true), }, { Name: "foo5-parent", @@ -593,12 +550,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue reconcile.Requests if there are no Controller owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -624,12 +576,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue reconcile.Requests if there are no owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) evt := event.CreateEvent{ Object: pod, } @@ -640,11 +587,7 @@ var _ = Describe("Eventhandler", func() { Context("with the Controller field set to false", func() { It("should enqueue a reconcile.Requests for all owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -684,11 +627,7 @@ var _ = Describe("Eventhandler", func() { Context("with a nil object", func() { It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -704,76 +643,17 @@ var _ = Describe("Eventhandler", func() { }) }) - Context("with a multiple matching kinds", func() { - It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &metav1.ListOptions{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).NotTo(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - pod.OwnerReferences = []metav1.OwnerReference{ - { - Name: "foo1-parent", - Kind: "ListOptions", - APIVersion: "meta/v1", - }, - } - evt := event.CreateEvent{ - Object: pod, - } - instance.Create(evt, q) - Expect(q.Len()).To(Equal(0)) - }) - }) - Context("with an OwnerType that cannot be resolved", func() { - It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &controllertest.ErrorType{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).NotTo(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - pod.OwnerReferences = []metav1.OwnerReference{ - { - Name: "foo1-parent", - Kind: "ListOptions", - APIVersion: "meta/v1", - }, - } - evt := event.CreateEvent{ - Object: pod, - } - instance.Create(evt, q) - Expect(q.Len()).To(Equal(0)) - }) - }) - Context("with a nil OwnerType", func() { - It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{} - Expect(instance.InjectScheme(scheme.Scheme)).NotTo(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - pod.OwnerReferences = []metav1.OwnerReference{ - { - Name: "foo1-parent", - Kind: "OwnerType", - APIVersion: "meta/v1", - }, - } - evt := event.CreateEvent{ - Object: pod, - } - instance.Create(evt, q) - Expect(q.Len()).To(Equal(0)) + It("should panic", func() { + Expect(func() { + handler.EnqueueRequestForOwner(nil, nil, nil) + }).To(Panic()) }) }) Context("with an invalid APIVersion in the OwnerReference", func() { It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", diff --git a/pkg/handler/example_test.go b/pkg/handler/example_test.go index dbfab46157..9e1ad0a1d4 100644 --- a/pkg/handler/example_test.go +++ b/pkg/handler/example_test.go @@ -25,10 +25,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) +var mgr manager.Manager var c controller.Controller // This example watches Pods and enqueues Requests with the Name and Namespace of the Pod from @@ -36,7 +38,7 @@ var c controller.Controller func ExampleEnqueueRequestForObject() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &corev1.Pod{}}, + source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}, ) if err != nil { @@ -49,11 +51,8 @@ func ExampleEnqueueRequestForObject() { func ExampleEnqueueRequestForOwner() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &appsv1.ReplicaSet{}}, - &handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.Deployment{}, - IsController: true, - }, + source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), ) if err != nil { // handle it @@ -65,7 +64,7 @@ func ExampleEnqueueRequestForOwner() { func ExampleEnqueueRequestsFromMapFunc() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &appsv1.Deployment{}}, + source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { return []reconcile.Request{ {NamespacedName: types.NamespacedName{ @@ -88,7 +87,7 @@ func ExampleEnqueueRequestsFromMapFunc() { func ExampleFuncs() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &corev1.Pod{}}, + source.Kind(mgr.GetCache(), &corev1.Pod{}), handler.Funcs{ CreateFunc: func(e event.CreateEvent, q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 1f8f8b7398..969eeeb7d2 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -33,12 +33,9 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" ) -var _ inject.Injector = &Controller{} - // Controller implements controller.Controller. type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. @@ -61,10 +58,6 @@ type Controller struct { // the Queue for processing Queue workqueue.RateLimitingInterface - // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates - // Deprecated: the caller should handle injected fields itself. - SetFields func(i interface{}) error - // mu is used to synchronize Controller setup mu sync.Mutex @@ -130,19 +123,6 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() - // Inject Cache into arguments - if err := c.SetFields(src); err != nil { - return err - } - if err := c.SetFields(evthdler); err != nil { - return err - } - for _, pr := range prct { - if err := c.SetFields(pr); err != nil { - return err - } - } - // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). @@ -362,12 +342,6 @@ func (c *Controller) GetLogger() logr.Logger { return c.LogConstructor(nil) } -// InjectFunc implement SetFields.Injector. -func (c *Controller) InjectFunc(f inject.Func) error { - c.SetFields = f - return nil -} - // updateMetrics updates prometheus metrics within the controller. func (c *Controller) updateMetrics(reconcileTime time.Duration) { ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds()) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 5d3b1c9bea..89048ec550 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -44,7 +44,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -52,7 +51,6 @@ var _ = Describe("controller", func() { var fakeReconcile *fakeReconciler var ctrl *Controller var queue *controllertest.Queue - var informers *informertest.FakeInformers var reconciled chan reconcile.Request var request = reconcile.Request{ NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}, @@ -67,7 +65,6 @@ var _ = Describe("controller", func() { queue = &controllertest.Queue{ Interface: workqueue.New(), } - informers = &informertest.FakeInformers{} ctrl = &Controller{ MaxConcurrentReconciles: 1, Do: fakeReconcile, @@ -76,7 +73,6 @@ var _ = Describe("controller", func() { return log.RuntimeLog.WithName("controller").WithName("test") }, } - Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed()) }) Describe("Reconciler", func() { @@ -131,7 +127,7 @@ var _ = Describe("controller", func() { It("should return an error if there is an error waiting for the informers", func() { f := false ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}), + src: source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}), }} ctrl.Name = "foo" ctx, cancel := context.WithCancel(context.Background()) @@ -149,7 +145,7 @@ var _ = Describe("controller", func() { c = &cacheWithIndefinitelyBlockingGetInformer{c} ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, c), + src: source.Kind(c, &appsv1.Deployment{}), }} ctrl.Name = "testcontroller" @@ -167,7 +163,7 @@ var _ = Describe("controller", func() { c = &cacheWithIndefinitelyBlockingGetInformer{c} ctrl.startWatches = []watchDescription{{ src: &singnallingSourceWrapper{ - SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c), + SyncingSource: source.Kind(c, &appsv1.Deployment{}), cacheSyncDone: sourceSynced, }, }} @@ -195,7 +191,7 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) ctrl.startWatches = []watchDescription{{ src: &singnallingSourceWrapper{ - SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c), + SyncingSource: source.Kind(c, &appsv1.Deployment{}), cacheSyncDone: sourceSynced, }, }} @@ -232,7 +228,6 @@ var _ = Describe("controller", func() { ins := &source.Channel{Source: ch} ins.DestBufferSize = 1 - Expect(inject.StopChannelInto(ctx.Done(), ins)).To(BeTrue()) // send the event to the channel ch <- evt @@ -254,31 +249,13 @@ var _ = Describe("controller", func() { <-processed }) - It("should error when channel is passed as a source but stop channel is not injected", func() { - ch := make(chan event.GenericEvent) - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - ins := &source.Channel{Source: ch} - ctrl.startWatches = []watchDescription{{ - src: ins, - }} - - e := ctrl.Start(ctx) - - Expect(e).NotTo(BeNil()) - Expect(e.Error()).To(ContainSubstring("must call InjectStop on Channel before calling Start")) - }) - It("should error when channel source is not specified", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ins := &source.Channel{} - Expect(inject.StopChannelInto(make(<-chan struct{}), ins)).To(BeTrue()) - ctrl.startWatches = []watchDescription{{ - src: &source.Channel{}, + src: ins, }} e := ctrl.Start(ctx) @@ -336,126 +313,6 @@ var _ = Describe("controller", func() { }) - Describe("Watch", func() { - It("should inject dependencies into the Source", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - found := false - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == src { - found = true - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).NotTo(HaveOccurred()) - Expect(found).To(BeTrue(), "Source not injected") - }) - - It("should return an error if there is an error injecting into the Source", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - expected := fmt.Errorf("expect fail source") - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == src { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).To(Equal(expected)) - }) - - It("should inject dependencies into the EventHandler", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - found := false - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == evthdl { - found = true - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).NotTo(HaveOccurred()) - Expect(found).To(BeTrue(), "EventHandler not injected") - }) - - It("should return an error if there is an error injecting into the EventHandler", func() { - src := &source.Kind{Type: &corev1.Pod{}} - evthdl := &handler.EnqueueRequestForObject{} - expected := fmt.Errorf("expect fail eventhandler") - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == evthdl { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).To(Equal(expected)) - }) - - PIt("should inject dependencies into the Reconciler", func() { - // TODO(community): Write this - }) - - PIt("should return an error if there is an error injecting into the Reconciler", func() { - // TODO(community): Write this - }) - - It("should inject dependencies into all of the Predicates", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - pr1 := &predicate.Funcs{} - pr2 := &predicate.Funcs{} - found1 := false - found2 := false - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == pr1 { - found1 = true - } - if i == pr2 { - found2 = true - } - return nil - } - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) - Expect(found1).To(BeTrue(), "First Predicated not injected") - Expect(found2).To(BeTrue(), "Second Predicated not injected") - }) - - It("should return an error if there is an error injecting into any of the Predicates", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - pr1 := &predicate.Funcs{} - pr2 := &predicate.Funcs{} - expected := fmt.Errorf("expect fail predicate") - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == pr1 { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected)) - - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == pr2 { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected)) - }) - }) - Describe("Processing queue items from a Controller", func() { It("should call Reconciler if an item is enqueued", func() { ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index 30928c390f..130a306053 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -56,7 +56,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) By("Starting the Manager") diff --git a/pkg/source/internal/eventsource.go b/pkg/internal/source/eventsource.go similarity index 100% rename from pkg/source/internal/eventsource.go rename to pkg/internal/source/eventsource.go diff --git a/pkg/source/internal/internal_suite_test.go b/pkg/internal/source/internal_suite_test.go similarity index 100% rename from pkg/source/internal/internal_suite_test.go rename to pkg/internal/source/internal_suite_test.go diff --git a/pkg/source/internal/internal_test.go b/pkg/internal/source/internal_test.go similarity index 99% rename from pkg/source/internal/internal_test.go rename to pkg/internal/source/internal_test.go index 312fd86c17..8a2e1b6fd2 100644 --- a/pkg/source/internal/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -23,7 +23,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/source/internal" + internal "sigs.k8s.io/controller-runtime/pkg/internal/source" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go new file mode 100644 index 0000000000..d33d97d571 --- /dev/null +++ b/pkg/internal/source/kind.go @@ -0,0 +1,117 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type Kind struct { + // Type is the type of object to watch. e.g. &v1.Pod{} + Type client.Object + + // Cache used to watch APIs + Cache cache.Cache + + // started may contain an error if one was encountered during startup. If its closed and does not + // contain an error, startup and syncing finished. + started chan error + startCancel func() +} + +// Start is internal and should be called only by the Controller to register an EventHandler with the Informer +// to enqueue reconcile.Requests. +func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) error { + if ks.Type == nil { + return fmt.Errorf("must create Kind with a non-nil object") + } + if ks.Cache == nil { + return fmt.Errorf("must create Kind with a non-nil cache") + } + + // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not + // sync that informer (most commonly due to RBAC issues). + ctx, ks.startCancel = context.WithCancel(ctx) + ks.started = make(chan error) + go func() { + var ( + i cache.Informer + lastErr error + ) + + // Tries to get an informer until it returns true, + // an error or the specified context is cancelled or expired. + if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { + // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + i, lastErr = ks.Cache.GetInformer(ctx, ks.Type) + if lastErr != nil { + kindMatchErr := &meta.NoKindMatchError{} + switch { + case errors.As(lastErr, &kindMatchErr): + log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", + "kind", kindMatchErr.GroupKind) + case runtime.IsNotRegisteredError(lastErr): + log.Error(lastErr, "kind must be registered to the Scheme") + default: + log.Error(lastErr, "failed to get informer from cache") + } + return false, nil // Retry. + } + return true, nil + }); err != nil { + if lastErr != nil { + ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr) + return + } + ks.started <- err + return + } + + _, err := i.AddEventHandler(EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + if err != nil { + ks.started <- err + return + } + if !ks.Cache.WaitForCacheSync(ctx) { + // Would be great to return something more informative here + ks.started <- errors.New("cache did not sync") + } + close(ks.started) + }() + + return nil +} + +func (ks *Kind) String() string { + if ks.Type != nil { + return fmt.Sprintf("kind source: %T", ks.Type) + } + return "kind source: unknown type" +} + +// WaitForSync implements SyncingSource to allow controllers to wait with starting +// workers until the cache is synced. +func (ks *Kind) WaitForSync(ctx context.Context) error { + select { + case err := <-ks.started: + return err + case <-ctx.Done(): + ks.startCancel() + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return errors.New("timed out waiting for cache to be synced") + } +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 3e79f50bbd..19c8fea6ea 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -192,21 +192,18 @@ func (cm *controllerManager) Add(r Runnable) error { func (cm *controllerManager) add(r Runnable) error { // Set dependencies on the object - if err := cm.SetFields(r); err != nil { + if err := cm.setFields(r); err != nil { return err } return cm.runnables.Add(r) } // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. -func (cm *controllerManager) SetFields(i interface{}) error { - if err := cm.cluster.SetFields(i); err != nil { +func (cm *controllerManager) setFields(i interface{}) error { + if _, err := inject.SchemeInto(cm.cluster.GetScheme(), i); err != nil { return err } - if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { - return err - } - if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil { + if _, err := inject.InjectorInto(cm.setFields, i); err != nil { return err } if _, err := inject.LoggerInto(cm.logger, i); err != nil { diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f3b8443a95..4d62aee67a 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -51,11 +51,9 @@ import ( intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/leaderelection" fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/recorder" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -1516,101 +1514,6 @@ var _ = Describe("manger.Manager", func() { }) }) - Describe("SetFields", func() { - It("should inject field values", func() { - m, err := New(cfg, Options{ - NewCache: func(_ *rest.Config, _ cache.Options) (cache.Cache, error) { - return &informertest.FakeInformers{}, nil - }, - }) - Expect(err).NotTo(HaveOccurred()) - - By("Injecting the dependencies") - err = m.SetFields(&injectable{ - scheme: func(scheme *runtime.Scheme) error { - defer GinkgoRecover() - Expect(scheme).To(Equal(m.GetScheme())) - return nil - }, - config: func(config *rest.Config) error { - defer GinkgoRecover() - Expect(config).To(Equal(m.GetConfig())) - return nil - }, - client: func(client client.Client) error { - defer GinkgoRecover() - Expect(client).To(Equal(m.GetClient())) - return nil - }, - cache: func(c cache.Cache) error { - defer GinkgoRecover() - Expect(c).To(Equal(m.GetCache())) - return nil - }, - stop: func(stop <-chan struct{}) error { - defer GinkgoRecover() - Expect(stop).NotTo(BeNil()) - return nil - }, - f: func(f inject.Func) error { - defer GinkgoRecover() - Expect(f).NotTo(BeNil()) - return nil - }, - log: func(logger logr.Logger) error { - defer GinkgoRecover() - Expect(logger).To(Equal(log.Log)) - return nil - }, - }) - Expect(err).NotTo(HaveOccurred()) - - By("Returning an error if dependency injection fails") - - expected := fmt.Errorf("expected error") - err = m.SetFields(&injectable{ - client: func(client client.Client) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = m.SetFields(&injectable{ - scheme: func(scheme *runtime.Scheme) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = m.SetFields(&injectable{ - config: func(config *rest.Config) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = m.SetFields(&injectable{ - cache: func(c cache.Cache) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = m.SetFields(&injectable{ - f: func(c inject.Func) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - - err = m.SetFields(&injectable{ - stop: func(<-chan struct{}) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) - }) - }) It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() @@ -1721,7 +1624,6 @@ var _ = Describe("manger.Manager", func() { }) var _ reconcile.Reconciler = &failRec{} -var _ inject.Client = &failRec{} type failRec struct{} @@ -1733,81 +1635,10 @@ func (*failRec) Start(context.Context) error { return nil } -func (*failRec) InjectClient(client.Client) error { +func (*failRec) InjectScheme(*runtime.Scheme) error { return fmt.Errorf("expected error") } -var _ inject.Injector = &injectable{} -var _ inject.Cache = &injectable{} -var _ inject.Client = &injectable{} -var _ inject.Scheme = &injectable{} -var _ inject.Config = &injectable{} -var _ inject.Stoppable = &injectable{} -var _ inject.Logger = &injectable{} - -type injectable struct { - scheme func(scheme *runtime.Scheme) error - client func(client.Client) error - config func(config *rest.Config) error - cache func(cache.Cache) error - f func(inject.Func) error - stop func(<-chan struct{}) error - log func(logger logr.Logger) error -} - -func (i *injectable) InjectCache(c cache.Cache) error { - if i.cache == nil { - return nil - } - return i.cache(c) -} - -func (i *injectable) InjectConfig(config *rest.Config) error { - if i.config == nil { - return nil - } - return i.config(config) -} - -func (i *injectable) InjectClient(c client.Client) error { - if i.client == nil { - return nil - } - return i.client(c) -} - -func (i *injectable) InjectScheme(scheme *runtime.Scheme) error { - if i.scheme == nil { - return nil - } - return i.scheme(scheme) -} - -func (i *injectable) InjectFunc(f inject.Func) error { - if i.f == nil { - return nil - } - return i.f(f) -} - -func (i *injectable) InjectStopChannel(stop <-chan struct{}) error { - if i.stop == nil { - return nil - } - return i.stop(stop) -} - -func (i *injectable) InjectLogger(log logr.Logger) error { - if i.log == nil { - return nil - } - return i.log(log) -} - -func (i *injectable) Start(<-chan struct{}) error { - return nil -} - type runnableError struct { } diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 8b0f3634e4..314635875e 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -24,7 +24,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters") @@ -242,15 +241,6 @@ type and struct { predicates []Predicate } -func (a and) InjectFunc(f inject.Func) error { - for _, p := range a.predicates { - if err := f(p); err != nil { - return err - } - } - return nil -} - func (a and) Create(e event.CreateEvent) bool { for _, p := range a.predicates { if !p.Create(e) { @@ -296,15 +286,6 @@ type or struct { predicates []Predicate } -func (o or) InjectFunc(f inject.Func) error { - for _, p := range o.predicates { - if err := f(p); err != nil { - return err - } - } - return nil -} - func (o or) Create(e event.CreateEvent) bool { for _, p := range o.predicates { if p.Create(e) { @@ -350,10 +331,6 @@ type not struct { predicate Predicate } -func (n not) InjectFunc(f inject.Func) error { - return f(n.predicate) -} - func (n not) Create(e event.CreateEvent) bool { return !n.predicate.Create(e) } diff --git a/pkg/predicate/predicate_test.go b/pkg/predicate/predicate_test.go index e328f498a1..17aa808769 100644 --- a/pkg/predicate/predicate_test.go +++ b/pkg/predicate/predicate_test.go @@ -848,12 +848,6 @@ var _ = Describe("Predicate", func() { Expect(a.Delete(event.DeleteEvent{})).To(BeTrue()) Expect(a.Generic(event.GenericEvent{})).To(BeTrue()) }) - It("should inject into its predicates", func() { - prct := &injectablePredicate{} - a := predicate.And(prct) - Expect(injectFunc(a)).To(Succeed()) - Expect(prct.injected).To(BeTrue()) - }) }) Describe("When checking an Or predicate", func() { It("should return true when one of its predicates returns true", func() { @@ -870,12 +864,6 @@ var _ = Describe("Predicate", func() { Expect(o.Delete(event.DeleteEvent{})).To(BeFalse()) Expect(o.Generic(event.GenericEvent{})).To(BeFalse()) }) - It("should inject into its predicates", func() { - prct := &injectablePredicate{} - a := predicate.Or(prct) - Expect(injectFunc(a)).To(Succeed()) - Expect(prct.injected).To(BeTrue()) - }) }) Describe("When checking a Not predicate", func() { It("should return false when its predicate returns true", func() { @@ -892,12 +880,6 @@ var _ = Describe("Predicate", func() { Expect(n.Delete(event.DeleteEvent{})).To(BeTrue()) Expect(n.Generic(event.GenericEvent{})).To(BeTrue()) }) - It("should inject into its predicate", func() { - prct := &injectablePredicate{} - n := predicate.Not(prct) - Expect(injectFunc(n)).To(Succeed()) - Expect(prct.injected).To(BeTrue()) - }) }) }) @@ -982,13 +964,3 @@ var _ = Describe("Predicate", func() { }) }) }) - -type injectablePredicate struct { - injected bool - predicate.Funcs -} - -func (i *injectablePredicate) InjectFunc(f inject.Func) error { - i.injected = true - return nil -} diff --git a/pkg/runtime/doc.go b/pkg/runtime/doc.go deleted file mode 100644 index 34101b3fa4..0000000000 --- a/pkg/runtime/doc.go +++ /dev/null @@ -1,21 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package runtime contains not-quite-internal mechanisms for -// controller-runtime, plus some deprecated exports of functionality -// moved elsewhere. Most users should not need to import anything in -// pkg/runtime. -package runtime diff --git a/pkg/runtime/inject/doc.go b/pkg/runtime/inject/doc.go deleted file mode 100644 index 17c60895f0..0000000000 --- a/pkg/runtime/inject/doc.go +++ /dev/null @@ -1,22 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package inject defines interfaces and functions for propagating dependencies from a ControllerManager to -the components registered with it. Dependencies are propagated to Reconciler, Source, EventHandler and Predicate -objects which implement the Injectable interfaces. -*/ -package inject diff --git a/pkg/runtime/inject/inject.go b/pkg/runtime/inject/inject.go index c8c56ba817..1721232ede 100644 --- a/pkg/runtime/inject/inject.go +++ b/pkg/runtime/inject/inject.go @@ -15,86 +15,27 @@ limitations under the License. */ // Package inject is used by a Manager to inject types into Sources, EventHandlers, Predicates, and Reconciles. -// Deprecated: Use manager.Options fields directly. This package will be removed in v0.10. +// +// Deprecated: Use manager.Options fields directly. This package will be removed in a future version. package inject import ( "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" - - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" ) -// Cache is used by the ControllerManager to inject Cache into Sources, EventHandlers, Predicates, and -// Reconciles. -type Cache interface { - InjectCache(cache cache.Cache) error -} - -// CacheInto will set informers on i and return the result if it implements Cache. Returns -// false if i does not implement Cache. -func CacheInto(c cache.Cache, i interface{}) (bool, error) { - if s, ok := i.(Cache); ok { - return true, s.InjectCache(c) - } - return false, nil -} - -// APIReader is used by the Manager to inject the APIReader into necessary types. -type APIReader interface { - InjectAPIReader(client.Reader) error -} - -// APIReaderInto will set APIReader on i and return the result if it implements APIReaderInto. -// Returns false if i does not implement APIReader. -func APIReaderInto(reader client.Reader, i interface{}) (bool, error) { - if s, ok := i.(APIReader); ok { - return true, s.InjectAPIReader(reader) - } - return false, nil -} - -// Config is used by the ControllerManager to inject Config into Sources, EventHandlers, Predicates, and -// Reconciles. -type Config interface { - InjectConfig(*rest.Config) error -} - -// ConfigInto will set config on i and return the result if it implements Config. Returns -// false if i does not implement Config. -func ConfigInto(config *rest.Config, i interface{}) (bool, error) { - if s, ok := i.(Config); ok { - return true, s.InjectConfig(config) - } - return false, nil -} - -// Client is used by the ControllerManager to inject client into Sources, EventHandlers, Predicates, and -// Reconciles. -type Client interface { - InjectClient(client.Client) error -} - -// ClientInto will set client on i and return the result if it implements Client. Returns -// false if i does not implement Client. -func ClientInto(client client.Client, i interface{}) (bool, error) { - if s, ok := i.(Client); ok { - return true, s.InjectClient(client) - } - return false, nil -} - // Scheme is used by the ControllerManager to inject Scheme into Sources, EventHandlers, Predicates, and // Reconciles. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. type Scheme interface { InjectScheme(scheme *runtime.Scheme) error } // SchemeInto will set scheme and return the result on i if it implements Scheme. Returns // false if i does not implement Scheme. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. func SchemeInto(scheme *runtime.Scheme, i interface{}) (bool, error) { if is, ok := i.(Scheme); ok { return true, is.InjectScheme(scheme) @@ -102,45 +43,22 @@ func SchemeInto(scheme *runtime.Scheme, i interface{}) (bool, error) { return false, nil } -// Stoppable is used by the ControllerManager to inject stop channel into Sources, -// EventHandlers, Predicates, and Reconciles. -type Stoppable interface { - InjectStopChannel(<-chan struct{}) error -} - -// StopChannelInto will set stop channel on i and return the result if it implements Stoppable. -// Returns false if i does not implement Stoppable. -func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) { - if s, ok := i.(Stoppable); ok { - return true, s.InjectStopChannel(stop) - } - return false, nil -} - -// Mapper is used to inject the rest mapper to components that may need it. -type Mapper interface { - InjectMapper(meta.RESTMapper) error -} - -// MapperInto will set the rest mapper on i and return the result if it implements Mapper. -// Returns false if i does not implement Mapper. -func MapperInto(mapper meta.RESTMapper, i interface{}) (bool, error) { - if m, ok := i.(Mapper); ok { - return true, m.InjectMapper(mapper) - } - return false, nil -} - // Func injects dependencies into i. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. type Func func(i interface{}) error // Injector is used by the ControllerManager to inject Func into Controllers. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. type Injector interface { InjectFunc(f Func) error } // InjectorInto will set f and return the result on i if it implements Injector. Returns // false if i does not implement Injector. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. func InjectorInto(f Func, i interface{}) (bool, error) { if ii, ok := i.(Injector); ok { return true, ii.InjectFunc(f) @@ -150,12 +68,16 @@ func InjectorInto(f Func, i interface{}) (bool, error) { // Logger is used to inject Loggers into components that need them // and don't otherwise have opinions. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. type Logger interface { InjectLogger(l logr.Logger) error } // LoggerInto will set the logger on the given object if it implements inject.Logger, // returning true if a InjectLogger was called, and false otherwise. +// +// Deprecated: Dependency injection methods are deprecated and going to be removed in a future version. func LoggerInto(l logr.Logger, i interface{}) (bool, error) { if injectable, wantsLogger := i.(Logger); wantsLogger { return true, injectable.InjectLogger(l) diff --git a/pkg/runtime/inject/inject_test.go b/pkg/runtime/inject/inject_test.go index 7818909221..14d48ae172 100644 --- a/pkg/runtime/inject/inject_test.go +++ b/pkg/runtime/inject/inject_test.go @@ -23,17 +23,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/cache/informertest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" ) var instance *testSource var uninjectable *failSource var errInjectFail = fmt.Errorf("injection fails") -var expectedFalse = false var _ = Describe("runtime inject", func() { @@ -42,138 +36,6 @@ var _ = Describe("runtime inject", func() { uninjectable = &failSource{} }) - It("should set informers", func() { - injectedCache := &informertest.FakeInformers{} - - By("Validating injecting the informer") - res, err := CacheInto(injectedCache, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(injectedCache).To(Equal(instance.GetCache())) - - By("Returning false if the type does not implement inject.Cache") - res, err = CacheInto(injectedCache, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(expectedFalse)) - Expect(uninjectable.GetCache()).To(BeNil()) - - By("Returning an error if informer injection fails") - res, err = CacheInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - - }) - - It("should set config", func() { - - cfg := &rest.Config{} - - By("Validating injecting config") - res, err := ConfigInto(cfg, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(cfg).To(Equal(instance.GetConfig())) - - By("Returning false if the type does not implement inject.Config") - res, err = ConfigInto(cfg, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(false)) - Expect(uninjectable.GetConfig()).To(BeNil()) - - By("Returning an error if config injection fails") - res, err = ConfigInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - }) - - It("should set client", func() { - client, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewClientBuilder().Build()}) - Expect(err).NotTo(HaveOccurred()) - - By("Validating injecting client") - res, err := ClientInto(client, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(client).To(Equal(instance.GetClient())) - - By("Returning false if the type does not implement inject.Client") - res, err = ClientInto(client, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(false)) - Expect(uninjectable.GetClient()).To(BeNil()) - - By("Returning an error if client injection fails") - res, err = ClientInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - }) - - It("should set scheme", func() { - - scheme := runtime.NewScheme() - - By("Validating injecting scheme") - res, err := SchemeInto(scheme, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(scheme).To(Equal(instance.GetScheme())) - - By("Returning false if the type does not implement inject.Scheme") - res, err = SchemeInto(scheme, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(false)) - Expect(uninjectable.GetScheme()).To(BeNil()) - - By("Returning an error if scheme injection fails") - res, err = SchemeInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - }) - - It("should set stop channel", func() { - - stop := make(<-chan struct{}) - - By("Validating injecting stop channel") - res, err := StopChannelInto(stop, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(stop).To(Equal(instance.GetStop())) - - By("Returning false if the type does not implement inject.Stoppable") - res, err = StopChannelInto(stop, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(false)) - Expect(uninjectable.GetStop()).To(BeNil()) - - By("Returning an error if stop channel injection fails") - res, err = StopChannelInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - }) - - It("should set api reader", func() { - apiReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewClientBuilder().Build()}) - Expect(err).NotTo(HaveOccurred()) - - By("Validating injecting client") - res, err := APIReaderInto(apiReader, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(apiReader).To(Equal(instance.GetAPIReader())) - - By("Returning false if the type does not implement inject.Client") - res, err = APIReaderInto(apiReader, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(false)) - Expect(uninjectable.GetAPIReader()).To(BeNil()) - - By("Returning an error if client injection fails") - res, err = APIReaderInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - }) - It("should set dependencies", func() { f := func(interface{}) error { return nil } @@ -199,37 +61,8 @@ var _ = Describe("runtime inject", func() { }) type testSource struct { - scheme *runtime.Scheme - cache cache.Cache - config *rest.Config - client client.Client - apiReader client.Reader - f Func - stop <-chan struct{} -} - -func (s *testSource) InjectCache(c cache.Cache) error { - if c != nil { - s.cache = c - return nil - } - return fmt.Errorf("injection fails") -} - -func (s *testSource) InjectConfig(config *rest.Config) error { - if config != nil { - s.config = config - return nil - } - return fmt.Errorf("injection fails") -} - -func (s *testSource) InjectClient(client client.Client) error { - if client != nil { - s.client = client - return nil - } - return fmt.Errorf("injection fails") + scheme *runtime.Scheme + f Func } func (s *testSource) InjectScheme(scheme *runtime.Scheme) error { @@ -240,22 +73,6 @@ func (s *testSource) InjectScheme(scheme *runtime.Scheme) error { return fmt.Errorf("injection fails") } -func (s *testSource) InjectStopChannel(stop <-chan struct{}) error { - if stop != nil { - s.stop = stop - return nil - } - return fmt.Errorf("injection fails") -} - -func (s *testSource) InjectAPIReader(reader client.Reader) error { - if reader != nil { - s.apiReader = reader - return nil - } - return fmt.Errorf("injection fails") -} - func (s *testSource) InjectFunc(f Func) error { if f != nil { s.f = f @@ -264,68 +81,23 @@ func (s *testSource) InjectFunc(f Func) error { return fmt.Errorf("injection fails") } -func (s *testSource) GetCache() cache.Cache { - return s.cache -} - -func (s *testSource) GetConfig() *rest.Config { - return s.config -} - func (s *testSource) GetScheme() *runtime.Scheme { return s.scheme } -func (s *testSource) GetClient() client.Client { - return s.client -} - -func (s *testSource) GetAPIReader() client.Reader { - return s.apiReader -} - func (s *testSource) GetFunc() Func { return s.f } -func (s *testSource) GetStop() <-chan struct{} { - return s.stop -} - type failSource struct { - scheme *runtime.Scheme - cache cache.Cache - config *rest.Config - client client.Client - apiReader client.Reader - f Func - stop <-chan struct{} -} - -func (s *failSource) GetCache() cache.Cache { - return s.cache -} - -func (s *failSource) GetConfig() *rest.Config { - return s.config + scheme *runtime.Scheme + f Func } func (s *failSource) GetScheme() *runtime.Scheme { return s.scheme } -func (s *failSource) GetClient() client.Client { - return s.client -} - -func (s *failSource) GetAPIReader() client.Reader { - return s.apiReader -} - func (s *failSource) GetFunc() Func { return s.f } - -func (s *failSource) GetStop() <-chan struct{} { - return s.stop -} diff --git a/pkg/source/example_test.go b/pkg/source/example_test.go index d306eaf583..77857729de 100644 --- a/pkg/source/example_test.go +++ b/pkg/source/example_test.go @@ -21,15 +21,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/source" ) +var mgr manager.Manager var ctrl controller.Controller // This example Watches for Pod Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request // with the Name and Namespace of the Pod. func ExampleKind() { - err := ctrl.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}) + err := ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}) if err != nil { // handle it } diff --git a/pkg/source/source.go b/pkg/source/source.go index 6b67563924..e824613074 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -18,28 +18,19 @@ package source import ( "context" - "errors" "fmt" "sync" - "time" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - logf "sigs.k8s.io/controller-runtime/pkg/internal/log" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" - "sigs.k8s.io/controller-runtime/pkg/source/internal" + internal "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/predicate" ) -var log = logf.RuntimeLog.WithName("source") - const ( // defaultBufferSize is the default number of event notifications that can be buffered. defaultBufferSize = 1024 @@ -52,8 +43,7 @@ const ( // // * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls). // -// Users may build their own Source implementations. If their implementations implement any of the inject package -// interfaces, the dependencies will be injected by the Controller when Watch is called. +// Users may build their own Source implementations. type Source interface { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. @@ -67,144 +57,9 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } -// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used -// and not overwritten. It can be used to watch objects in a different cluster by passing the cache -// from that other cluster. -func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource { - return &kindWithCache{kind: Kind{Type: object, cache: cache}} -} - -type kindWithCache struct { - kind Kind -} - -func (ks *kindWithCache) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - return ks.kind.Start(ctx, handler, queue, prct...) -} - -func (ks *kindWithCache) String() string { - return ks.kind.String() -} - -func (ks *kindWithCache) WaitForSync(ctx context.Context) error { - return ks.kind.WaitForSync(ctx) -} - -// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Kind struct { - // Type is the type of object to watch. e.g. &v1.Pod{} - Type client.Object - - // cache used to watch APIs - cache cache.Cache - - // started may contain an error if one was encountered during startup. If its closed and does not - // contain an error, startup and syncing finished. - started chan error - startCancel func() -} - -var _ SyncingSource = &Kind{} - -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. -func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - // Type should have been specified by the user. - if ks.Type == nil { - return fmt.Errorf("must specify Kind.Type") - } - - // cache should have been injected before Start was called - if ks.cache == nil { - return fmt.Errorf("must call CacheInto on Kind before calling Start") - } - - // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not - // sync that informer (most commonly due to RBAC issues). - ctx, ks.startCancel = context.WithCancel(ctx) - ks.started = make(chan error) - go func() { - var ( - i cache.Informer - lastErr error - ) - - // Tries to get an informer until it returns true, - // an error or the specified context is cancelled or expired. - if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, lastErr = ks.cache.GetInformer(ctx, ks.Type) - if lastErr != nil { - kindMatchErr := &meta.NoKindMatchError{} - switch { - case errors.As(lastErr, &kindMatchErr): - log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", - "kind", kindMatchErr.GroupKind) - case runtime.IsNotRegisteredError(lastErr): - log.Error(lastErr, "kind must be registered to the Scheme") - default: - log.Error(lastErr, "failed to get informer from cache") - } - return false, nil // Retry. - } - return true, nil - }); err != nil { - if lastErr != nil { - ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr) - return - } - ks.started <- err - return - } - - _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) - if err != nil { - ks.started <- err - return - } - if !ks.cache.WaitForCacheSync(ctx) { - // Would be great to return something more informative here - ks.started <- errors.New("cache did not sync") - } - close(ks.started) - }() - - return nil -} - -func (ks *Kind) String() string { - if ks.Type != nil { - return fmt.Sprintf("kind source: %T", ks.Type) - } - return "kind source: unknown type" -} - -// WaitForSync implements SyncingSource to allow controllers to wait with starting -// workers until the cache is synced. -func (ks *Kind) WaitForSync(ctx context.Context) error { - select { - case err := <-ks.started: - return err - case <-ctx.Done(): - ks.startCancel() - if errors.Is(ctx.Err(), context.Canceled) { - return nil - } - return errors.New("timed out waiting for cache to be synced") - } -} - -var _ inject.Cache = &Kind{} - -// InjectCache is internal should be called only by the Controller. InjectCache is used to inject -// the Cache dependency initialized by the ControllerManager. -func (ks *Kind) InjectCache(c cache.Cache) error { - if ks.cache == nil { - ks.cache = c - } - return nil +// Kind creates a KindSource with the given cache provider. +func Kind(cache cache.Cache, object client.Object) SyncingSource { + return &internal.Kind{Type: object, Cache: cache} } var _ Source = &Channel{} @@ -237,18 +92,6 @@ func (cs *Channel) String() string { return fmt.Sprintf("channel source: %p", cs) } -var _ inject.Stoppable = &Channel{} - -// InjectStopChannel is internal should be called only by the Controller. -// It is used to inject the stop channel initialized by the ControllerManager. -func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error { - if cs.stop == nil { - cs.stop = stop - } - - return nil -} - // Start implements Source and should only be called by the Controller. func (cs *Channel) Start( ctx context.Context, @@ -260,10 +103,8 @@ func (cs *Channel) Start( return fmt.Errorf("must specify Channel.Source") } - // stop should have been injected before Start was called - if cs.stop == nil { - return fmt.Errorf("must call InjectStop on Channel before calling Start") - } + // set the stop channel to be the context. + cs.stop = ctx.Done() // use default value if DestBufferSize not specified if cs.DestBufferSize == 0 { diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index c7b3da39e2..2527cf0034 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -23,7 +23,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" . "github.com/onsi/ginkgo/v2" @@ -37,7 +36,7 @@ import ( ) var _ = Describe("Source", func() { - var instance1, instance2 *source.Kind + var instance1, instance2 source.Source var obj client.Object var q workqueue.RateLimitingInterface var c1, c2 chan interface{} @@ -59,11 +58,8 @@ var _ = Describe("Source", func() { }) JustBeforeEach(func() { - instance1 = &source.Kind{Type: obj} - Expect(inject.CacheInto(icache, instance1)).To(BeTrue()) - - instance2 = &source.Kind{Type: obj} - Expect(inject.CacheInto(icache, instance2)).To(BeTrue()) + instance1 = source.Kind(icache, obj) + instance2 = source.Kind(icache, obj) }) AfterEach(func() { diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index c2e6904180..f80a00f02d 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -27,7 +27,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" corev1 "k8s.io/api/core/v1" @@ -65,10 +64,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(inject.CacheInto(ic, instance)).To(BeTrue()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -105,10 +101,7 @@ var _ = Describe("Source", func() { ic := &informertest.FakeInformers{} q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(instance.InjectCache(ic)).To(Succeed()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -153,10 +146,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(inject.CacheInto(ic, instance)).To(BeTrue()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -188,25 +178,23 @@ var _ = Describe("Source", func() { }) }) - It("should return an error from Start if informers were not injected", func() { - instance := source.Kind{Type: &corev1.Pod{}} + It("should return an error from Start cache was not provided", func() { + instance := source.Kind(nil, &corev1.Pod{}) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("must call CacheInto on Kind before calling Start")) + Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil cache")) }) It("should return an error from Start if a type was not provided", func() { - instance := source.Kind{} - Expect(instance.InjectCache(&informertest.FakeInformers{})).To(Succeed()) + instance := source.Kind(ic, nil) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("must specify Kind.Type")) + Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object")) }) It("should return an error if syncing fails", func() { - instance := source.Kind{Type: &corev1.Pod{}} f := false - Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed()) + instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) @@ -222,28 +210,16 @@ var _ = Describe("Source", func() { ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(instance.InjectCache(ic)).To(Succeed()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).NotTo(HaveOccurred()) Eventually(instance.WaitForSync(context.Background())).Should(HaveOccurred()) }) }) - }) - - Describe("KindWithCache", func() { - It("should not allow injecting a cache", func() { - instance := source.NewKindWithCache(nil, nil) - injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance) - Expect(err).To(BeNil()) - Expect(injected).To(BeFalse()) - }) It("should return an error if syncing fails", func() { f := false - instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}) + instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) @@ -313,7 +289,6 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -353,7 +328,6 @@ var _ = Describe("Source", func() { // Add a handler to get distribution blocked instance := &source.Channel{Source: ch} instance.DestBufferSize = 1 - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -410,7 +384,6 @@ var _ = Describe("Source", func() { // Add a handler to get distribution blocked instance := &source.Channel{Source: ch} instance.DestBufferSize = 1 - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { @@ -449,7 +422,6 @@ var _ = Describe("Source", func() { By("feeding that channel to a channel source") src := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(ctx.Done(), src)).To(BeTrue()) processed := make(chan struct{}) defer close(processed) @@ -482,16 +454,9 @@ var _ = Describe("Source", func() { It("should get error if no source specified", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{ /*no source specified*/ } - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source"))) }) - It("should get error if no stop channel injected", func() { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} - err := instance.Start(ctx, handler.Funcs{}, q) - Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start"))) - }) }) Context("for multi sources (handlers)", func() { It("should provide GenericEvents for all handlers", func() { @@ -509,7 +474,6 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover()