Skip to content

Commit

Permalink
Merge pull request #1427 from alvaroaleman/fix-wait
Browse files Browse the repository at this point in the history
🐛 Controller: Wait for all reconciliations before shutting down
  • Loading branch information
k8s-ci-robot committed Mar 14, 2021
2 parents 253f275 + aca66a0 commit df2c43d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 20 deletions.
36 changes: 34 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@ package controller_test
import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var _ = Describe("controller.Controller", func() {
Expand Down Expand Up @@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() {
It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
controllerFinished := make(chan struct{})
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
defer GinkgoRecover()
close(reconcileStarted)
// Make sure reconciliation takes a moment and is not quicker than the controllers
// shutdown.
time.Sleep(50 * time.Millisecond)
// Explicitly test this on top of the leakdetection, as the latter uses Eventually
// so might succeed even when the controller does not wait for all reconciliations
// to finish.
Expect(controllerFinished).NotTo(BeClosed())
return reconcile.Result{}, nil
})

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).To(Succeed())
close(controllerFinished)
}()

<-reconcileStarted
cancel()
Expect(m.Start(ctx)).To(Succeed())
<-controllerFinished

// force-close keep-alive connections. These'll time anyway (after
// like 30s or so) but force it to speed up the tests.
Expand Down
24 changes: 12 additions & 12 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -67,9 +66,6 @@ type Controller struct {
// mu is used to synchronize Controller setup
mu sync.Mutex

// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
JitterPeriod time.Duration

// Started is true if the Controller has been Started
Started bool

Expand Down Expand Up @@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error {
c.ctx = ctx

c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}()

wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()

Expand Down Expand Up @@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error {
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil

if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}

// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}, c.JitterPeriod)
}()
}

c.Started = true
Expand All @@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error {
}

<-ctx.Done()
c.Log.Info("Stopping workers")
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.Log.Info("All workers finished")
return nil
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,6 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request if there is an error and continue processing items", func(done Done) {
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
ctrl.JitterPeriod = time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -597,7 +595,6 @@ var _ = Describe("controller", func() {
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
ctrl.JitterPeriod = time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -790,9 +787,6 @@ var _ = Describe("controller", func() {
}()
queue.Add(request)

// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
ctrl.JitterPeriod = time.Millisecond

By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
Expect(<-reconciled).To(Equal(request))
Expand Down

0 comments on commit df2c43d

Please sign in to comment.