diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 5e07d298bd..7f40a32a52 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -154,10 +154,7 @@ func (c *Controller) Start(stop <-chan struct{}) error { log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { // Process work items - go wait.Until(func() { - for c.processNextWorkItem() { - } - }, c.JitterPeriod, stop) + go wait.Until(c.worker, c.JitterPeriod, stop) } c.Started = true @@ -168,17 +165,16 @@ func (c *Controller) Start(stop <-chan struct{}) error { return nil } +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the reconcileHandler is never invoked concurrently with the same object. +func (c *Controller) worker() { + for c.processNextWorkItem() { + } +} + // processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. +// attempt to process it, by calling the reconcileHandler. func (c *Controller) processNextWorkItem() bool { - // This code copy-pasted from the sample-Controller. - - // Update metrics after processing each item - reconcileStartTS := time.Now() - defer func() { - c.updateMetrics(time.Now().Sub(reconcileStartTS)) - }() - obj, shutdown := c.Queue.Get() if shutdown { // Stop working @@ -192,6 +188,17 @@ func (c *Controller) processNextWorkItem() bool { // put back on the workqueue and attempted again after a back-off // period. defer c.Queue.Done(obj) + + return c.reconcileHandler(obj) +} + +func (c *Controller) reconcileHandler(obj interface{}) bool { + // Update metrics after processing each item + reconcileStartTS := time.Now() + defer func() { + c.updateMetrics(time.Now().Sub(reconcileStartTS)) + }() + var req reconcile.Request var ok bool if req, ok = obj.(reconcile.Request); !ok { @@ -204,7 +211,6 @@ func (c *Controller) processNextWorkItem() bool { // Return true, don't take a break return true } - // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the // resource to be synced. if result, err := c.Do.Reconcile(req); err != nil {