From 26660e7839eeb5614e558cad89675fdaf3512969 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 9 May 2020 18:31:23 -0400 Subject: [PATCH] Start caches before everything else --- pkg/clusterconnector/clusterconnector.go | 32 ++++++++++++++- pkg/clusterconnector/clusterconnector_test.go | 4 +- pkg/clusterconnector/internal.go | 4 ++ pkg/manager/internal.go | 41 ++++++++++++------- pkg/manager/manager.go | 23 +++++------ pkg/manager/manager_test.go | 22 +++++++++- pkg/manager/runner/runner.go | 27 ++++++++++++ 7 files changed, 120 insertions(+), 33 deletions(-) create mode 100644 pkg/manager/runner/runner.go diff --git a/pkg/clusterconnector/clusterconnector.go b/pkg/clusterconnector/clusterconnector.go index 344ee2edd0..45709a7c8b 100644 --- a/pkg/clusterconnector/clusterconnector.go +++ b/pkg/clusterconnector/clusterconnector.go @@ -32,9 +32,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/manager/runner" "sigs.k8s.io/controller-runtime/pkg/recorder" ) +// ClusterConnector contains everything thats needed to build a controller +// that watches and interacts with objects in the given cluster. type ClusterConnector interface { // SetFields will set any dependencies on an object for which the object has implemented the inject // interface - e.g. inject.Client. @@ -68,6 +71,9 @@ type ClusterConnector interface { // This should be used sparingly and only when the client does not fit your // use case. GetAPIReader() client.Reader + + // AddToManager adds the ClusterConnector to a manager + AddToManager(mgr Manager) error } // NewClientFunc allows a user to define how to create a client @@ -91,6 +97,7 @@ func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Opt }, nil } +// Options holds all possible Options for a ClusterConnector type Options struct { // Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources // Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better @@ -137,6 +144,7 @@ type Options struct { newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) } +// Apply implements Option func (o Options) Apply(target *Options) { if o.Scheme != nil { target.Scheme = o.Scheme @@ -173,6 +181,7 @@ func (o Options) Apply(target *Options) { var _ Option = Options{} +// Option can be used to customize a ClusterConnector type Option interface { Apply(o *Options) } @@ -222,7 +231,28 @@ func setOptionsDefaults(options Options) Options { return options } -func New(config *rest.Config, name string, opts ...Option) (ClusterConnector, error) { +// Manager manages the lifecycle of Runnables +type Manager interface { + Add(r runner.Runnable) error +} + +// New creates a new ClusterConnector +func New(config *rest.Config, mgr Manager, name string, opts ...Option) (ClusterConnector, error) { + cc, err := NewUnmanaged(config, name, opts...) + if err != nil { + return nil, err + } + + if err := cc.AddToManager(mgr); err != nil { + return nil, err + } + + return cc, nil +} + +// NewUnmanaged creates a new unmanaged ClusterConnector. It must be manually added to a +// Manager by calling its AddToManager. +func NewUnmanaged(config *rest.Config, name string, opts ...Option) (ClusterConnector, error) { log := logf.RuntimeLog.WithName("clusterconnector").WithValues("name", name) if config == nil { return nil, fmt.Errorf("must specify Config") diff --git a/pkg/clusterconnector/clusterconnector_test.go b/pkg/clusterconnector/clusterconnector_test.go index 0f596d1838..f9298a3126 100644 --- a/pkg/clusterconnector/clusterconnector_test.go +++ b/pkg/clusterconnector/clusterconnector_test.go @@ -95,7 +95,7 @@ var _ = Describe("clusterconnector.ClusterConnector", func() { Describe("New", func() { It("should return an error it can't create a recorder.Provider", func(done Done) { - m, err := New(cfg, "", Options{ + m, err := NewUnmanaged(cfg, "", Options{ newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) { return nil, fmt.Errorf("expected error") }, @@ -111,7 +111,7 @@ var _ = Describe("clusterconnector.ClusterConnector", func() { Describe("SetFields", func() { It("should inject field values", func(done Done) { - c, err := New(cfg, "", Options{}) + c, err := NewUnmanaged(cfg, "", Options{}) Expect(err).NotTo(HaveOccurred()) cc, ok := c.(*clusterConnector) Expect(ok).To(BeTrue()) diff --git a/pkg/clusterconnector/internal.go b/pkg/clusterconnector/internal.go index 9bb7612cb0..853b18e1b5 100644 --- a/pkg/clusterconnector/internal.go +++ b/pkg/clusterconnector/internal.go @@ -115,3 +115,7 @@ func (cc *clusterConnector) GetRESTMapper() meta.RESTMapper { func (cc *clusterConnector) GetAPIReader() client.Reader { return cc.apiReader } + +func (cc *clusterConnector) AddToManager(mgr Manager) error { + return mgr.Add(cc.GetCache()) +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 524194e01e..0b43622535 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -28,9 +28,11 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/clusterconnector" "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + "sigs.k8s.io/controller-runtime/pkg/manager/runner" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -49,13 +51,21 @@ const ( var log = logf.RuntimeLog.WithName("manager") +type runnableCache interface { + runner.Runnable + WaitForCacheSync(stop <-chan struct{}) bool +} + type controllerManager struct { + // caches holds all caches this controllerManager handles. They must be started before everything else. + caches []runnableCache + // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. // These Runnables are managed by lead election. - leaderElectionRunnables []Runnable + leaderElectionRunnables []runner.Runnable // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. // These Runnables will not be blocked by lead election. - nonLeaderElectionRunnables []Runnable + nonLeaderElectionRunnables []runner.Runnable // resourceLock forms the basis for leader election resourceLock resourcelock.Interface @@ -106,8 +116,6 @@ type controllerManager struct { // election was configured. elected chan struct{} - startCache func(stop <-chan struct{}) error - // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. @@ -178,7 +186,7 @@ func (r *errSignaler) GotError() chan struct{} { } // Add sets dependencies on i, and adds it to the list of Runnables to start. -func (cm *controllerManager) Add(r Runnable) error { +func (cm *controllerManager) Add(r runner.Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() @@ -190,7 +198,10 @@ func (cm *controllerManager) Add(r Runnable) error { var shouldStart bool // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + if cache, ok := r.(cache.Cache); ok { + shouldStart = cm.started + cm.caches = append(cm.caches, cache) + } else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { shouldStart = cm.started cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else { @@ -449,19 +460,19 @@ func (cm *controllerManager) waitForCache() { return } - // Start the Cache. Allow the function to start the cache to be mocked out for testing - if cm.startCache == nil { - cm.startCache = cm.GetCache().Start + for idx := range cm.caches { + go func(idx int) { + if err := cm.caches[idx].Start(cm.internalStop); err != nil { + cm.errSignal.SignalError(err) + } + }(idx) } - go func() { - if err := cm.startCache(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - }() // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.GetCache().WaitForCacheSync(cm.internalStop) + for _, cache := range cm.caches { + cache.WaitForCacheSync(cm.internalStop) + } cm.started = true } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index cea8f30cf4..6d0cdd41b9 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/clusterconnector" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/leaderelection" + "sigs.k8s.io/controller-runtime/pkg/manager/runner" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/recorder" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -46,7 +47,7 @@ type Manager interface { // implements the inject interface - e.g. inject.Client. // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). - Add(Runnable) error + Add(runner.Runnable) error // Elected is closed when this manager is elected leader of a group of // managers, either because it won a leader election or because no leader @@ -177,16 +178,6 @@ type Options struct { newHealthProbeListener func(addr string) (net.Listener, error) } -// Runnable allows a component to be started. -// It's very important that Start blocks until -// it's done running. -type Runnable interface { - // Start starts running the component. The component will stop running - // when the channel is closed. Start blocks until the channel is closed or - // an error occurs. - Start(<-chan struct{}) error -} - // RunnableFunc implements Runnable using a function. // It's very important that the given function block // until it's done running. @@ -258,7 +249,7 @@ func New(config *rest.Config, options Options) (Manager, error) { stop := make(chan struct{}) - return &controllerManager{ + cm := &controllerManager{ resourceLock: resourceLock, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, @@ -275,7 +266,13 @@ func New(config *rest.Config, options Options) (Manager, error) { readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, ClusterConnector: clusterConnector, - }, nil + } + + if err := cm.ClusterConnector.AddToManager(cm); err != nil { + return nil, err + } + + return cm, nil } // DefaultNewClient creates the default caching client diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index b90c0f4381..07a5512a1f 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" "sigs.k8s.io/controller-runtime/pkg/leaderelection" fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -41,6 +42,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) +// ControllerManager is a clusterconnector.Manager +var _ clusterconnector.Manager = &controllerManager{} + var _ = Describe("manger.Manager", func() { var stop chan struct{} @@ -320,9 +324,9 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.startCache = func(stop <-chan struct{}) error { + mgr.caches = []runnableCache{&fakeRunnableCache{start: func(stop <-chan struct{}) error { return fmt.Errorf("expected error") - } + }}} Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error"))) close(done) @@ -956,3 +960,17 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error { func (i *injectable) Start(<-chan struct{}) error { return nil } + +var _ runnableCache = &fakeRunnableCache{} + +type fakeRunnableCache struct { + start func(<-chan struct{}) error +} + +func (frc *fakeRunnableCache) Start(c <-chan struct{}) error { + return frc.start(c) +} + +func (frc *fakeRunnableCache) WaitForCacheSync(_ <-chan struct{}) bool { + return true +} diff --git a/pkg/manager/runner/runner.go b/pkg/manager/runner/runner.go new file mode 100644 index 0000000000..78d113c34f --- /dev/null +++ b/pkg/manager/runner/runner.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runner + +// Runnable allows a component to be started. +// It's very important that Start blocks until +// it's done running. +type Runnable interface { + // Start starts running the component. The component will stop running + // when the channel is closed. Start blocks until the channel is closed or + // an error occurs. + Start(<-chan struct{}) error +}