Skip to content

Commit

Permalink
Merge pull request #8149 from heyitsanthony/lease-sort-promote
Browse files Browse the repository at this point in the history
lessor: extend leases on promote if expires will be rate limited
  • Loading branch information
heyitsanthony committed Jun 22, 2017
2 parents cdc7d77 + ac06167 commit f7df3c8
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 29 deletions.
8 changes: 7 additions & 1 deletion integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,16 @@ func TestV3LeasePrmote(t *testing.T) {
// it was going to expire anyway.
time.Sleep(3 * time.Second)

// expiring lease should be renewed with randomized delta
if !leaseExist(t, clus, lresp.ID) {
t.Error("unexpected lease not exists")
}

// let lease expires. total lease = 5 seconds and we already
// waits for 3 seconds, so 3 seconds more is enough.
time.Sleep(3 * time.Second)
if leaseExist(t, clus, lresp.ID) {
t.Error("unexpected lease exists")
}
}

// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
Expand Down
63 changes: 47 additions & 16 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"encoding/binary"
"errors"
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
Expand All @@ -33,15 +32,14 @@ const (
// NoLease is a special LeaseID representing the absence of a lease.
NoLease = LeaseID(0)

// maximum number of leases to revoke per iteration
// TODO: make this configurable?
leaseRevokeRate = 1000
forever = monotime.Time(math.MaxInt64)
)

var (
leaseBucketName = []byte("lease")

forever = monotime.Time(math.MaxInt64)
// maximum number of leases to revoke per second; configurable for tests
leaseRevokeRate = 1000

ErrNotPrimary = errors.New("not a primary lessor")
ErrLeaseNotFound = errors.New("lease not found")
Expand Down Expand Up @@ -327,22 +325,55 @@ func (le *lessor) Promote(extend time.Duration) {

// refresh the expiries of all leases.
for _, l := range le.leaseMap {
// randomize expiry with 士10%, otherwise leases of same TTL
// will expire all at the same time,
l.refresh(extend + computeRandomDelta(l.ttl))
l.refresh(extend)
}
}

func computeRandomDelta(seconds int64) time.Duration {
var delta int64
if seconds > 10 {
delta = int64(float64(seconds) * 0.1 * rand.Float64())
} else {
delta = rand.Int63n(10)
if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}

// adjust expiries in case of overlap
leases := make([]*Lease, 0, len(le.leaseMap))
for _, l := range le.leaseMap {
leases = append(leases, l)
}
sort.Sort(leasesByExpiry(leases))

baseWindow := leases[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
}
return time.Duration(delta) * time.Second
}

type leasesByExpiry []*Lease

func (le leasesByExpiry) Len() int { return len(le) }
func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] }

func (le *lessor) Demote() {
le.mu.Lock()
defer le.mu.Unlock()
Expand Down
40 changes: 28 additions & 12 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/monotime"
)

const (
Expand Down Expand Up @@ -211,14 +210,24 @@ func TestLessorRenew(t *testing.T) {
}
}

// TestLessorRenewRandomize ensures Lessor renews with randomized expiry.
func TestLessorRenewRandomize(t *testing.T) {
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
// expire at the same time.
func TestLessorRenewExtendPileup(t *testing.T) {
oldRevokeRate := leaseRevokeRate
defer func() { leaseRevokeRate = oldRevokeRate }()
leaseRevokeRate = 10

dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)

le := newLessor(be, minLeaseTTL)
for i := LeaseID(1); i <= 10; i++ {
if _, err := le.Grant(i, 3600); err != nil {
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
t.Fatal(err)
}
// ttls that overlap spillover for ttl=10
if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil {
t.Fatal(err)
}
}
Expand All @@ -232,16 +241,23 @@ func TestLessorRenewRandomize(t *testing.T) {
defer be.Close()
le = newLessor(be, minLeaseTTL)

now := monotime.Now()

// extend after recovery should randomize expiries
// extend after recovery should extend expiration on lease pile-up
le.Promote(0)

windowCounts := make(map[int64]int)
for _, l := range le.leaseMap {
leftSeconds := uint64(float64(l.expiry-now) * float64(1e-9))
pc := (float64(leftSeconds-3600) / float64(3600)) * 100
if pc > 10.0 || pc < -10.0 || pc == 0 { // should be within 士10%
t.Fatalf("expected randomized expiry, got %d seconds (ttl: 3600)", leftSeconds)
// round up slightly for baseline ttl
s := int64(l.Remaining().Seconds() + 0.1)
windowCounts[s]++
}

for i := ttl; i < ttl+20; i++ {
c := windowCounts[i]
if c > leaseRevokeRate {
t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
}
if c < leaseRevokeRate/2 {
t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
}
}
}
Expand Down

0 comments on commit f7df3c8

Please sign in to comment.