Skip to content

Commit

Permalink
Merge pull request #114872 from Iceber/fix_acquire_leader
Browse files Browse the repository at this point in the history
client-go: fix the wait time for trying to acquire the leader lease

Kubernetes-commit: 6e213e7390b60c6db85210f322a213768cefb172
  • Loading branch information
k8s-publishing-bot committed Feb 24, 2023
2 parents ba2fddd + 899a109 commit 6e69fba
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
6 changes: 3 additions & 3 deletions tools/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (le *LeaderElector) release() bool {
if !le.IsLeader() {
return true
}
now := metav1.Now()
now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: 1,
Expand All @@ -312,7 +312,7 @@ func (le *LeaderElector) release() bool {
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
Expand Down Expand Up @@ -344,7 +344,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
Expand Down
51 changes: 44 additions & 7 deletions tools/leaderelection/leaderelection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ type Reactor struct {
}

func testTryAcquireOrRenew(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
clock := clock.RealClock{}
future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)

tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
retryAfter time.Duration
reactors []Reactor
expectedEvents []string

Expand Down Expand Up @@ -127,6 +129,33 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led object with the lease duration seconds",
reactors: []Reactor{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
},
},
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
},
},
{
verb: "update",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
retryAfter: 3 * time.Second,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from unled object",
reactors: []Reactor{
Expand Down Expand Up @@ -283,10 +312,17 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
clock: clock,
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
if test.retryAfter != 0 {
time.Sleep(test.retryAfter)
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
} else {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
}

le.observedRecord.AcquireTime = metav1.Time{}
Expand Down Expand Up @@ -378,8 +414,9 @@ func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRec
}

func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
clock := clock.RealClock{}
future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)
primaryType, secondaryType := multiLockType(t, objectType)
tests := []struct {
name string
Expand Down Expand Up @@ -838,7 +875,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
observedRecord: test.observedRecord,
observedRawRecord: test.observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
clock: clock,
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
Expand Down

0 comments on commit 6e69fba

Please sign in to comment.