Skip to content

Commit

Permalink
update internal controller for more clean code and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
adohe committed May 24, 2019
1 parent 38483b2 commit 09954f4
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// Process work items
go wait.Until(func() {
for c.processNextWorkItem() {
}
}, c.JitterPeriod, stop)
go wait.Until(c.worker, c.JitterPeriod, stop)
}

c.Started = true
Expand All @@ -168,30 +165,32 @@ func (c *Controller) Start(stop <-chan struct{}) error {
return nil
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem() bool {
// This code copy-pasted from the sample-Controller.
obj, shutdown := c.Queue.Get()
if shutdown {
return false
}
defer c.Queue.Done(obj)

return c.reconcileHandler(obj)
}

func (c *Controller) reconcileHandler(obj interface{}) bool {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Now().Sub(reconcileStartTS))
}()

obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}

// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
var req reconcile.Request
var ok bool
if req, ok = obj.(reconcile.Request); !ok {
Expand All @@ -204,7 +203,6 @@ func (c *Controller) processNextWorkItem() bool {
// Return true, don't take a break
return true
}

// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
// resource to be synced.
if result, err := c.Do.Reconcile(req); err != nil {
Expand Down

0 comments on commit 09954f4

Please sign in to comment.