diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 5b68ae2299..4b0a2ee914 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -19,16 +19,21 @@ package controller_test import ( "context" "fmt" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "go.uber.org/goleak" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" + "sigs.k8s.io/controller-runtime/pkg/source" ) var _ = Describe("controller.Controller", func() { @@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() + watchChan := make(chan event.GenericEvent, 1) + watch := &source.Channel{Source: watchChan} + watchChan <- event.GenericEvent{Object: &corev1.Pod{}} + + reconcileStarted := make(chan struct{}) + controllerFinished := make(chan struct{}) + rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { + defer GinkgoRecover() + close(reconcileStarted) + // Make sure reconciliation takes a moment and is not quicker than the controllers + // shutdown. + time.Sleep(50 * time.Millisecond) + // Explicitly test this on top of the leakdetection, as the latter uses Eventually + // so might succeed even when the controller does not wait for all reconciliations + // to finish. + Expect(controllerFinished).NotTo(BeClosed()) + return reconcile.Result{}, nil + }) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - _, err = controller.New("new-controller", m, controller.Options{Reconciler: rec}) + c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec}) + Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed()) Expect(err).NotTo(HaveOccurred()) ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).To(Succeed()) + close(controllerFinished) + }() + + <-reconcileStarted cancel() - Expect(m.Start(ctx)).To(Succeed()) + <-controllerFinished // force-close keep-alive connections. These'll time anyway (after // like 30s or so) but force it to speed up the tests. diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 6164d58efc..f5024502d9 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -25,7 +25,6 @@ import ( "github.com/go-logr/logr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -67,9 +66,6 @@ type Controller struct { // mu is used to synchronize Controller setup mu sync.Mutex - // JitterPeriod allows tests to reduce the JitterPeriod so they complete faster - JitterPeriod time.Duration - // Started is true if the Controller has been Started Started bool @@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error { c.ctx = ctx c.Queue = c.MakeQueue() - defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed + go func() { + <-ctx.Done() + c.Queue.ShutDown() + }() + wg := &sync.WaitGroup{} err := func() error { defer c.mu.Unlock() @@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error { // which won't be garbage collected if we hold a reference to it. c.startWatches = nil - if c.JitterPeriod == 0 { - c.JitterPeriod = 1 * time.Second - } - // Launch workers to process resources c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles) + wg.Add(c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { - go wait.UntilWithContext(ctx, func(ctx context.Context) { + go func() { + defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { } - }, c.JitterPeriod) + }() } c.Started = true @@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error { } <-ctx.Done() - c.Log.Info("Stopping workers") + c.Log.Info("Shutdown signal received, waiting for all workers to finish") + wg.Wait() + c.Log.Info("All workers finished") return nil } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index c954581f92..35cd5744ef 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -469,8 +469,6 @@ var _ = Describe("controller", func() { }) It("should requeue a Request if there is an error and continue processing items", func(done Done) { - // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. - ctrl.JitterPeriod = time.Millisecond ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -597,7 +595,6 @@ var _ = Describe("controller", func() { It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } - ctrl.JitterPeriod = time.Millisecond ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -790,9 +787,6 @@ var _ = Describe("controller", func() { }() queue.Add(request) - // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. - ctrl.JitterPeriod = time.Millisecond - By("Invoking Reconciler which will give an error") fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) Expect(<-reconciled).To(Equal(request))