Don't leak goroutines in controller.New
This quashes a goroutine leak caused by calling controller.New repeatedly without
calling Start.  controller.New was creating a new workqueue, which starts
background goroutines as soon as it is constructed and expects to be shut down
later (via the queue's ShutDown method, which is only called at the end of Start).

To solve that, we move workqueue initialization to controller.Start.
This means that we also move watch starting to controller.Start, but
this seems pretty sensible anyway.
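To make the mechanism concrete, here is a minimal standalone sketch of both the leak and the fix (not part of this commit; the loop count and names are illustrative). A rate-limiting workqueue spawns a background goroutine the moment it is constructed, and that goroutine only exits once ShutDown is called; holding a constructor instead of a live queue defers that cost until Start, where the matching ShutDown can be deferred.

package main

import (
	"fmt"
	"runtime"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	before := runtime.NumGoroutine()

	// Old behaviour: controller.New built a queue eagerly. Each queue's delaying
	// goroutine lingers until ShutDown, which only Start would have called.
	for i := 0; i < 100; i++ {
		_ = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "leaky")
	}
	fmt.Println("goroutines added without ever calling Start:", runtime.NumGoroutine()-before)

	// New behaviour: store a constructor (MakeQueue in the commit) and invoke it
	// only where the matching ShutDown can be deferred, i.e. inside Start.
	makeQueue := func() workqueue.RateLimitingInterface {
		return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deferred")
	}
	q := makeQueue()
	defer q.ShutDown()
	q.Add("example-key")
}

In this sketch the first loop leaves roughly one extra goroutine per queue behind, while the second pattern starts nothing until the constructor is actually invoked.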
DirectXMan12 committed Oct 19, 2019
1 parent ecdbe54 commit 0fdf465
Showing 4 changed files with 168 additions and 115 deletions.
16 changes: 6 additions & 10 deletions pkg/builder/controller_test.go
@@ -66,26 +66,22 @@ var _ = Describe("application", func() {
Expect(instance).NotTo(BeNil())
})

It("should return an error if there is no GVK for an object", func() {
It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() {
By("creating a controller manager")
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("creating a controller with a bad For type")
instance, err := ControllerManagedBy(m).
For(&fakeType{}).
Owns(&appsv1.ReplicaSet{}).
Build(noop)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType"))
Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType")))
Expect(instance).To(BeNil())

instance, err = ControllerManagedBy(m).
For(&appsv1.ReplicaSet{}).
Owns(&fakeType{}).
Build(noop)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no kind is registered for the type builder.fakeType"))
Expect(instance).To(BeNil())
// NB(directxman12): we don't test non-for types, since errors for
// them now manifest on controller.Start, not controller.Watch. Errors on the For type
// manifest when we try to default the controller name, which is good to double check.
})
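One aside on the assertion change above, based on my reading of Gomega rather than anything in this commit: MatchError accepts a nested matcher and applies it to the error's message, and it also fails on a nil error, so the one-line form replaces the previous two assertions. A small self-contained sketch:

package example_test

import (
	"errors"
	"testing"

	. "github.com/onsi/gomega"
)

func TestMatchErrorWithSubstring(t *testing.T) {
	g := NewWithT(t)
	err := errors.New("no kind is registered for the type builder.fakeType")

	// Two-step style used before this change.
	g.Expect(err).To(HaveOccurred())
	g.Expect(err.Error()).To(ContainSubstring("no kind is registered"))

	// Single assertion used after this change: MatchError applies the nested
	// ContainSubstring matcher to err.Error() and also rejects a nil error.
	g.Expect(err).To(MatchError(ContainSubstring("no kind is registered")))
}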

It("should return an error if it cannot create the controller", func() {
16 changes: 9 additions & 7 deletions pkg/controller/controller.go
@@ -80,13 +80,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)

// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(name),
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
}
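A hedged usage sketch of what this buys at the public API level (the helper name and loop are invented; the controller-runtime API of this commit's era is assumed): controllers can now be constructed, for example while wiring up a manager that is never started, without leaving workqueue goroutines behind, because Start is what invokes MakeQueue.

package example

import (
	"fmt"

	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// buildWithoutStarting wires up several controllers but never starts the manager.
// Before this commit each controller.New call built a live workqueue whose
// goroutine leaked; now only the MakeQueue constructor is stored, so nothing
// runs until the controller is started.
func buildWithoutStarting(cfg *rest.Config) error {
	mgr, err := manager.New(cfg, manager.Options{})
	if err != nil {
		return err
	}

	noop := reconcile.Func(func(reconcile.Request) (reconcile.Result, error) {
		return reconcile.Result{}, nil
	})

	for i := 0; i < 10; i++ {
		if _, err := controller.New(fmt.Sprintf("demo-%d", i), mgr, controller.Options{Reconciler: noop}); err != nil {
			return err
		}
	}
	return nil // mgr.Start was deliberately never called
}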
100 changes: 70 additions & 30 deletions pkg/internal/controller/controller.go
@@ -68,6 +68,11 @@ type Controller struct {
// specified, or the ~/.kube/Config.
Config *rest.Config

// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface

// Queue is a listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface
@@ -93,6 +98,16 @@ type Controller struct {
Recorder record.EventRecorder

// TODO(community): Consider initializing a logger with the Controller Name as the tag

// watches maintains a list of sources, handlers, and predicates to start when the controller is started.
watches []watchDescription
}

// watchDescription contains all the information necessary to start a watch.
type watchDescription struct {
src source.Source
handler handler.EventHandler
predicates []predicate.Predicate
}

// Reconcile implements reconcile.Reconciler
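Both additions above follow the same shape: record inert descriptions now, and create the live resources only in Start. A tiny generic sketch of that pattern, with invented names and no controller-runtime types:

package example

// pending describes something to begin later; constructing it has no side effects.
type pending struct {
	name  string
	begin func() error // e.g. start a source, or build and own a queue
}

type lazyStarter struct {
	started  bool
	deferred []pending
}

// Register records the item, and only starts it immediately if already running.
func (l *lazyStarter) Register(p pending) error {
	l.deferred = append(l.deferred, p)
	if !l.started {
		return nil
	}
	return p.begin()
}

// Start replays everything registered while stopped, then marks itself running.
func (l *lazyStarter) Start() error {
	for _, p := range l.deferred {
		if err := p.begin(); err != nil {
			return err
		}
	}
	l.started = true
	return nil
}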
@@ -118,47 +133,72 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
}
}

log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
c.watches = append(c.watches, watchDescription{src: src, handler: evthdler, predicates: prct})
if c.Started {
log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
}

return nil
}
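A hedged illustration of what the deferred watches mean for callers (controller-runtime API of this commit's era; the manager, reconciler, and stop channel are taken as parameters rather than shown): Watch now only records the source, and the informer and queue machinery starts when the manager starts the controller.

package example

import (
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func watchThenStart(mgr manager.Manager, r reconcile.Reconciler, stopCh <-chan struct{}) error {
	// Build a controller and register a watch; with this commit nothing runs yet,
	// the source is only appended to the controller's watch list.
	c, err := controller.New("pod-watcher", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}
	if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil {
		return err
	}

	// Sources, caches, the workqueue, and workers all start here, and the queue
	// is shut down once stopCh closes.
	return mgr.Start(stopCh)
}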

// Start implements controller.Controller
func (c *Controller) Start(stop <-chan struct{}) error {
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c.mu.Lock()

// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash()
defer c.Queue.ShutDown()
c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the IIFE so that we shut down after the stop channel is closed

// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
log.Info("Starting Controller", "controller", c.Name)
err := func() error {
defer c.mu.Unlock()

// Wait for the caches to be synced before starting workers
if c.WaitForCacheSync == nil {
c.WaitForCacheSync = c.Cache.WaitForCacheSync
}
if ok := c.WaitForCacheSync(stop); !ok {
// This code is unreachable right now since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
c.mu.Unlock()
return err
}
// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash()

if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}
// NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intended
// caches.
for _, watch := range c.watches {
log.Info("Starting EventSource", "controller", c.Name, "source", watch.src)
if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}

// Launch workers to process resources
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// Process work items
go wait.Until(c.worker, c.JitterPeriod, stop)
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
log.Info("Starting Controller", "controller", c.Name)

// Wait for the caches to be synced before starting workers
if c.WaitForCacheSync == nil {
c.WaitForCacheSync = c.Cache.WaitForCacheSync
}
if ok := c.WaitForCacheSync(stop); !ok {
// This code is unreachable right now since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
return err
}

if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}

// Launch workers to process resources
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// Process work items
go wait.Until(c.worker, c.JitterPeriod, stop)
}

c.Started = true
c.mu.Unlock()
c.Started = true
return nil
}()
if err != nil {
return err
}

<-stop
log.Info("Stopping workers", "controller", c.Name)
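To make the locking comments above easier to follow, here is a stripped-down sketch of the shape Start now has (an assumed simplification that omits logging, crash handling, and JitterPeriod defaulting; the fmt and wait imports are elided): the queue ShutDown is deferred in the outer scope so it fires only after the stop channel closes, the mutex is released as soon as the inner function finishes setup, and sources are started before waiting for caches so they can register the caches they need.

func (c *Controller) startSketch(stop <-chan struct{}) error {
	c.mu.Lock()
	c.Queue = c.MakeQueue()
	defer c.Queue.ShutDown() // outer defer: runs only after <-stop below unblocks

	err := func() error {
		defer c.mu.Unlock() // inner defer: lock is released once setup is done

		// Start the recorded sources first so they can register their caches,
		// then wait for those caches to sync before launching workers.
		for _, w := range c.watches {
			if err := w.src.Start(w.handler, c.Queue, w.predicates...); err != nil {
				return err
			}
		}
		if c.WaitForCacheSync == nil {
			c.WaitForCacheSync = c.Cache.WaitForCacheSync
		}
		if ok := c.WaitForCacheSync(stop); !ok {
			return fmt.Errorf("failed to wait for %s caches to sync", c.Name)
		}
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go wait.Until(c.worker, c.JitterPeriod, stop)
		}
		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}

	// Block here, with the mutex already released, until shutdown is requested;
	// only then does the deferred ShutDown drain the queue and let the workers exit.
	<-stop
	return nil
}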