Skip to content

Commit

Permalink
✨ support non-leaderelection Runnanles in controller manager
Browse files Browse the repository at this point in the history
webhook server can be run in the non-leaderelection mode
  • Loading branch information
Mengqi Yu committed May 16, 2019
1 parent 0893332 commit 257089c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 13 deletions.
51 changes: 40 additions & 11 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ type controllerManager struct {
// to scheme.scheme.
scheme *runtime.Scheme

// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
runnables []Runnable
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
// These Runnables are managed by lead election.
leaderElectionRunnables []Runnable
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
// These Runnables are in HA mode (not blocked by lead election)
nonLeaderElectionRunnables []Runnable

cache cache.Cache

Expand Down Expand Up @@ -121,7 +125,7 @@ type controllerManager struct {
retryPeriod time.Duration
}

// Add sets dependencies on i, and adds it to the list of runnables to start.
// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
Expand All @@ -131,8 +135,13 @@ func (cm *controllerManager) Add(r Runnable) error {
return err
}

// Add the runnable to the list
cm.runnables = append(cm.runnables, r)
// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else {
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}

if cm.started {
// If already started, start the controller
go func() {
Expand Down Expand Up @@ -254,13 +263,15 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
go cm.serveMetrics(cm.internalStop)
}

go cm.startNonLeaderElectionRunnables()

if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.start()
go cm.startLeaderElectionRunnables()
}

select {
Expand All @@ -273,11 +284,11 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start() {
func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()

// Start the Cache. Allow the function to start the cache to be mocked out for testing
// Start the Cache. Allow the function to startLeaderElectionRunnables the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
Expand All @@ -291,8 +302,26 @@ func (cm *controllerManager) start() {
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Start the non-leaderelection Runnables after the cache has synced
for _, c := range cm.nonLeaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.started = true
}

func (cm *controllerManager) startLeaderElectionRunnables() {
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
Expand All @@ -312,7 +341,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
cm.start()
cm.startLeaderElectionRunnables()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
Expand Down
13 changes: 11 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import (
// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
// A Manager is required to create Controllers.
type Manager interface {
// Add will set reqeusted dependencies on the component, and cause the component to be
// Add will set requested dependencies on the component, and cause the component to be
// started when Start is called. Add will inject any dependencies for which the argument
// implements the inject interface - e.g. inject.Client
// implements the inject interface - e.g. inject.Client.
// Depending on if a Runnable implements HARunnable interface, a Runnable can be run in either
// HA mode (always running) or non-HA mode (managed by leader election if enabled).
Add(Runnable) error

// SetFields will set any dependencies on an object for which the object has implemented the inject
Expand Down Expand Up @@ -183,6 +185,13 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
return r(s)
}

// LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
type LeaderElectionRunnable interface {
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
// e.g. controllers need to be run in leader election mode, while webhook server doesn't.
NeedLeaderElection() bool
}

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Initialize a rest.config if none was specified
Expand Down
5 changes: 5 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ var _ = Describe("manger.Manager", func() {
<-c2
<-c3
})

It("should return an error if any HA Components fail to Start", func() {
// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
// Example: ListOptions
})
}

Context("with defaults", func() {
Expand Down

0 comments on commit 257089c

Please sign in to comment.