Skip to content

Commit

Permalink
Start caches before everything else
Browse files Browse the repository at this point in the history
  • Loading branch information
alvaroaleman committed May 15, 2020
1 parent 06e3745 commit 26660e7
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 33 deletions.
32 changes: 31 additions & 1 deletion pkg/clusterconnector/clusterconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/manager/runner"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)

// ClusterConnector contains everything thats needed to build a controller
// that watches and interacts with objects in the given cluster.
type ClusterConnector interface {
// SetFields will set any dependencies on an object for which the object has implemented the inject
// interface - e.g. inject.Client.
Expand Down Expand Up @@ -68,6 +71,9 @@ type ClusterConnector interface {
// This should be used sparingly and only when the client does not fit your
// use case.
GetAPIReader() client.Reader

// AddToManager adds the ClusterConnector to a manager
AddToManager(mgr Manager) error
}

// NewClientFunc allows a user to define how to create a client
Expand All @@ -91,6 +97,7 @@ func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Opt
}, nil
}

// Options holds all possible Options for a ClusterConnector
type Options struct {
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
Expand Down Expand Up @@ -137,6 +144,7 @@ type Options struct {
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
}

// Apply implements Option
func (o Options) Apply(target *Options) {
if o.Scheme != nil {
target.Scheme = o.Scheme
Expand Down Expand Up @@ -173,6 +181,7 @@ func (o Options) Apply(target *Options) {

var _ Option = Options{}

// Option can be used to customize a ClusterConnector
type Option interface {
Apply(o *Options)
}
Expand Down Expand Up @@ -222,7 +231,28 @@ func setOptionsDefaults(options Options) Options {
return options
}

func New(config *rest.Config, name string, opts ...Option) (ClusterConnector, error) {
// Manager manages the lifecycle of Runnables
type Manager interface {
Add(r runner.Runnable) error
}

// New creates a new ClusterConnector
func New(config *rest.Config, mgr Manager, name string, opts ...Option) (ClusterConnector, error) {
cc, err := NewUnmanaged(config, name, opts...)
if err != nil {
return nil, err
}

if err := cc.AddToManager(mgr); err != nil {
return nil, err
}

return cc, nil
}

// NewUnmanaged creates a new unmanaged ClusterConnector. It must be manually added to a
// Manager by calling its AddToManager.
func NewUnmanaged(config *rest.Config, name string, opts ...Option) (ClusterConnector, error) {
log := logf.RuntimeLog.WithName("clusterconnector").WithValues("name", name)
if config == nil {
return nil, fmt.Errorf("must specify Config")
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusterconnector/clusterconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var _ = Describe("clusterconnector.ClusterConnector", func() {
Describe("New", func() {

It("should return an error it can't create a recorder.Provider", func(done Done) {
m, err := New(cfg, "", Options{
m, err := NewUnmanaged(cfg, "", Options{
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
return nil, fmt.Errorf("expected error")
},
Expand All @@ -111,7 +111,7 @@ var _ = Describe("clusterconnector.ClusterConnector", func() {

Describe("SetFields", func() {
It("should inject field values", func(done Done) {
c, err := New(cfg, "", Options{})
c, err := NewUnmanaged(cfg, "", Options{})
Expect(err).NotTo(HaveOccurred())
cc, ok := c.(*clusterConnector)
Expect(ok).To(BeTrue())
Expand Down
4 changes: 4 additions & 0 deletions pkg/clusterconnector/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ func (cc *clusterConnector) GetRESTMapper() meta.RESTMapper {
func (cc *clusterConnector) GetAPIReader() client.Reader {
return cc.apiReader
}

func (cc *clusterConnector) AddToManager(mgr Manager) error {
return mgr.Add(cc.GetCache())
}
41 changes: 26 additions & 15 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/manager/runner"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand All @@ -49,13 +51,21 @@ const (

var log = logf.RuntimeLog.WithName("manager")

type runnableCache interface {
runner.Runnable
WaitForCacheSync(stop <-chan struct{}) bool
}

type controllerManager struct {
// caches holds all caches this controllerManager handles. They must be started before everything else.
caches []runnableCache

// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
// These Runnables are managed by lead election.
leaderElectionRunnables []Runnable
leaderElectionRunnables []runner.Runnable
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
// These Runnables will not be blocked by lead election.
nonLeaderElectionRunnables []Runnable
nonLeaderElectionRunnables []runner.Runnable

// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface
Expand Down Expand Up @@ -106,8 +116,6 @@ type controllerManager struct {
// election was configured.
elected chan struct{}

startCache func(stop <-chan struct{}) error

// port is the port that the webhook server serves at.
port int
// host is the hostname that the webhook server binds to.
Expand Down Expand Up @@ -178,7 +186,7 @@ func (r *errSignaler) GotError() chan struct{} {
}

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
func (cm *controllerManager) Add(r runner.Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()

Expand All @@ -190,7 +198,10 @@ func (cm *controllerManager) Add(r Runnable) error {
var shouldStart bool

// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
if cache, ok := r.(cache.Cache); ok {
shouldStart = cm.started
cm.caches = append(cm.caches, cache)
} else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else {
Expand Down Expand Up @@ -449,19 +460,19 @@ func (cm *controllerManager) waitForCache() {
return
}

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.GetCache().Start
for idx := range cm.caches {
go func(idx int) {
if err := cm.caches[idx].Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}(idx)
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.GetCache().WaitForCacheSync(cm.internalStop)
for _, cache := range cm.caches {
cache.WaitForCacheSync(cm.internalStop)
}
cm.started = true
}

Expand Down
23 changes: 10 additions & 13 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
"sigs.k8s.io/controller-runtime/pkg/manager/runner"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/recorder"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand All @@ -46,7 +47,7 @@ type Manager interface {
// implements the inject interface - e.g. inject.Client.
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
Add(Runnable) error
Add(runner.Runnable) error

// Elected is closed when this manager is elected leader of a group of
// managers, either because it won a leader election or because no leader
Expand Down Expand Up @@ -177,16 +178,6 @@ type Options struct {
newHealthProbeListener func(addr string) (net.Listener, error)
}

// Runnable allows a component to be started.
// It's very important that Start blocks until
// it's done running.
type Runnable interface {
// Start starts running the component. The component will stop running
// when the channel is closed. Start blocks until the channel is closed or
// an error occurs.
Start(<-chan struct{}) error
}

// RunnableFunc implements Runnable using a function.
// It's very important that the given function block
// until it's done running.
Expand Down Expand Up @@ -258,7 +249,7 @@ func New(config *rest.Config, options Options) (Manager, error) {

stop := make(chan struct{})

return &controllerManager{
cm := &controllerManager{
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
Expand All @@ -275,7 +266,13 @@ func New(config *rest.Config, options Options) (Manager, error) {
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
ClusterConnector: clusterConnector,
}, nil
}

if err := cm.ClusterConnector.AddToManager(cm); err != nil {
return nil, err
}

return cm, nil
}

// DefaultNewClient creates the default caching client
Expand Down
22 changes: 20 additions & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake"
"sigs.k8s.io/controller-runtime/pkg/metrics"
Expand All @@ -41,6 +42,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

// ControllerManager is a clusterconnector.Manager
var _ clusterconnector.Manager = &controllerManager{}

var _ = Describe("manger.Manager", func() {
var stop chan struct{}

Expand Down Expand Up @@ -320,9 +324,9 @@ var _ = Describe("manger.Manager", func() {
Expect(err).NotTo(HaveOccurred())
mgr, ok := m.(*controllerManager)
Expect(ok).To(BeTrue())
mgr.startCache = func(stop <-chan struct{}) error {
mgr.caches = []runnableCache{&fakeRunnableCache{start: func(stop <-chan struct{}) error {
return fmt.Errorf("expected error")
}
}}}
Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error")))

close(done)
Expand Down Expand Up @@ -956,3 +960,17 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error {
func (i *injectable) Start(<-chan struct{}) error {
return nil
}

var _ runnableCache = &fakeRunnableCache{}

type fakeRunnableCache struct {
start func(<-chan struct{}) error
}

func (frc *fakeRunnableCache) Start(c <-chan struct{}) error {
return frc.start(c)
}

func (frc *fakeRunnableCache) WaitForCacheSync(_ <-chan struct{}) bool {
return true
}
27 changes: 27 additions & 0 deletions pkg/manager/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runner

// Runnable allows a component to be started.
// It's very important that Start blocks until
// it's done running.
type Runnable interface {
// Start starts running the component. The component will stop running
// when the channel is closed. Start blocks until the channel is closed or
// an error occurs.
Start(<-chan struct{}) error
}

0 comments on commit 26660e7

Please sign in to comment.