From 8f4937b0f576334c3c6ec83759fa83847bed4464 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 14 Mar 2021 00:06:44 -0500 Subject: [PATCH] :bug Controller: Wait for all reconciliations before shutting down Currently, the controller will instantly shutdown and return when its context gets cancelled, leaving active reconciliations be. This change makes it wait for those before finishing shutdown. --- pkg/controller/controller_test.go | 36 ++++++++++++++++++++-- pkg/internal/controller/controller.go | 24 +++++++-------- pkg/internal/controller/controller_test.go | 6 ---- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 5b68ae2299..4b0a2ee914 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -19,16 +19,21 @@ package controller_test import ( "context" "fmt" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "go.uber.org/goleak" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" + "sigs.k8s.io/controller-runtime/pkg/source" ) var _ = Describe("controller.Controller", func() { @@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() + watchChan := make(chan event.GenericEvent, 1) + watch := &source.Channel{Source: watchChan} + watchChan <- event.GenericEvent{Object: &corev1.Pod{}} + + reconcileStarted := make(chan struct{}) + controllerFinished := make(chan struct{}) + rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { + defer GinkgoRecover() + close(reconcileStarted) + // Make sure reconciliation takes a moment and is not quicker than the controllers + // shutdown. + time.Sleep(50 * time.Millisecond) + // Explicitly test this on top of the leakdetection, as the latter uses Eventually + // so might succeed even when the controller does not wait for all reconciliations + // to finish. + Expect(controllerFinished).NotTo(BeClosed()) + return reconcile.Result{}, nil + }) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - _, err = controller.New("new-controller", m, controller.Options{Reconciler: rec}) + c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec}) + Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed()) Expect(err).NotTo(HaveOccurred()) ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).To(Succeed()) + close(controllerFinished) + }() + + <-reconcileStarted cancel() - Expect(m.Start(ctx)).To(Succeed()) + <-controllerFinished // force-close keep-alive connections. These'll time anyway (after // like 30s or so) but force it to speed up the tests. diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 6164d58efc..f5024502d9 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -25,7 +25,6 @@ import ( "github.com/go-logr/logr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -67,9 +66,6 @@ type Controller struct { // mu is used to synchronize Controller setup mu sync.Mutex - // JitterPeriod allows tests to reduce the JitterPeriod so they complete faster - JitterPeriod time.Duration - // Started is true if the Controller has been Started Started bool @@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error { c.ctx = ctx c.Queue = c.MakeQueue() - defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed + go func() { + <-ctx.Done() + c.Queue.ShutDown() + }() + wg := &sync.WaitGroup{} err := func() error { defer c.mu.Unlock() @@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error { // which won't be garbage collected if we hold a reference to it. c.startWatches = nil - if c.JitterPeriod == 0 { - c.JitterPeriod = 1 * time.Second - } - // Launch workers to process resources c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles) + wg.Add(c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { - go wait.UntilWithContext(ctx, func(ctx context.Context) { + go func() { + defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { } - }, c.JitterPeriod) + }() } c.Started = true @@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error { } <-ctx.Done() - c.Log.Info("Stopping workers") + c.Log.Info("Shutdown signal received, waiting for all workers to finish") + wg.Wait() + c.Log.Info("All workers finished") return nil } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index c954581f92..35cd5744ef 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -469,8 +469,6 @@ var _ = Describe("controller", func() { }) It("should requeue a Request if there is an error and continue processing items", func(done Done) { - // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. - ctrl.JitterPeriod = time.Millisecond ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -597,7 +595,6 @@ var _ = Describe("controller", func() { It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } - ctrl.JitterPeriod = time.Millisecond ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -790,9 +787,6 @@ var _ = Describe("controller", func() { }() queue.Add(request) - // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. - ctrl.JitterPeriod = time.Millisecond - By("Invoking Reconciler which will give an error") fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) Expect(<-reconciled).To(Equal(request))