Skip to content

Commit

Permalink
lease: Add new configuration flag to toggle persisting expiration of …
Browse files Browse the repository at this point in the history
…leases

Add a persist expiry flag that allows the expiry field of leases to be
persisted along with the ID and ttl. Defaults to false.

Addresses #9496
  • Loading branch information
braintreeps committed Apr 23, 2018
1 parent 353f938 commit 4f56765
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 47 deletions.
4 changes: 4 additions & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ type Config struct {
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`

PersistExpiry bool `json:"persist-expiry"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`

Expand Down Expand Up @@ -327,6 +329,8 @@ func NewConfig() *Config {

PreVote: false, // TODO: enable by default in v3.5

PersistExpiry: false,

loggerMu: new(sync.RWMutex),
logger: nil,
Logger: "capnslog",
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
LoggerConfig: cfg.loggerConfig,
Debug: cfg.Debug,
ForceNewCluster: cfg.ForceNewCluster,
PersistExpiry: cfg.PersistExpiry,
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")

fs.BoolVar(&cfg.ec.PersistExpiry, "persist-expiry", cfg.ec.PersistExpiry, "Persist expiry values for leases.")

// unsafe
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")

Expand Down
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type ServerConfig struct {
Debug bool

ForceNewCluster bool

PersistExpiry bool
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())), cfg.PersistExpiry)
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
Expand Down
6 changes: 3 additions & 3 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestRenewHTTP(t *testing.T) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(be, int64(5), false)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand All @@ -56,7 +56,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(be, int64(5), false)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down Expand Up @@ -97,7 +97,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(be, int64(5), false)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down
55 changes: 42 additions & 13 deletions lease/leasepb/lease.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lease/leasepb/lease.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ option (gogoproto.goproto_enum_prefix_all) = false;
message Lease {
int64 ID = 1;
int64 TTL = 2;
int64 Expiry = 3;
}

message LeaseInternalRequest {
Expand Down
47 changes: 34 additions & 13 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}

persistExpiry bool
}

func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
return newLessor(b, minLeaseTTL)
func NewLessor(b backend.Backend, minLeaseTTL int64, persistExpiry bool) Lessor {
return newLessor(b, minLeaseTTL, persistExpiry)
}

func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
func newLessor(b backend.Backend, minLeaseTTL int64, persistExpiry bool) *lessor {
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
Expand All @@ -167,7 +169,9 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
persistExpiry: persistExpiry,
}

l.initAndRecover()

go l.runLoop()
Expand Down Expand Up @@ -228,7 +232,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
l.ttl = le.minLeaseTTL
}

if le.isPrimary() {
if le.isPrimary() || le.persistExpiry {
l.refresh(0)
} else {
l.forever()
Expand All @@ -237,7 +241,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
le.leaseMap[id] = l
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
l.persistTo(le.b)
l.persistTo(le.b, le.persistExpiry)

return l, nil
}
Expand Down Expand Up @@ -321,6 +325,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
l.refresh(0)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
if le.persistExpiry {
l.persistTo(le.b, le.persistExpiry)
}
return l.ttl, nil
}

Expand Down Expand Up @@ -351,7 +358,9 @@ func (le *lessor) Promote(extend time.Duration) {
defer le.mu.Unlock()

le.demotec = make(chan struct{})

if le.persistExpiry {
return
}
// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.refresh(extend)
Expand Down Expand Up @@ -407,9 +416,11 @@ func (le *lessor) Demote() {
le.mu.Lock()
defer le.mu.Unlock()

// set the expiries of all leases to forever
for _, l := range le.leaseMap {
l.forever()
if !le.persistExpiry {
// set the expiries of all leases to forever
for _, l := range le.leaseMap {
l.forever()
}
}

if le.demotec != nil {
Expand Down Expand Up @@ -597,13 +608,19 @@ func (le *lessor) initAndRecover() {
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
}
var expiry time.Time
if le.persistExpiry && lpb.Expiry != 0{
expiry = time.Unix(0, lpb.Expiry)
}else{
expiry = forever
}
le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
expiry: expiry,
revokec: make(chan struct{}),
}
}
Expand Down Expand Up @@ -631,10 +648,14 @@ func (l *Lease) expired() bool {
return l.Remaining() <= 0
}

func (l *Lease) persistTo(b backend.Backend) {
func (l *Lease) persistTo(b backend.Backend, expiry bool) {
key := int64ToBytes(int64(l.ID))

lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)}
var lpb leasepb.Lease
if expiry{
lpb = leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl), Expiry: l.expiry.UnixNano()}
}else{
lpb = leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)}
}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
Expand Down
8 changes: 4 additions & 4 deletions lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000,

func benchmarkLessorFindExpired(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(be, minLeaseTTL, false)
defer le.Stop()
defer cleanup(be, tmpPath)
le.Promote(0)
Expand All @@ -72,7 +72,7 @@ func benchmarkLessorFindExpired(size int, b *testing.B) {

func benchmarkLessorGrant(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(be, minLeaseTTL, false)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
Expand All @@ -86,7 +86,7 @@ func benchmarkLessorGrant(size int, b *testing.B) {

func benchmarkLessorRevoke(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(be, minLeaseTTL, false)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
Expand All @@ -103,7 +103,7 @@ func benchmarkLessorRevoke(size int, b *testing.B) {

func benchmarkLessorRenew(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(be, minLeaseTTL, false)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
Expand Down
Loading

0 comments on commit 4f56765

Please sign in to comment.