From 591a3776c21cf1ab2e25a86446b410ea35b18556 Mon Sep 17 00:00:00 2001 From: micagates Date: Fri, 27 Apr 2018 19:47:33 +0000 Subject: [PATCH] lease: do lease pile-up reduction in the background This moves lease pile-up reduction into a goroutine which mostly operates on a copy of the lease list, to avoid locking. This prevents timeouts when the lessor is locked for a long time (when there are a lot of leases, mostly). This should solve https://github.com/coreos/etcd/issues/9496. We had a problem where when we had a lot of leases (100kish), and a leader election happened the lessor was locked for a long time causing timeouts when we tried to grant a lease. This changes it so that the lessor is locked in batches, which allows for creation of leases while the leader is initializing the lessor. It still won't start expiring leases until it's all done rewriting them, though. Before: ``` BenchmarkLessorPromote1-16 500000 4036 ns/op BenchmarkLessorPromote10-16 500000 3932 ns/op BenchmarkLessorPromote100-16 500000 3954 ns/op BenchmarkLessorPromote1000-16 300000 3906 ns/op BenchmarkLessorPromote10000-16 300000 4639 ns/op BenchmarkLessorPromote100000-16 100 27216481 ns/op BenchmarkLessorPromote1000000-16 100 325164684 ns/op ``` After: ``` BenchmarkLessorPromote1-16 500000 3769 ns/op BenchmarkLessorPromote10-16 500000 3835 ns/op BenchmarkLessorPromote100-16 500000 3829 ns/op BenchmarkLessorPromote1000-16 500000 3665 ns/op BenchmarkLessorPromote10000-16 500000 3800 ns/op BenchmarkLessorPromote100000-16 300000 4114 ns/op BenchmarkLessorPromote1000000-16 300000 5143 ns/op ``` --- lease/lessor.go | 108 ++++++++++++++++++++++--------------- lease/lessor_bench_test.go | 26 +++++++++ lease/lessor_test.go | 19 ++++++- 3 files changed, 108 insertions(+), 45 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index db5b34cc27b6..7a40570bdb49 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -144,6 +144,9 @@ type lessor struct { stopC chan struct{} // doneC is a channel whose closure indicates that the lessor is stopped. doneC chan struct{} + + // when the lease pile-up reduction is done this is true + Ready bool } func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor { @@ -329,7 +332,6 @@ func (le *lessor) unsafeLeases() []*Lease { for _, l := range le.leaseMap { leases = append(leases, l) } - sort.Sort(leasesByExpiry(leases)) return leases } @@ -337,58 +339,76 @@ func (le *lessor) Leases() []*Lease { le.mu.RLock() ls := le.unsafeLeases() le.mu.RUnlock() + sort.Sort(leasesByExpiry(ls)) return ls } func (le *lessor) Promote(extend time.Duration) { le.mu.Lock() - defer le.mu.Unlock() + le.Ready = false + le.mu.Unlock() + go func() { + le.mu.Lock() + le.demotec = make(chan struct{}) + leaseCopy := le.unsafeLeases() + le.mu.Unlock() + var updateList []*LeaseWithTime + defer func() { + le.mu.Lock() + defer le.mu.Unlock() + for _, item := range updateList { + heap.Push(&le.leaseHeap, item) + } + le.Ready = true + }() + + // refresh the expiries of all leases. + for _, l := range leaseCopy { + l.refresh(extend) + // check that the lease hasn't been revoked + item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + updateList = append(updateList, item) + } - le.demotec = make(chan struct{}) + if len(le.leaseMap) < leaseRevokeRate { + // no possibility of lease pile-up + return + } - // refresh the expiries of all leases. - for _, l := range le.leaseMap { - l.refresh(extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} - heap.Push(&le.leaseHeap, item) - } + // adjust expiries in case of overlap - if len(le.leaseMap) < leaseRevokeRate { - // no possibility of lease pile-up - return - } + sort.Sort(leasesByExpiry(leaseCopy)) - // adjust expiries in case of overlap - leases := le.unsafeLeases() - - baseWindow := leases[0].Remaining() - nextWindow := baseWindow + time.Second - expires := 0 - // have fewer expires than the total revoke rate so piled up leases - // don't consume the entire revoke limit - targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 - for _, l := range leases { - remaining := l.Remaining() - if remaining > nextWindow { - baseWindow = remaining - nextWindow = baseWindow + time.Second - expires = 1 - continue - } - expires++ - if expires <= targetExpiresPerSecond { - continue + baseWindow := leaseCopy[0].Remaining() + nextWindow := baseWindow + time.Second + expires := 0 + // have fewer expires than the total revoke rate so piled up leases + // don't consume the entire revoke limit + targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 + + for _, l := range leaseCopy { + remaining := l.Remaining() + if remaining > nextWindow { + baseWindow = remaining + nextWindow = baseWindow + time.Second + expires = 1 + continue + } + expires++ + if expires <= targetExpiresPerSecond { + continue + } + rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) + // If leases are extended by n seconds, leases n seconds ahead of the + // base window should be extended by only one second. + rateDelay -= float64(remaining - baseWindow) + delay := time.Duration(rateDelay) + nextWindow = baseWindow + delay + l.refresh(delay + extend) + item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + updateList = append(updateList, item) } - rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) - // If leases are extended by n seconds, leases n seconds ahead of the - // base window should be extended by only one second. - rateDelay -= float64(remaining - baseWindow) - delay := time.Duration(rateDelay) - nextWindow = baseWindow + delay - l.refresh(delay + extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} - heap.Push(&le.leaseHeap, item) - } + }() } type leasesByExpiry []*Lease @@ -490,7 +510,7 @@ func (le *lessor) runLoop() { revokeLimit := leaseRevokeRate / 2 le.mu.RLock() - if le.isPrimary() { + if le.isPrimary() && le.Ready { ls = le.findExpiredLeases(revokeLimit) } le.mu.RUnlock() diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index a3be6aa95b25..7a9638fc95a2 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -17,6 +17,7 @@ package lease import ( "os" "testing" + "time" "github.com/coreos/etcd/mvcc/backend" ) @@ -53,6 +54,14 @@ func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) } func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) } +func BenchmarkLessorPromote1(b *testing.B) { benchmarkLessorPromote(1, b) } +func BenchmarkLessorPromote10(b *testing.B) { benchmarkLessorPromote(10, b) } +func BenchmarkLessorPromote100(b *testing.B) { benchmarkLessorPromote(100, b) } +func BenchmarkLessorPromote1000(b *testing.B) { benchmarkLessorPromote(1000, b) } +func BenchmarkLessorPromote10000(b *testing.B) { benchmarkLessorPromote(10000, b) } +func BenchmarkLessorPromote100000(b *testing.B) { benchmarkLessorPromote(100000, b) } +func BenchmarkLessorPromote1000000(b *testing.B) { benchmarkLessorPromote(1000000, b) } + func benchmarkLessorFindExpired(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() le := newLessor(be, minLeaseTTL) @@ -115,6 +124,23 @@ func benchmarkLessorRenew(size int, b *testing.B) { } } +func benchmarkLessorPromote(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + + b.ResetTimer() + + go func() { le.Promote(100 * time.Second) }() + for i := 0; i < b.N; i++ { + le.Grant(LeaseID(i+size), int64(100+i+size)) + } +} + func cleanup(b backend.Backend, path string) { b.Close() os.Remove(path) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 3a39e846f729..4e995fac286d 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "sort" "sync" "testing" @@ -44,6 +45,7 @@ func TestLessorGrant(t *testing.T) { le := newLessor(be, minLeaseTTL) defer le.Stop() le.Promote(0) + waitForPromotion(le) l, err := le.Grant(1, 1) if err != nil { @@ -205,7 +207,7 @@ func TestLessorRenew(t *testing.T) { le := newLessor(be, minLeaseTTL) defer le.Stop() le.Promote(0) - + waitForPromotion(le) l, err := le.Grant(1, minLeaseTTL) if err != nil { t.Fatalf("failed to grant lease (%v)", err) @@ -263,6 +265,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { // extend after recovery should extend expiration on lease pile-up le.Promote(0) + waitForPromotion(le) windowCounts := make(map[int64]int) for _, l := range le.leaseMap { @@ -360,6 +363,7 @@ func TestLessorExpire(t *testing.T) { defer le.Stop() le.Promote(1 * time.Second) + waitForPromotion(le) l, err := le.Grant(1, testMinTTL) if err != nil { t.Fatalf("failed to create lease: %v", err) @@ -412,6 +416,7 @@ func TestLessorExpireAndDemote(t *testing.T) { defer le.Stop() le.Promote(1 * time.Second) + waitForPromotion(le) l, err := le.Grant(1, testMinTTL) if err != nil { t.Fatalf("failed to create lease: %v", err) @@ -492,3 +497,15 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) { bcfg.Path = filepath.Join(tmpPath, "be") return tmpPath, backend.New(bcfg) } +func waitForPromotion(le *lessor) { + for { + le.mu.RLock() + ready := le.Ready + le.mu.RUnlock() + if ready { + return + } else { + runtime.Gosched() + } + } +}