diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index f534f1bd51..cfe3e98236 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -66,26 +66,22 @@ var _ = Describe("application", func() { Expect(instance).NotTo(BeNil()) }) - It("should return an error if there is no GVK for an object", func() { + It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() { By("creating a controller manager") m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) + By("creating a controller with a bad For type") instance, err := ControllerManagedBy(m). For(&fakeType{}). Owns(&appsv1.ReplicaSet{}). Build(noop) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType")) + Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType"))) Expect(instance).To(BeNil()) - instance, err = ControllerManagedBy(m). - For(&appsv1.ReplicaSet{}). - Owns(&fakeType{}). - Build(noop) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType")) - Expect(instance).To(BeNil()) + // NB(directxman12): we don't test non-for types, since errors for + // them now manifest on controller.Start, not controller.Watch. Errors on the For type + // manifest when we try to default the controller name, which is good to double check. }) It("should return an error if it cannot create the controller", func() { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3411e95f75..a4e2a982d9 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -80,13 +80,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // Create controller with dependencies set c := &controller.Controller{ - Do: options.Reconciler, - Cache: mgr.GetCache(), - Config: mgr.GetConfig(), - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Recorder: mgr.GetEventRecorderFor(name), - Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + Do: options.Reconciler, + Cache: mgr.GetCache(), + Config: mgr.GetConfig(), + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor(name), + MakeQueue: func() workqueue.RateLimitingInterface { + return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name) + }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, Name: name, } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 7f40a32a52..0938fd4ec7 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -68,6 +68,11 @@ type Controller struct { // specified, or the ~/.kube/Config. Config *rest.Config + // MakeQueue constructs the queue for this controller once the controller is ready to start. + // This exists because the standard Kubernetes workqueues start themselves immediately, which + // leads to goroutine leaks if something calls controller.New repeatedly. + MakeQueue func() workqueue.RateLimitingInterface + // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing Queue workqueue.RateLimitingInterface @@ -93,6 +98,16 @@ type Controller struct { Recorder record.EventRecorder // TODO(community): Consider initializing a logger with the Controller Name as the tag + + // watches maintains a list of sources, handlers, and predicates to start when the controller is started. + watches []watchDescription +} + +// watchDescription contains all the information necessary to start a watch. +type watchDescription struct { + src source.Source + handler handler.EventHandler + predicates []predicate.Predicate } // Reconcile implements reconcile.Reconciler @@ -118,47 +133,72 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } } - log.Info("Starting EventSource", "controller", c.Name, "source", src) - return src.Start(evthdler, c.Queue, prct...) + c.watches = append(c.watches, watchDescription{src: src, handler: evthdler, predicates: prct}) + if c.Started { + log.Info("Starting EventSource", "controller", c.Name, "source", src) + return src.Start(evthdler, c.Queue, prct...) + } + + return nil } // Start implements controller.Controller func (c *Controller) Start(stop <-chan struct{}) error { + // use an IIFE to get proper lock handling + // but lock outside to get proper handling of the queue shutdown c.mu.Lock() - // TODO(pwittrock): Reconsider HandleCrash - defer utilruntime.HandleCrash() - defer c.Queue.ShutDown() + c.Queue = c.MakeQueue() + defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed - // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches - log.Info("Starting Controller", "controller", c.Name) + err := func() error { + defer c.mu.Unlock() - // Wait for the caches to be synced before starting workers - if c.WaitForCacheSync == nil { - c.WaitForCacheSync = c.Cache.WaitForCacheSync - } - if ok := c.WaitForCacheSync(stop); !ok { - // This code is unreachable right now since WaitForCacheSync will never return an error - // Leaving it here because that could happen in the future - err := fmt.Errorf("failed to wait for %s caches to sync", c.Name) - log.Error(err, "Could not wait for Cache to sync", "controller", c.Name) - c.mu.Unlock() - return err - } + // TODO(pwittrock): Reconsider HandleCrash + defer utilruntime.HandleCrash() - if c.JitterPeriod == 0 { - c.JitterPeriod = 1 * time.Second - } + // NB(directxman12): launch the sources *before* trying to wait for the + // caches to sync so that they have a chance to register their intendeded + // caches. + for _, watch := range c.watches { + log.Info("Starting EventSource", "controller", c.Name, "source", watch.src) + if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } + } - // Launch workers to process resources - log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles) - for i := 0; i < c.MaxConcurrentReconciles; i++ { - // Process work items - go wait.Until(c.worker, c.JitterPeriod, stop) - } + // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches + log.Info("Starting Controller", "controller", c.Name) + + // Wait for the caches to be synced before starting workers + if c.WaitForCacheSync == nil { + c.WaitForCacheSync = c.Cache.WaitForCacheSync + } + if ok := c.WaitForCacheSync(stop); !ok { + // This code is unreachable right now since WaitForCacheSync will never return an error + // Leaving it here because that could happen in the future + err := fmt.Errorf("failed to wait for %s caches to sync", c.Name) + log.Error(err, "Could not wait for Cache to sync", "controller", c.Name) + return err + } + + if c.JitterPeriod == 0 { + c.JitterPeriod = 1 * time.Second + } + + // Launch workers to process resources + log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles) + for i := 0; i < c.MaxConcurrentReconciles; i++ { + // Process work items + go wait.Until(c.worker, c.JitterPeriod, stop) + } - c.Started = true - c.mu.Unlock() + c.Started = true + return nil + }() + if err != nil { + return err + } <-stop log.Info("Stopping workers", "controller", c.Name) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 4e7f8cc81c..aa27e10bc5 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -64,7 +64,7 @@ var _ = Describe("controller", func() { ctrl = &Controller{ MaxConcurrentReconciles: 1, Do: fakeReconcile, - Queue: queue, + MakeQueue: func() workqueue.RateLimitingInterface { return queue }, Cache: informers, } Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed()) @@ -117,6 +117,45 @@ var _ = Describe("controller", func() { close(done) }) + + It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { + pr1 := &predicate.Funcs{} + pr2 := &predicate.Funcs{} + evthdl := &handler.EnqueueRequestForObject{} + started := false + src := source.Func(func(e handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error { + defer GinkgoRecover() + Expect(e).To(Equal(evthdl)) + Expect(q).To(Equal(ctrl.Queue)) + Expect(p).To(ConsistOf(pr1, pr2)) + + started = true + return nil + }) + Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) + + // Use a stopped channel so Start doesn't block + stopped := make(chan struct{}) + close(stopped) + Expect(ctrl.Start(stopped)).To(Succeed()) + Expect(started).To(BeTrue()) + }) + + It("should return an error if there is an error starting sources", func() { + err := fmt.Errorf("Expected Error: could not start source") + src := source.Func(func(handler.EventHandler, + workqueue.RateLimitingInterface, + ...predicate.Predicate) error { + defer GinkgoRecover() + return err + }) + Expect(ctrl.Watch(src, &handler.EnqueueRequestForObject{})).To(Succeed()) + + // Use a stopped channel so Start doesn't block + stopped := make(chan struct{}) + close(stopped) + Expect(ctrl.Start(stopped)).To(Equal(err)) + }) }) Describe("Watch", func() { @@ -237,32 +276,6 @@ var _ = Describe("controller", func() { } Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected)) }) - - It("should call Start the Source with the EventHandler, Queue, and Predicates", func() { - pr1 := &predicate.Funcs{} - pr2 := &predicate.Funcs{} - evthdl := &handler.EnqueueRequestForObject{} - src := source.Func(func(e handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error { - defer GinkgoRecover() - Expect(e).To(Equal(evthdl)) - Expect(q).To(Equal(ctrl.Queue)) - Expect(p).To(ConsistOf(pr1, pr2)) - return nil - }) - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) - - }) - - It("should return an error if there is an error starting the Source", func() { - err := fmt.Errorf("Expected Error: could not start source") - src := source.Func(func(handler.EventHandler, - workqueue.RateLimitingInterface, - ...predicate.Predicate) error { - defer GinkgoRecover() - return err - }) - Expect(ctrl.Watch(src, &handler.EnqueueRequestForObject{})).To(Equal(err)) - }) }) Describe("Processing queue items from a Controller", func() { @@ -271,15 +284,15 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) By("Invoking Reconciler") fakeReconcile.AddResult(reconcile.Result{}, nil) Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }) @@ -294,13 +307,14 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add("foo/bar") - By("checking that process next work item indicates that we should continue processing") - Expect(ctrl.processNextWorkItem()).To(BeTrue()) + By("adding two bad items to the queue") + queue.Add("foo/bar1") + queue.Add("foo/bar2") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + By("expecting both of them to be skipped") + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }) @@ -318,7 +332,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) By("Invoking Reconciler which will give an error") fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) @@ -329,8 +343,8 @@ var _ = Describe("controller", func() { Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }, 1.0) @@ -338,14 +352,14 @@ var _ = Describe("controller", func() { // TODO(directxman12): we should ensure that backoff occurrs with error requeue It("should not reset backoff until there's a non-error result", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which returns an error") @@ -366,19 +380,19 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 2})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which will ask for requeue") @@ -393,19 +407,19 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 1})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which will ask for requeue & requeueafter") @@ -420,20 +434,20 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{Trying: -1 /* we don't increment the count in addafter */, AddAfter: 2})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } ctrl.JitterPeriod = time.Millisecond go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which will ask for requeueafter with an error") @@ -447,8 +461,8 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{AddAfter: 1, AddRateLimited: 1})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) PIt("should return if the queue is shutdown", func() { @@ -485,7 +499,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will succeed") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{}, nil) Expect(<-reconciled).To(Equal(request)) @@ -514,7 +528,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will give an error") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) Expect(<-reconciled).To(Equal(request)) @@ -542,8 +556,9 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() + By("Invoking Reconciler which will return result with Requeue enabled") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil) Expect(<-reconciled).To(Equal(request)) @@ -572,7 +587,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will return result with requeueAfter enabled") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{RequeueAfter: 5 * time.Hour}, nil) Expect(<-reconciled).To(Equal(request)) @@ -604,7 +619,7 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. ctrl.JitterPeriod = time.Millisecond @@ -625,8 +640,8 @@ var _ = Describe("controller", func() { Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }, 2.0) @@ -649,15 +664,15 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) By("Invoking Reconciler") fakeReconcile.AddResult(reconcile.Result{}, nil) Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) Eventually(func() error { histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)