diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a35d06ca20..34eb7092fa 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -69,7 +69,7 @@ type controllerManager struct { // (and EventHandlers, Sources and Predicates). recorderProvider recorder.Provider - // resourceLock + // resourceLock forms the basis for leader election resourceLock resourcelock.Interface // mapper is used to map resources to kind, and map kind and version. @@ -81,10 +81,16 @@ type controllerManager struct { mu sync.Mutex started bool errChan chan error - stop <-chan struct{} - // stopper is the write side of the stop channel. They should have the same value. - stopper chan<- struct{} + // internalStop is the stop channel *actually* used by everything involved + // with the manager as a stop channel, so that we can pass a stop channel + // to things that need it off the bat (like the Channel source). It can + // be closed via `internalStopper` (by being the same underlying channel). + internalStop <-chan struct{} + + // internalStopper is the write side of the internal stop channel, allowing us to close it. + // It and `internalStop` should point to the same channel. + internalStopper chan<- struct{} startCache func(stop <-chan struct{}) error } @@ -104,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error { if cm.started { // If already started, start the controller go func() { - cm.errChan <- r.Start(cm.stop) + cm.errChan <- r.Start(cm.internalStop) }() } @@ -127,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error { if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } - if _, err := inject.StopChannelInto(cm.stop, i); err != nil { + if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil { return err } if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil { @@ -195,8 +201,8 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { } func (cm *controllerManager) Start(stop <-chan struct{}) error { - // join the passed-in stop channel as an upstream feeding into cm.stopper - defer close(cm.stopper) + // join the passed-in stop channel as an upstream feeding into cm.internalStopper + defer close(cm.internalStopper) if cm.resourceLock != nil { err := cm.startLeaderElection() @@ -226,19 +232,19 @@ func (cm *controllerManager) start() { cm.startCache = cm.cache.Start } go func() { - if err := cm.startCache(cm.stop); err != nil { + if err := cm.startCache(cm.internalStop); err != nil { cm.errChan <- err } }() // Start the metrics server if cm.metricsListener != nil { - go cm.serveMetrics(cm.stop) + go cm.serveMetrics(cm.internalStop) } // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(cm.stop) + cm.cache.WaitForCacheSync(cm.internalStop) // Start the runnables after the cache has synced for _, c := range cm.runnables { @@ -246,7 +252,7 @@ func (cm *controllerManager) start() { // Write any Start errors to a channel so we can return them ctrl := c go func() { - cm.errChan <- ctrl.Start(cm.stop) + cm.errChan <- ctrl.Start(cm.internalStop) }() } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5966714663..1230c74573 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -221,8 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) { resourceLock: resourceLock, mapper: mapper, metricsListener: metricsListener, - stop: stop, - stopper: stop, + internalStop: stop, + internalStopper: stop, }, nil }