From c38c00f7c32d64c49430f81ec8e93433453f66dd Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 16 Jun 2017 17:51:01 -0700 Subject: [PATCH 1/2] lessor: extend leases on promote if expires will be rate limited Instead of unconditionally randomizing, extend leases on promotion if too many leases expire within the same time span. If the server has few leases or spread out expires, there will be no extension. --- lease/lessor.go | 63 +++++++++++++++++++++++++++++++++----------- lease/lessor_test.go | 40 +++++++++++++++++++--------- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index df8596ee3c2..3418cf565ed 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "errors" "math" - "math/rand" "sort" "sync" "sync/atomic" @@ -33,15 +32,14 @@ const ( // NoLease is a special LeaseID representing the absence of a lease. NoLease = LeaseID(0) - // maximum number of leases to revoke per iteration - // TODO: make this configurable? - leaseRevokeRate = 1000 + forever = monotime.Time(math.MaxInt64) ) var ( leaseBucketName = []byte("lease") - forever = monotime.Time(math.MaxInt64) + // maximum number of leases to revoke per second; configurable for tests + leaseRevokeRate = 1000 ErrNotPrimary = errors.New("not a primary lessor") ErrLeaseNotFound = errors.New("lease not found") @@ -327,22 +325,55 @@ func (le *lessor) Promote(extend time.Duration) { // refresh the expiries of all leases. 
for _, l := range le.leaseMap { - // randomize expiry with 士10%, otherwise leases of same TTL - // will expire all at the same time, - l.refresh(extend + computeRandomDelta(l.ttl)) + l.refresh(extend) } -} -func computeRandomDelta(seconds int64) time.Duration { - var delta int64 - if seconds > 10 { - delta = int64(float64(seconds) * 0.1 * rand.Float64()) - } else { - delta = rand.Int63n(10) + if len(le.leaseMap) < leaseRevokeRate { + // no possibility of lease pile-up + return + } + + // adjust expiries in case of overlap + leases := make([]*Lease, 0, len(le.leaseMap)) + for _, l := range le.leaseMap { + leases = append(leases, l) + } + sort.Sort(leasesByExpiry(leases)) + + baseWindow := leases[0].Remaining() + nextWindow := baseWindow + time.Second + expires := 0 + // have fewer expires than the total revoke rate so piled up leases + // don't consume the entire revoke limit + targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 + for _, l := range leases { + remaining := l.Remaining() + if remaining > nextWindow { + baseWindow = remaining + nextWindow = baseWindow + time.Second + expires = 1 + continue + } + expires++ + if expires <= targetExpiresPerSecond { + continue + } + rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) + // If leases are extended by n seconds, leases n seconds ahead of the + // base window should be extended by only one second. 
+ rateDelay -= float64(remaining - baseWindow) + delay := time.Duration(rateDelay) + nextWindow = baseWindow + delay + l.refresh(delay + extend) } - return time.Duration(delta) * time.Second } +type leasesByExpiry []*Lease + +func (le leasesByExpiry) Len() int { return len(le) } +func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() } +func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] } + func (le *lessor) Demote() { le.mu.Lock() defer le.mu.Unlock() diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 93ea91c881e..e70c56d6b9b 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/coreos/etcd/mvcc/backend" - "github.com/coreos/etcd/pkg/monotime" ) const ( @@ -211,14 +210,24 @@ func TestLessorRenew(t *testing.T) { } } -// TestLessorRenewRandomize ensures Lessor renews with randomized expiry. -func TestLessorRenewRandomize(t *testing.T) { +// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many +// expire at the same time. 
+func TestLessorRenewExtendPileup(t *testing.T) { + oldRevokeRate := leaseRevokeRate + defer func() { leaseRevokeRate = oldRevokeRate }() + leaseRevokeRate = 10 + dir, be := NewTestBackend(t) defer os.RemoveAll(dir) le := newLessor(be, minLeaseTTL) - for i := LeaseID(1); i <= 10; i++ { - if _, err := le.Grant(i, 3600); err != nil { + ttl := int64(10) + for i := 1; i <= leaseRevokeRate*10; i++ { + if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { + t.Fatal(err) + } + // ttls that overlap spillover for ttl=10 + if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil { t.Fatal(err) } } @@ -232,16 +241,23 @@ func TestLessorRenewRandomize(t *testing.T) { defer be.Close() le = newLessor(be, minLeaseTTL) - now := monotime.Now() - - // extend after recovery should randomize expiries + // extend after recovery should extend expiration on lease pile-up le.Promote(0) + windowCounts := make(map[int64]int) for _, l := range le.leaseMap { - leftSeconds := uint64(float64(l.expiry-now) * float64(1e-9)) - pc := (float64(leftSeconds-3600) / float64(3600)) * 100 - if pc > 10.0 || pc < -10.0 || pc == 0 { // should be within 士10% - t.Fatalf("expected randomized expiry, got %d seconds (ttl: 3600)", leftSeconds) + // round up slightly for baseline ttl + s := int64(l.Remaining().Seconds() + 0.1) + windowCounts[s]++ + } + + for i := ttl; i < ttl+20; i++ { + c := windowCounts[i] + if c > leaseRevokeRate { + t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c) + } + if c < leaseRevokeRate/2 { + t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c) } } } From ac061671d51f5ce0f99ecac9cdb86d04edadb7c6 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 21 Jun 2017 11:57:36 -0700 Subject: [PATCH 2/2] Revert "integration: remove lease exist checking on randomized expiry" This reverts commit 95bc33f37f7c31a4cd06287d44879a60baaee40c. The new lease extension algorithm should pass this test. 
--- integration/v3_lease_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index b270e1c1b6b..7bb72ba131f 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -66,10 +66,16 @@ func TestV3LeasePrmote(t *testing.T) { // it was going to expire anyway. time.Sleep(3 * time.Second) - // expiring lease should be renewed with randomized delta if !leaseExist(t, clus, lresp.ID) { t.Error("unexpected lease not exists") } + + // let the lease expire. total lease = 5 seconds and we have already + // waited for 3 seconds, so 3 seconds more is enough. + time.Sleep(3 * time.Second) + if leaseExist(t, clus, lresp.ID) { + t.Error("unexpected lease exists") + } } // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.