From f3efe498edef7b0a38b42d658012e1b163cf13ab Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Mon, 11 Sep 2017 18:56:20 -0300 Subject: [PATCH] Fix multiple leader election --- core/pkg/ingress/status/status.go | 41 +++++++++----------------- core/pkg/ingress/status/status_test.go | 6 +--- 2 files changed, 15 insertions(+), 32 deletions(-) diff --git a/core/pkg/ingress/status/status.go b/core/pkg/ingress/status/status.go index 7239a7f49f..219642d698 100644 --- a/core/pkg/ingress/status/status.go +++ b/core/pkg/ingress/status/status.go @@ -85,13 +85,11 @@ type statusSync struct { // workqueue used to keep in sync the status IP/s // in the Ingress rules syncQueue *task.Queue - - runLock *sync.Mutex } // Run starts the loop to keep the status in sync func (s statusSync) Run(stopCh <-chan struct{}) { - go wait.Forever(s.elector.Run, 0) + go s.elector.Run() go wait.Forever(s.update, updateInterval) go s.syncQueue.Run(time.Second, stopCh) <-stopCh @@ -140,9 +138,6 @@ func (s statusSync) Shutdown() { } func (s *statusSync) sync(key interface{}) error { - s.runLock.Lock() - defer s.runLock.Unlock() - if s.syncQueue.IsShuttingDown() { glog.V(2).Infof("skipping Ingress status update (shutting down in progress)") return nil @@ -162,18 +157,6 @@ func (s *statusSync) sync(key interface{}) error { return nil } -// callback invoked function when a new leader is elected -func (s *statusSync) callback(leader string) { - if s.syncQueue.IsShuttingDown() { - return - } - - glog.V(2).Infof("new leader elected (%v)", leader) - if leader == s.pod.Name { - glog.V(2).Infof("I am the new status update leader") - } -} - func (s statusSync) keyfunc(input interface{}) (interface{}, error) { return input, nil } @@ -186,9 +169,9 @@ func NewStatusSyncer(config Config) Sync { } st := statusSync{ - pod: pod, - runLock: &sync.Mutex{}, - Config: config, + pod: pod, + + Config: config, } st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc) @@ -201,10 +184,13 @@ func NewStatusSyncer(config Config) Sync { callbacks := leaderelection.LeaderCallbacks{ OnStartedLeading: func(stop <-chan struct{}) { - st.callback(pod.Name) + glog.V(2).Infof("I am the new status update leader") }, OnStoppedLeading: func() { - st.callback("") + glog.V(2).Infof("I am not status update leader anymore") + }, + OnNewLeader: func(identity string) { + glog.Infof("new leader elected: %v", identity) }, } @@ -220,16 +206,17 @@ func NewStatusSyncer(config Config) Sync { ConfigMapMeta: meta_v1.ObjectMeta{Namespace: pod.Namespace, Name: electionID}, Client: config.Client.CoreV1(), LockConfig: resourcelock.ResourceLockConfig{ - Identity: electionID, + Identity: pod.Name, EventRecorder: recorder, }, } + ttl := 30 * time.Second le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: &lock, - LeaseDuration: 30 * time.Second, - RenewDeadline: 15 * time.Second, - RetryPeriod: 5 * time.Second, + LeaseDuration: ttl, + RenewDeadline: ttl / 2, + RetryPeriod: ttl / 4, Callbacks: callbacks, }) diff --git a/core/pkg/ingress/status/status_test.go b/core/pkg/ingress/status/status_test.go index 17991d6be3..d248deb843 100644 --- a/core/pkg/ingress/status/status_test.go +++ b/core/pkg/ingress/status/status_test.go @@ -19,7 +19,6 @@ package status import ( "os" "sort" - "sync" "testing" "time" @@ -245,7 +244,6 @@ func buildStatusSync() statusSync { "lable_sig": "foo_pod", }, }, - runLock: &sync.Mutex{}, syncQueue: task.NewTaskQueue(fakeSynFn), Config: Config{ Client: buildSimpleClientSet(), @@ -328,9 +326,7 @@ func TestStatusActions(t *testing.T) { } func TestCallback(t *testing.T) { - fk := buildStatusSync() - // do nothing - fk.callback("foo_base_pod") + buildStatusSync() } func TestKeyfunc(t *testing.T) {