Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

🐛 Controller: Wait for all reconciliations before shutting down #1427

Merged
merged 1 commit into from
Mar 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@ package controller_test
import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var _ = Describe("controller.Controller", func() {
Expand Down Expand Up @@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() {
It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
controllerFinished := make(chan struct{})
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
defer GinkgoRecover()
close(reconcileStarted)
// Make sure reconciliation takes a moment and is not quicker than the controllers
// shutdown.
time.Sleep(50 * time.Millisecond)
// Explicitly test this on top of the leakdetection, as the latter uses Eventually
// so might succeed even when the controller does not wait for all reconciliations
// to finish.
Expect(controllerFinished).NotTo(BeClosed())
return reconcile.Result{}, nil
})

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).To(Succeed())
close(controllerFinished)
}()

<-reconcileStarted
cancel()
Expect(m.Start(ctx)).To(Succeed())
<-controllerFinished

// force-close keep-alive connections. These'll time out anyway (after
// like 30s or so) but force it to speed up the tests.
Expand Down
24 changes: 12 additions & 12 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -67,9 +66,6 @@ type Controller struct {
// mu is used to synchronize Controller setup
mu sync.Mutex

// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
JitterPeriod time.Duration

// Started is true if the Controller has been Started
Started bool

Expand Down Expand Up @@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error {
c.ctx = ctx

c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}()

wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()

Expand Down Expand Up @@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error {
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil

if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}

// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
Copy link
Member Author

@alvaroaleman alvaroaleman Mar 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see, the wait.UntilWithContext was harmless, but not useful because it would start the closure every c.JitterPeriod for as long as the ctx is not cancelled. The closure will however stay in the for c.processNextWorkItem(ctx) loop until the workqueue is shut down, and the workqueue is only shut down when the ctx is cancelled.

I think we still have this because before b4a0212 it was possible for the closure to end.

With the changes in this PR, using wait.UntilWithContext may lead to a deadlock:

  • We add the workers to the WaitGroup
  • Context gets cancelled before the closure is started
  • wait.UntilWithContext will not start the closure so nothing will ever call wg.Done()
  • The wg.Wait is deadlocked

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes lots of sense, and it'd actually explain why I was still noticing some errors during tests that wouldn't properly clean up or reconcile everything before shutting stuff down (including the event recorder, which panics).

go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}, c.JitterPeriod)
}()
}

c.Started = true
Expand All @@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error {
}

<-ctx.Done()
c.Log.Info("Stopping workers")
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.Log.Info("All workers finished")
return nil
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,6 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request if there is an error and continue processing items", func(done Done) {
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
ctrl.JitterPeriod = time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -597,7 +595,6 @@ var _ = Describe("controller", func() {
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
ctrl.JitterPeriod = time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -790,9 +787,6 @@ var _ = Describe("controller", func() {
}()
queue.Add(request)

// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
ctrl.JitterPeriod = time.Millisecond

By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
Expect(<-reconciled).To(Equal(request))
Expand Down