Skip to content

Commit

Permalink
Refactor leader election in manager.Start
Browse files Browse the repository at this point in the history
  • Loading branch information
JoelSpeed committed Oct 19, 2018
1 parent 6eabf5c commit 35f10c9
Showing 1 changed file with 34 additions and 31 deletions.
65 changes: 34 additions & 31 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,41 +158,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
if cm.resourceLock == nil {
go cm.start(stop)
select {
case <-stop:
// we are done
return nil
case err := <-cm.errChan:
// Error starting a controller
if cm.resourceLock != nil {
err := cm.startLeaderElection(stop)
if err != nil {
return err
}
} else {
go cm.start(stop)
}

l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
// TODO(joelspeed): These timings should be configurable
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: cm.start,
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- fmt.Errorf("leader election lost")
},
},
})
if err != nil {
return err
}

go l.Run()

select {
case <-stop:
// We are done
Expand Down Expand Up @@ -243,3 +217,32 @@ func (cm *controllerManager) start(stop <-chan struct{}) {
return
}
}

func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
// TODO(joelspeed): These timings should be configurable
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ <-chan struct{}) {
cm.start(stop)
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- fmt.Errorf("leader election lost")
},
},
})
if err != nil {
return err
}

// Start the leader elector process
go l.Run()
return nil
}

0 comments on commit 35f10c9

Please sign in to comment.