From 09954f431a630f1f5e2ee3fef815ff90a65fd57c Mon Sep 17 00:00:00 2001 From: adohe Date: Fri, 24 May 2019 15:32:00 +0800 Subject: [PATCH] update internal controller for more clean code and metrics --- pkg/internal/controller/controller.go | 38 +++++++++++++-------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 5e07d298bd..96f5df6f09 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -154,10 +154,7 @@ func (c *Controller) Start(stop <-chan struct{}) error { log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { // Process work items - go wait.Until(func() { - for c.processNextWorkItem() { - } - }, c.JitterPeriod, stop) + go wait.Until(c.worker, c.JitterPeriod, stop) } c.Started = true @@ -168,30 +165,32 @@ func (c *Controller) Start(stop <-chan struct{}) error { return nil } +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the reconcileHandler is never invoked concurrently with the same object. +func (c *Controller) worker() { + for c.processNextWorkItem() { + } +} + // processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. +// attempt to process it, by calling the reconcileHandler. func (c *Controller) processNextWorkItem() bool { - // This code copy-pasted from the sample-Controller. + obj, shutdown := c.Queue.Get() + if shutdown { + return false + } + defer c.Queue.Done(obj) + return c.reconcileHandler(obj) +} + +func (c *Controller) reconcileHandler(obj interface{}) bool { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { c.updateMetrics(time.Now().Sub(reconcileStartTS)) }() - obj, shutdown := c.Queue.Get() - if shutdown { - // Stop working - return false - } - - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.Queue.Done(obj) var req reconcile.Request var ok bool if req, ok = obj.(reconcile.Request); !ok { @@ -204,7 +203,6 @@ func (c *Controller) processNextWorkItem() bool { // Return true, don't take a break return true } - // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the // resource to be synced. if result, err := c.Do.Reconcile(req); err != nil {