Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 update internal controller for more clean code and metrics #448

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,7 @@ func (c *Controller) Start(stop <-chan struct{}) error {
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// Process work items
go wait.Until(func() {
for c.processNextWorkItem() {
}
}, c.JitterPeriod, stop)
go wait.Until(c.worker, c.JitterPeriod, stop)
}

c.Started = true
Expand All @@ -168,17 +165,16 @@ func (c *Controller) Start(stop <-chan struct{}) error {
return nil
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem() bool {
// This code copy-pasted from the sample-Controller.

// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Now().Sub(reconcileStartTS))
}()

obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
Expand All @@ -192,6 +188,17 @@ func (c *Controller) processNextWorkItem() bool {
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)

return c.reconcileHandler(obj)
}

func (c *Controller) reconcileHandler(obj interface{}) bool {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Now().Sub(reconcileStartTS))
}()

var req reconcile.Request
var ok bool
if req, ok = obj.(reconcile.Request); !ok {
Expand All @@ -204,7 +211,6 @@ func (c *Controller) processNextWorkItem() bool {
// Return true, don't take a break
return true
}

// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
// resource to be synced.
if result, err := c.Do.Reconcile(req); err != nil {
Expand Down