From 0fdf465bc21be27b20c5b480a1aced84a3347d43 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 17 Oct 2019 18:21:08 -0700 Subject: [PATCH] Don't leak goroutines in controller.New This quashes a goroutine leak caused by calling controller.New repeatedly without calling Start. controller.New was creating a new workqueue, which was starting goroutines and then expecting to be shut down (by the shutdown method, which is only called at the end of Start). To solve that, we move workqueue initialization to controller.Start. This means that we also move watch starting to controller.Start, but this seems pretty sensible anyway. --- pkg/builder/controller_test.go | 16 +-- pkg/controller/controller.go | 16 ++- pkg/internal/controller/controller.go | 100 ++++++++++---- pkg/internal/controller/controller_test.go | 151 +++++++++++---------- 4 files changed, 168 insertions(+), 115 deletions(-) diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index f534f1bd51..cfe3e98236 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -66,26 +66,22 @@ var _ = Describe("application", func() { Expect(instance).NotTo(BeNil()) }) - It("should return an error if there is no GVK for an object", func() { + It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() { By("creating a controller manager") m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) + By("creating a controller with a bad For type") instance, err := ControllerManagedBy(m). For(&fakeType{}). Owns(&appsv1.ReplicaSet{}). Build(noop) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType")) + Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType"))) Expect(instance).To(BeNil()) - instance, err = ControllerManagedBy(m). - For(&appsv1.ReplicaSet{}). - Owns(&fakeType{}). 
- Build(noop) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType")) - Expect(instance).To(BeNil()) + // NB(directxman12): we don't test non-for types, since errors for + // them now manifest on controller.Start, not controller.Watch. Errors on the For type + // manifest when we try to default the controller name, which is good to double check. }) It("should return an error if it cannot create the controller", func() { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3411e95f75..a4e2a982d9 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -80,13 +80,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // Create controller with dependencies set c := &controller.Controller{ - Do: options.Reconciler, - Cache: mgr.GetCache(), - Config: mgr.GetConfig(), - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Recorder: mgr.GetEventRecorderFor(name), - Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + Do: options.Reconciler, + Cache: mgr.GetCache(), + Config: mgr.GetConfig(), + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor(name), + MakeQueue: func() workqueue.RateLimitingInterface { + return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name) + }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, Name: name, } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 7f40a32a52..0938fd4ec7 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -68,6 +68,11 @@ type Controller struct { // specified, or the ~/.kube/Config. Config *rest.Config + // MakeQueue constructs the queue for this controller once the controller is ready to start. 
+ // This exists because the standard Kubernetes workqueues start themselves immediately, which + // leads to goroutine leaks if something calls controller.New repeatedly. + MakeQueue func() workqueue.RateLimitingInterface + // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing Queue workqueue.RateLimitingInterface @@ -93,6 +98,16 @@ type Controller struct { Recorder record.EventRecorder // TODO(community): Consider initializing a logger with the Controller Name as the tag + + // watches maintains a list of sources, handlers, and predicates to start when the controller is started. + watches []watchDescription +} + +// watchDescription contains all the information necessary to start a watch. +type watchDescription struct { + src source.Source + handler handler.EventHandler + predicates []predicate.Predicate } // Reconcile implements reconcile.Reconciler @@ -118,47 +133,72 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } } - log.Info("Starting EventSource", "controller", c.Name, "source", src) - return src.Start(evthdler, c.Queue, prct...) + c.watches = append(c.watches, watchDescription{src: src, handler: evthdler, predicates: prct}) + if c.Started { + log.Info("Starting EventSource", "controller", c.Name, "source", src) + return src.Start(evthdler, c.Queue, prct...) 
+ } + + return nil } // Start implements controller.Controller func (c *Controller) Start(stop <-chan struct{}) error { + // use an IIFE to get proper lock handling + // but lock outside to get proper handling of the queue shutdown c.mu.Lock() - // TODO(pwittrock): Reconsider HandleCrash - defer utilruntime.HandleCrash() - defer c.Queue.ShutDown() + c.Queue = c.MakeQueue() + defer c.Queue.ShutDown() // needs to be outside the IIFE so that we shut down after the stop channel is closed - // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches - log.Info("Starting Controller", "controller", c.Name) + err := func() error { + defer c.mu.Unlock() - // Wait for the caches to be synced before starting workers - if c.WaitForCacheSync == nil { - c.WaitForCacheSync = c.Cache.WaitForCacheSync - } - if ok := c.WaitForCacheSync(stop); !ok { - // This code is unreachable right now since WaitForCacheSync will never return an error - // Leaving it here because that could happen in the future - err := fmt.Errorf("failed to wait for %s caches to sync", c.Name) - log.Error(err, "Could not wait for Cache to sync", "controller", c.Name) - c.mu.Unlock() - return err - } + // TODO(pwittrock): Reconsider HandleCrash + defer utilruntime.HandleCrash() - if c.JitterPeriod == 0 { - c.JitterPeriod = 1 * time.Second - } + // NB(directxman12): launch the sources *before* trying to wait for the + // caches to sync so that they have a chance to register their intended + // caches. 
+ for _, watch := range c.watches { + log.Info("Starting EventSource", "controller", c.Name, "source", watch.src) + if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } + } - // Launch workers to process resources - log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles) - for i := 0; i < c.MaxConcurrentReconciles; i++ { - // Process work items - go wait.Until(c.worker, c.JitterPeriod, stop) - } + // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches + log.Info("Starting Controller", "controller", c.Name) + + // Wait for the caches to be synced before starting workers + if c.WaitForCacheSync == nil { + c.WaitForCacheSync = c.Cache.WaitForCacheSync + } + if ok := c.WaitForCacheSync(stop); !ok { + // This code is unreachable right now since WaitForCacheSync will never return an error + // Leaving it here because that could happen in the future + err := fmt.Errorf("failed to wait for %s caches to sync", c.Name) + log.Error(err, "Could not wait for Cache to sync", "controller", c.Name) + return err + } + + if c.JitterPeriod == 0 { + c.JitterPeriod = 1 * time.Second + } + + // Launch workers to process resources + log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles) + for i := 0; i < c.MaxConcurrentReconciles; i++ { + // Process work items + go wait.Until(c.worker, c.JitterPeriod, stop) + } - c.Started = true - c.mu.Unlock() + c.Started = true + return nil + }() + if err != nil { + return err + } <-stop log.Info("Stopping workers", "controller", c.Name) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 4e7f8cc81c..aa27e10bc5 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -64,7 +64,7 @@ var _ = Describe("controller", func() { ctrl = &Controller{ MaxConcurrentReconciles: 1, Do: 
fakeReconcile, - Queue: queue, + MakeQueue: func() workqueue.RateLimitingInterface { return queue }, Cache: informers, } Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed()) @@ -117,6 +117,45 @@ var _ = Describe("controller", func() { close(done) }) + + It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { + pr1 := &predicate.Funcs{} + pr2 := &predicate.Funcs{} + evthdl := &handler.EnqueueRequestForObject{} + started := false + src := source.Func(func(e handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error { + defer GinkgoRecover() + Expect(e).To(Equal(evthdl)) + Expect(q).To(Equal(ctrl.Queue)) + Expect(p).To(ConsistOf(pr1, pr2)) + + started = true + return nil + }) + Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) + + // Use a stopped channel so Start doesn't block + stopped := make(chan struct{}) + close(stopped) + Expect(ctrl.Start(stopped)).To(Succeed()) + Expect(started).To(BeTrue()) + }) + + It("should return an error if there is an error starting sources", func() { + err := fmt.Errorf("Expected Error: could not start source") + src := source.Func(func(handler.EventHandler, + workqueue.RateLimitingInterface, + ...predicate.Predicate) error { + defer GinkgoRecover() + return err + }) + Expect(ctrl.Watch(src, &handler.EnqueueRequestForObject{})).To(Succeed()) + + // Use a stopped channel so Start doesn't block + stopped := make(chan struct{}) + close(stopped) + Expect(ctrl.Start(stopped)).To(Equal(err)) + }) }) Describe("Watch", func() { @@ -237,32 +276,6 @@ var _ = Describe("controller", func() { } Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected)) }) - - It("should call Start the Source with the EventHandler, Queue, and Predicates", func() { - pr1 := &predicate.Funcs{} - pr2 := &predicate.Funcs{} - evthdl := &handler.EnqueueRequestForObject{} - src := source.Func(func(e handler.EventHandler, q workqueue.RateLimitingInterface, p 
...predicate.Predicate) error { - defer GinkgoRecover() - Expect(e).To(Equal(evthdl)) - Expect(q).To(Equal(ctrl.Queue)) - Expect(p).To(ConsistOf(pr1, pr2)) - return nil - }) - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) - - }) - - It("should return an error if there is an error starting the Source", func() { - err := fmt.Errorf("Expected Error: could not start source") - src := source.Func(func(handler.EventHandler, - workqueue.RateLimitingInterface, - ...predicate.Predicate) error { - defer GinkgoRecover() - return err - }) - Expect(ctrl.Watch(src, &handler.EnqueueRequestForObject{})).To(Equal(err)) - }) }) Describe("Processing queue items from a Controller", func() { @@ -271,15 +284,15 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) By("Invoking Reconciler") fakeReconcile.AddResult(reconcile.Result{}, nil) Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }) @@ -294,13 +307,14 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add("foo/bar") - By("checking that process next work item indicates that we should continue processing") - Expect(ctrl.processNextWorkItem()).To(BeTrue()) + By("adding two bad items to the queue") + queue.Add("foo/bar1") + queue.Add("foo/bar2") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + By("expecting both of them to be skipped") + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) 
close(done) }) @@ -318,7 +332,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) By("Invoking Reconciler which will give an error") fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) @@ -329,8 +343,8 @@ var _ = Describe("controller", func() { Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }, 1.0) @@ -338,14 +352,14 @@ var _ = Describe("controller", func() { // TODO(directxman12): we should ensure that backoff occurrs with error requeue It("should not reset backoff until there's a non-error result", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which returns an error") @@ -366,19 +380,19 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 2})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - 
ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which will ask for requeue") @@ -393,19 +407,19 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 1})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which will ask for requeue & requeueafter") @@ -420,20 +434,20 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{Trying: -1 /* we don't increment the count in addafter */, AddAfter: 2})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { - dq := 
&DelegatingQueue{RateLimitingInterface: ctrl.Queue} - ctrl.Queue = dq + dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} + ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } ctrl.JitterPeriod = time.Millisecond go func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + dq.Add(request) Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1})) By("Invoking Reconciler which will ask for requeueafter with an error") @@ -447,8 +461,8 @@ var _ = Describe("controller", func() { Eventually(dq.getCounts).Should(Equal(countInfo{AddAfter: 1, AddRateLimited: 1})) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(dq.Len).Should(Equal(0)) + Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) PIt("should return if the queue is shutdown", func() { @@ -485,7 +499,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will succeed") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{}, nil) Expect(<-reconciled).To(Equal(request)) @@ -514,7 +528,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will give an error") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) Expect(<-reconciled).To(Equal(request)) @@ -542,8 +556,9 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() + By("Invoking Reconciler which will return result with Requeue enabled") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil) Expect(<-reconciled).To(Equal(request)) @@ -572,7 +587,7 @@ var _ = 
Describe("controller", func() { Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will return result with requeueAfter enabled") - ctrl.Queue.Add(request) + queue.Add(request) fakeReconcile.AddResult(reconcile.Result{RequeueAfter: 5 * time.Hour}, nil) Expect(<-reconciled).To(Equal(request)) @@ -604,7 +619,7 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. ctrl.JitterPeriod = time.Millisecond @@ -625,8 +640,8 @@ var _ = Describe("controller", func() { Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) close(done) }, 2.0) @@ -649,15 +664,15 @@ var _ = Describe("controller", func() { defer GinkgoRecover() Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) }() - ctrl.Queue.Add(request) + queue.Add(request) By("Invoking Reconciler") fakeReconcile.AddResult(reconcile.Result{}, nil) Expect(<-reconciled).To(Equal(request)) By("Removing the item from the queue") - Eventually(ctrl.Queue.Len).Should(Equal(0)) - Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0)) + Eventually(queue.Len).Should(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) Eventually(func() error { histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)