From a7f0df057a61f0e6f8b6b56627228b4b1e2df084 Mon Sep 17 00:00:00 2001 From: Michael Hrivnak Date: Wed, 19 Sep 2018 15:39:05 -0400 Subject: [PATCH 1/4] fix nil stop value for source.Channel fixes #103 Creates a stop channel for the manager in New(), which will get passed to any source.Channel instances that are added. When the manager's start method is called and a new stop channel is passed in, that channel will be joined in a goroutine with the manager's existing channel so that if the newer channel gets closed, so will the manager's. --- pkg/controller/controller.go | 16 ++++++------- pkg/internal/controller/controller_test.go | 6 ++--- pkg/manager/internal.go | 26 +++++++++++++--------- pkg/manager/manager.go | 4 ++++ pkg/manager/manager_test.go | 3 +-- 5 files changed, 31 insertions(+), 24 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 64a27a7a22..164167a8c2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // Create controller with dependencies set c := &controller.Controller{ - Do: options.Reconciler, - Cache: mgr.GetCache(), - Config: mgr.GetConfig(), - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Recorder: mgr.GetRecorder(name), - Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + Do: options.Reconciler, + Cache: mgr.GetCache(), + Config: mgr.GetConfig(), + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Recorder: mgr.GetRecorder(name), + Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), MaxConcurrentReconciles: options.MaxConcurrentReconciles, - Name: name, + Name: name, } // Add the controller as a Manager components diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index f5c2ec7524..1269c07a29 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -63,9 +63,9 @@ var _ = Describe("controller", func() { informers = &informertest.FakeInformers{} ctrl = &Controller{ MaxConcurrentReconciles: 1, - Do: fakeReconcile, - Queue: queue, - Cache: informers, + Do: fakeReconcile, + Queue: queue, + Cache: informers, } ctrl.InjectFunc(func(interface{}) error { return nil }) }) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index ce723e36ae..a35d06ca20 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -83,6 +83,9 @@ type controllerManager struct { errChan chan error stop <-chan struct{} + // stopper is the write side of the stop channel. They should have the same value. + stopper chan<- struct{} + startCache func(stop <-chan struct{}) error } @@ -192,13 +195,16 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { } func (cm *controllerManager) Start(stop <-chan struct{}) error { + // join the passed-in stop channel as an upstream feeding into cm.stopper + defer close(cm.stopper) + if cm.resourceLock != nil { - err := cm.startLeaderElection(stop) + err := cm.startLeaderElection() if err != nil { return err } } else { - go cm.start(stop) + go cm.start() } select { @@ -211,30 +217,28 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { } } -func (cm *controllerManager) start(stop <-chan struct{}) { +func (cm *controllerManager) start() { cm.mu.Lock() defer cm.mu.Unlock() - cm.stop = stop - // Start the Cache. Allow the function to start the cache to be mocked out for testing if cm.startCache == nil { cm.startCache = cm.cache.Start } go func() { - if err := cm.startCache(stop); err != nil { + if err := cm.startCache(cm.stop); err != nil { cm.errChan <- err } }() // Start the metrics server if cm.metricsListener != nil { - go cm.serveMetrics(stop) + go cm.serveMetrics(cm.stop) } // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(stop) + cm.cache.WaitForCacheSync(cm.stop) // Start the runnables after the cache has synced for _, c := range cm.runnables { @@ -242,14 +246,14 @@ func (cm *controllerManager) start(stop <-chan struct{}) { // Write any Start errors to a channel so we can return them ctrl := c go func() { - cm.errChan <- ctrl.Start(stop) + cm.errChan <- ctrl.Start(cm.stop) }() } cm.started = true } -func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) { +func (cm *controllerManager) startLeaderElection() (err error) { l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go @@ -259,7 +263,7 @@ func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err erro RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ <-chan struct{}) { - cm.start(stop) + cm.start() }, OnStoppedLeading: func() { // Most implementations of leader election log.Fatal() here. diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c2adce3f70..5966714663 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -200,6 +200,8 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + stop := make(chan struct{}) + return &controllerManager{ config: config, scheme: options.Scheme, @@ -219,6 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) { resourceLock: resourceLock, mapper: mapper, metricsListener: metricsListener, + stop: stop, + stopper: stop, }, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f9a2ceb4e4..759750708d 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -497,8 +497,7 @@ var _ = Describe("manger.Manager", func() { }, stop: func(stop <-chan struct{}) error { defer GinkgoRecover() - // Manager stop chan has not been initialized. - Expect(stop).To(BeNil()) + Expect(stop).NotTo(BeNil()) return nil }, f: func(f inject.Func) error { From e0ee559eb78bf816b85faf85625a80acc1f14caa Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Fri, 9 Nov 2018 10:56:30 -0800 Subject: [PATCH 2/4] Stop channel commenting/renaming for clarity This cleans up, adds comments, and renames things in the stop channel fix commits for clarity. --- pkg/manager/internal.go | 30 ++++++++++++++++++------------ pkg/manager/manager.go | 4 ++-- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a35d06ca20..34eb7092fa 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -69,7 +69,7 @@ type controllerManager struct { // (and EventHandlers, Sources and Predicates). recorderProvider recorder.Provider - // resourceLock + // resourceLock forms the basis for leader election resourceLock resourcelock.Interface // mapper is used to map resources to kind, and map kind and version. @@ -81,10 +81,16 @@ type controllerManager struct { mu sync.Mutex started bool errChan chan error - stop <-chan struct{} - // stopper is the write side of the stop channel. They should have the same value. - stopper chan<- struct{} + // internalStop is the stop channel *actually* used by everything involved + // with the manager as a stop channel, so that we can pass a stop channel + // to things that need it off the bat (like the Channel source). It can + // be closed via `internalStopper` (by being the same underlying channel). + internalStop <-chan struct{} + + // internalStopper is the write side of the internal stop channel, allowing us to close it. + // It and `internalStop` should point to the same channel. + internalStopper chan<- struct{} startCache func(stop <-chan struct{}) error } @@ -104,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error { if cm.started { // If already started, start the controller go func() { - cm.errChan <- r.Start(cm.stop) + cm.errChan <- r.Start(cm.internalStop) }() } @@ -127,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error { if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } - if _, err := inject.StopChannelInto(cm.stop, i); err != nil { + if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil { return err } if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil { @@ -195,8 +201,8 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { } func (cm *controllerManager) Start(stop <-chan struct{}) error { - // join the passed-in stop channel as an upstream feeding into cm.stopper - defer close(cm.stopper) + // join the passed-in stop channel as an upstream feeding into cm.internalStopper + defer close(cm.internalStopper) if cm.resourceLock != nil { err := cm.startLeaderElection() @@ -226,19 +232,19 @@ func (cm *controllerManager) start() { cm.startCache = cm.cache.Start } go func() { - if err := cm.startCache(cm.stop); err != nil { + if err := cm.startCache(cm.internalStop); err != nil { cm.errChan <- err } }() // Start the metrics server if cm.metricsListener != nil { - go cm.serveMetrics(cm.stop) + go cm.serveMetrics(cm.internalStop) } // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(cm.stop) + cm.cache.WaitForCacheSync(cm.internalStop) // Start the runnables after the cache has synced for _, c := range cm.runnables { @@ -246,7 +252,7 @@ func (cm *controllerManager) start() { // Write any Start errors to a channel so we can return them ctrl := c go func() { - cm.errChan <- ctrl.Start(cm.stop) + cm.errChan <- ctrl.Start(cm.internalStop) }() } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5966714663..1230c74573 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -221,8 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) { resourceLock: resourceLock, mapper: mapper, metricsListener: metricsListener, - stop: stop, - stopper: stop, + internalStop: stop, + internalStopper: stop, }, nil } From fc38a77331f82f58d5a9ea4b6785d90b22bde489 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Mon, 12 Nov 2018 10:47:32 -0800 Subject: [PATCH 3/4] [travis] Bump Go to 1.11 This will match the version currently used in Kubernetes. --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 87989d1e6c..de5687f36e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ os: - osx go: -- "1.10" +- "1.11" git: depth: 3 From 071c3a27e8e21246809b2da02b61fd5dd8844db9 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Mon, 12 Nov 2018 17:02:37 -0800 Subject: [PATCH 4/4] Unused param in cache.Reader Unparam is complaining of an usused param in cache.Reader in Travis (but somehow only on Travis?). This fixes that. --- pkg/cache/internal/cache_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/internal/cache_reader.go b/pkg/cache/internal/cache_reader.go index 199d0bf835..0d82c6396d 100644 --- a/pkg/cache/internal/cache_reader.go +++ b/pkg/cache/internal/cache_reader.go @@ -86,7 +86,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.O } // List lists items out of the indexer and writes them to out -func (c *CacheReader) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error { +func (c *CacheReader) List(_ context.Context, opts *client.ListOptions, out runtime.Object) error { var objs []interface{} var err error