From be88210db0cc56953f9bd388f5dd94f1698e251b Mon Sep 17 00:00:00 2001 From: Michael Hrivnak Date: Wed, 19 Sep 2018 15:39:05 -0400 Subject: [PATCH] fixes nil stop value for source.Channel fixes #103 --- pkg/manager/internal.go | 87 +++++++++++++++++++++---------------- pkg/manager/manager.go | 4 ++ pkg/manager/manager_test.go | 3 +- 3 files changed, 54 insertions(+), 40 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 87108df300..af833627a0 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -75,6 +75,9 @@ type controllerManager struct { errChan chan error stop <-chan struct{} + // stopper is the write side of the stop channel. They should have the same value. + stopper chan<- struct{} + startCache func(stop <-chan struct{}) error } @@ -159,9 +162,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { func (cm *controllerManager) Start(stop <-chan struct{}) error { if cm.resourceLock == nil { - go cm.start(stop) + // join the passed-in stop channel as an upstream feeding into cm.stopper + go func() { + <-stop + close(cm.stopper) + }() + + go cm.start() select { - case <-stop: + case <-cm.stop: // we are done return nil case err := <-cm.errChan: @@ -178,7 +187,19 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { RenewDeadline: 10 * time.Second, RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: cm.start, + // This type changes in k8s 1.12 to func(context.Context) + OnStartedLeading: func(stopleading <-chan struct{}) { + // join both stop and stopleading so they feed into cm.stopper + go func() { + select { + case <-stop: + close(cm.stopper) + case <-stopleading: + close(cm.stopper) + } + }() + cm.start() + }, OnStoppedLeading: func() { // Most implementations of leader election log.Fatal() here. // Since Start is wrapped in log.Fatal when called, we can just return @@ -194,7 +215,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { go l.Run() select { - case <-stop: + case <-cm.stop: // We are done return nil case err := <-cm.errChan: @@ -203,43 +224,33 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { } } -func (cm *controllerManager) start(stop <-chan struct{}) { - func() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.stop = stop - - // Start the Cache. Allow the function to start the cache to be mocked out for testing - if cm.startCache == nil { - cm.startCache = cm.cache.Start - } - go func() { - if err := cm.startCache(stop); err != nil { - cm.errChan <- err - } - }() +func (cm *controllerManager) start() { + cm.mu.Lock() + defer cm.mu.Unlock() - // Wait for the caches to sync. - // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(stop) - - // Start the runnables after the cache has synced - for _, c := range cm.runnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - ctrl := c - go func() { - cm.errChan <- ctrl.Start(stop) - }() + // Start the Cache. Allow the function to start the cache to be mocked out for testing + if cm.startCache == nil { + cm.startCache = cm.cache.Start + } + go func() { + if err := cm.startCache(cm.stop); err != nil { + cm.errChan <- err } - - cm.started = true }() - select { - case <-stop: - // We are done - return + // Wait for the caches to sync. + // TODO(community): Check the return value and write a test + cm.cache.WaitForCacheSync(cm.stop) + + // Start the runnables after the cache has synced + for _, c := range cm.runnables { + // Controllers block, but we want to return an error if any have an error starting. + // Write any Start errors to a channel so we can return them + ctrl := c + go func() { + cm.errChan <- ctrl.Start(cm.stop) + }() } + + cm.started = true } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index cd093a2d96..8f9ed015ff 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -180,6 +180,8 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + stop := make(chan struct{}) + return &controllerManager{ config: config, scheme: options.Scheme, @@ -191,6 +193,8 @@ func New(config *rest.Config, options Options) (Manager, error) { recorderProvider: recorderProvider, resourceLock: resourceLock, mapper: mapper, + stop: stop, + stopper: stop, }, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index a3de89d053..044b06409e 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -327,8 +327,7 @@ var _ = Describe("manger.Manager", func() { }, stop: func(stop <-chan struct{}) error { defer GinkgoRecover() - // Manager stop chan has not been initialized. - Expect(stop).To(BeNil()) + Expect(stop).NotTo(BeNil()) return nil }, f: func(f inject.Func) error {