diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 64a27a7a22..164167a8c2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // Create controller with dependencies set c := &controller.Controller{ - Do: options.Reconciler, - Cache: mgr.GetCache(), - Config: mgr.GetConfig(), - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Recorder: mgr.GetRecorder(name), - Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + Do: options.Reconciler, + Cache: mgr.GetCache(), + Config: mgr.GetConfig(), + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Recorder: mgr.GetRecorder(name), + Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), MaxConcurrentReconciles: options.MaxConcurrentReconciles, - Name: name, + Name: name, } // Add the controller as a Manager components diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 9015eaa518..c0e6efa96d 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -63,9 +63,9 @@ var _ = Describe("controller", func() { informers = &informertest.FakeInformers{} ctrl = &Controller{ MaxConcurrentReconciles: 1, - Do: fakeReconcile, - Queue: queue, - Cache: informers, + Do: fakeReconcile, + Queue: queue, + Cache: informers, } ctrl.InjectFunc(func(interface{}) error { return nil }) }) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index ce723e36ae..a35d06ca20 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -83,6 +83,9 @@ type controllerManager struct { errChan chan error stop <-chan struct{} + // stopper is the write side of the stop channel. They should have the same value. + stopper chan<- struct{} + startCache func(stop <-chan struct{}) error } @@ -192,13 +195,16 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { } func (cm *controllerManager) Start(stop <-chan struct{}) error { + // join the passed-in stop channel as an upstream feeding into cm.stopper + defer close(cm.stopper) + if cm.resourceLock != nil { - err := cm.startLeaderElection(stop) + err := cm.startLeaderElection() if err != nil { return err } } else { - go cm.start(stop) + go cm.start() } select { @@ -211,30 +217,28 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { } } -func (cm *controllerManager) start(stop <-chan struct{}) { +func (cm *controllerManager) start() { cm.mu.Lock() defer cm.mu.Unlock() - cm.stop = stop - // Start the Cache. Allow the function to start the cache to be mocked out for testing if cm.startCache == nil { cm.startCache = cm.cache.Start } go func() { - if err := cm.startCache(stop); err != nil { + if err := cm.startCache(cm.stop); err != nil { cm.errChan <- err } }() // Start the metrics server if cm.metricsListener != nil { - go cm.serveMetrics(stop) + go cm.serveMetrics(cm.stop) } // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(stop) + cm.cache.WaitForCacheSync(cm.stop) // Start the runnables after the cache has synced for _, c := range cm.runnables { @@ -242,14 +246,14 @@ func (cm *controllerManager) start(stop <-chan struct{}) { // Write any Start errors to a channel so we can return them ctrl := c go func() { - cm.errChan <- ctrl.Start(stop) + cm.errChan <- ctrl.Start(cm.stop) }() } cm.started = true } -func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) { +func (cm *controllerManager) startLeaderElection() (err error) { l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go @@ -259,7 +263,7 @@ func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err erro RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ <-chan struct{}) { - cm.start(stop) + cm.start() }, OnStoppedLeading: func() { // Most implementations of leader election log.Fatal() here. diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c2adce3f70..5966714663 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -200,6 +200,8 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + stop := make(chan struct{}) + return &controllerManager{ config: config, scheme: options.Scheme, @@ -219,6 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) { resourceLock: resourceLock, mapper: mapper, metricsListener: metricsListener, + stop: stop, + stopper: stop, }, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f9a2ceb4e4..759750708d 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -497,8 +497,7 @@ var _ = Describe("manger.Manager", func() { }, stop: func(stop <-chan struct{}) error { defer GinkgoRecover() - // Manager stop chan has not been initialized. - Expect(stop).To(BeNil()) + Expect(stop).NotTo(BeNil()) return nil }, f: func(f inject.Func) error {