From 977e1fae5c08a82972928acb7e4f91c21f0320a1 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Fri, 9 Nov 2018 10:56:30 -0800 Subject: [PATCH] Stop channel commenting/renaming for clarity This cleans up, adds comments, and renames things in the stop channel fix commits for clarity. --- pkg/controller/controller.go | 16 +++++----- pkg/internal/controller/controller_test.go | 6 ++-- pkg/manager/internal.go | 37 ++++++++++++++-------- pkg/manager/manager.go | 4 +-- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 64a27a7a22..164167a8c2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // Create controller with dependencies set c := &controller.Controller{ - Do: options.Reconciler, - Cache: mgr.GetCache(), - Config: mgr.GetConfig(), - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Recorder: mgr.GetRecorder(name), - Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + Do: options.Reconciler, + Cache: mgr.GetCache(), + Config: mgr.GetConfig(), + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Recorder: mgr.GetRecorder(name), + Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), MaxConcurrentReconciles: options.MaxConcurrentReconciles, - Name: name, + Name: name, } // Add the controller as a Manager components diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index ebef1dbb66..79ee4a8b53 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -60,9 +60,9 @@ var _ = Describe("controller", func() { informers = &informertest.FakeInformers{} ctrl = &Controller{ MaxConcurrentReconciles: 1, - Do: fakeReconcile, - Queue: queue, - Cache: informers, + Do: fakeReconcile, + Queue: queue, + Cache: informers, } ctrl.InjectFunc(func(interface{}) error { return nil }) }) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 22459deaec..e8a84bad6e 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -64,7 +64,7 @@ type controllerManager struct { // (and EventHandlers, Sources and Predicates). recorderProvider recorder.Provider - // resourceLock + // resourceLock forms the basis for leader election resourceLock resourcelock.Interface // mapper is used to map resources to kind, and map kind and version. @@ -73,10 +73,16 @@ type controllerManager struct { mu sync.Mutex started bool errChan chan error - stop <-chan struct{} - // stopper is the write side of the stop channel. They should have the same value. - stopper chan<- struct{} + // internalStop is the stop channel *actually* used by everything involved + // with the manager as a stop channel, so that we can pass a stop channel + // to things that need it off the bat (like the Channel source). It can + // be closed via `internalStopper` (by being the same underlying channel). + internalStop <-chan struct{} + + // internalStopper is the write side of the internal stop channel, allowing us to close it. + // It and `internalStop` should point to the same channel. + internalStopper chan<- struct{} startCache func(stop <-chan struct{}) error } @@ -96,7 +102,7 @@ func (cm *controllerManager) Add(r Runnable) error { if cm.started { // If already started, start the controller go func() { - cm.errChan <- r.Start(cm.stop) + cm.errChan <- r.Start(cm.internalStop) }() } @@ -119,7 +125,7 @@ func (cm *controllerManager) SetFields(i interface{}) error { if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } - if _, err := inject.StopChannelInto(cm.stop, i); err != nil { + if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil { return err } if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil { @@ -161,13 +167,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { } func (cm *controllerManager) Start(stop <-chan struct{}) error { - defer close(cm.stopper) + // stop everything we start when we exit this method for any reason + defer close(cm.internalStopper) + // if we're not using resource election, start directly if cm.resourceLock == nil { - go cm.start() + cm.start() select { // Only this function should receive from stop, and everything else - // should receive from cm.stop. + // should receive from cm.internalStop. case <-stop: // we are done return nil @@ -177,7 +185,8 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { } } - l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + // otherwise, start when we acquire a resource election lock + leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go // TODO(joelspeed): These timings should be configurable @@ -204,7 +213,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { return err } - go l.Run() + go leaderElector.Run() select { case <-stop: @@ -225,14 +234,14 @@ func (cm *controllerManager) start() { cm.startCache = cm.cache.Start } go func() { - if err := cm.startCache(cm.stop); err != nil { + if err := cm.startCache(cm.internalStop); err != nil { cm.errChan <- err } }() // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(cm.stop) + cm.cache.WaitForCacheSync(cm.internalStop) // Start the runnables after the cache has synced for _, c := range cm.runnables { @@ -240,7 +249,7 @@ func (cm *controllerManager) start() { // Write any Start errors to a channel so we can return them ctrl := c go func() { - cm.errChan <- ctrl.Start(cm.stop) + cm.errChan <- ctrl.Start(cm.internalStop) }() } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 8f9ed015ff..6f6534afbd 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -193,8 +193,8 @@ func New(config *rest.Config, options Options) (Manager, error) { recorderProvider: recorderProvider, resourceLock: resourceLock, mapper: mapper, - stop: stop, - stopper: stop, + internalStop: stop, + internalStopper: stop, }, nil }