Skip to content

Commit

Permalink
lease: process renew request via raft
Browse files Browse the repository at this point in the history
Previously, the renew request could only be processed by the leader.
If a follower received the renew request, it just forwarded the
request to the leader via an internal HTTP channel. This isn't
accurate because the leader may change during the process.

When a leader receives the renew request, the previous implementation
follows a three-stage workflow: pre-raft, raft and post-raft. It's
too complicated and error-prone, and raft is used more like just a
network transport channel than a consensus mechanism in this
case.

So we process the renew request via raft directly, which greatly
simplifies the code.
  • Loading branch information
ahrtr committed Jun 10, 2022
1 parent 23d9874 commit 96bf559
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 123 deletions.
11 changes: 6 additions & 5 deletions server/etcdserver/api/v3rpc/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,21 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
// or remote leader.
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
// at rev 4.
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
// TODO(ahrtr): remove respForLeaseNotFound; we don't need to handle ErrLeaseNotFound separately.
respForLeaseNotFound := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(respForLeaseNotFound.Header)

ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
resp, err := ls.le.LeaseRenew(stream.Context(), req)
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
respForLeaseNotFound.TTL = 0
resp = respForLeaseNotFound
}

if err != nil {
return togRPCError(err)
}

resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type applierV3 interface {

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

Expand Down Expand Up @@ -206,6 +207,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err
}

// LeaseRenew asks the lessor to renew the lease identified by lc.ID and wraps
// the outcome in a keep-alive response carrying a fresh header, the lease ID
// and the TTL reported by the lessor.
//
// The response is always non-nil; any error from the lessor is passed through
// unchanged alongside it.
func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	// Renew first so the header is built after the lessor has been consulted.
	renewedTTL, renewErr := a.lessor.Renew(lease.LeaseID(lc.ID))

	resp := &pb.LeaseKeepAliveResponse{
		Header: a.newHeader(),
		ID:     lc.ID,
		TTL:    renewedTTL,
	}
	return resp, renewErr
}

func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe
// LeaseRevoke ignores its request and always fails with errors.ErrCorrupt,
// returning no response.
func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
return nil, errors.ErrCorrupt
}

// LeaseRenew ignores its request and always fails with errors.ErrCorrupt,
// returning no response. The request parameter is left unnamed, like in the
// other applierV3Corrupt lease methods, because it is never read.
func (a *applierV3Corrupt) LeaseRenew(_ *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	return nil, errors.ErrCorrupt
}
3 changes: 3 additions & 0 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.LeaseRenew != nil:
op = "LeaseRenew"
ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew)
case r.Alarm != nil:
op = "Alarm"
ar.Resp, ar.Err = a.Alarm(r.Alarm)
Expand Down
48 changes: 7 additions & 41 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ type Lessor interface {
// LeaseRevoke sends a LeaseRevoke request to raft and applies it after it is committed.
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
// LeaseRenew renews the lease.
LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
Expand Down Expand Up @@ -276,45 +275,12 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
// Throttle in case of e.g. connection problems.
time.Sleep(50 * time.Millisecond)
}

if cctx.Err() == context.DeadlineExceeded {
return -1, errors.ErrTimeout
func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r})
if err != nil {
return nil, err
}
return -1, errors.ErrCanceled
return resp.(*pb.LeaseKeepAliveResponse), err
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
Expand Down
62 changes: 21 additions & 41 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,17 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
revokec: make(chan struct{}),
}

if l.ttl < le.minLeaseTTL {
l.ttl = le.minLeaseTTL
}

le.mu.Lock()
defer le.mu.Unlock()

if _, ok := le.leaseMap[id]; ok {
return nil, ErrLeaseExists
}

if l.ttl < le.minLeaseTTL {
l.ttl = le.minLeaseTTL
}

if le.isPrimary() {
l.refresh(0)
} else {
Expand Down Expand Up @@ -326,6 +326,12 @@ func (le *lessor) Revoke(id LeaseID) error {
le.mu.Unlock()
return ErrLeaseNotFound
}

// We shouldn't delete the lease inside the transaction lock; otherwise
// it may lead to a deadlock with Grant or Checkpoint operations, which
// acquire le.mu first and then the batchTx lock.
delete(le.leaseMap, id)

defer close(l.revokec)
// unlock before doing external work
le.mu.Unlock()
Expand All @@ -344,9 +350,6 @@ func (le *lessor) Revoke(id LeaseID) error {
txn.DeleteRange([]byte(key), nil)
}

le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
Expand All @@ -362,6 +365,10 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
le.mu.Lock()
defer le.mu.Unlock()

return le.checkpoint(id, remainingTTL)
}

func (le *lessor) checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
Expand All @@ -388,52 +395,25 @@ func greaterOrEqual(first, second semver.Version) bool {
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
le.mu.RLock()
if !le.isPrimary() {
// forward renew request to primary instead of returning error.
le.mu.RUnlock()
return -1, ErrNotPrimary
}

demotec := le.demotec
le.mu.Lock()
defer le.mu.Unlock()

l := le.leaseMap[id]
if l == nil {
le.mu.RUnlock()
return -1, ErrLeaseNotFound
}
// Clear remaining TTL when we renew if it is set
clearRemainingTTL := le.cp != nil && l.remainingTTL > 0

le.mu.RUnlock()
if l.expired() {
select {
// A expired lease might be pending for revoking or going through
// quorum to be revoked. To be accurate, renew request must wait for the
// deletion to complete.
case <-l.revokec:
return -1, ErrLeaseNotFound
// The expired lease might fail to be revoked if the primary changes.
// The caller will retry on ErrNotPrimary.
case <-demotec:
return -1, ErrNotPrimary
case <-le.stopC:
return -1, ErrNotPrimary
if !le.isPrimary() {
if l.remainingTTL > 0 {
le.checkpoint(id, 0)
}
return l.ttl, nil
}

// Clear remaining TTL when we renew if it is set
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
}

le.mu.Lock()
le.checkpoint(id, 0)
l.refresh(0)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.mu.Unlock()

leaseRenewed.Inc()
return l.ttl, nil
Expand Down
45 changes: 10 additions & 35 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,30 +434,18 @@ func TestLessorExpire(t *testing.T) {
t.Fatalf("failed to receive expired lease")
}

donec := make(chan struct{}, 1)
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
t.Errorf("unexpected renew")
}
donec <- struct{}{}
}()

select {
case <-donec:
t.Fatalf("renew finished before lease revocation")
case <-time.After(50 * time.Millisecond):
if _, err := le.Renew(l.ID); err != nil {
t.Errorf("unexpected renew")
}

// expired lease can be revoked
if err := le.Revoke(l.ID); err != nil {
t.Fatalf("failed to revoke expired lease: %v", err)
}

select {
case <-donec:
case <-time.After(10 * time.Second):
t.Fatalf("renew has not returned after lease revocation")
// revoked lease can't be renewed
if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
t.Errorf("unexpected renew")
}
}

Expand Down Expand Up @@ -487,28 +475,15 @@ func TestLessorExpireAndDemote(t *testing.T) {
t.Fatalf("failed to receive expired lease")
}

donec := make(chan struct{}, 1)
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrNotPrimary {
t.Errorf("unexpected renew: %v", err)
}
donec <- struct{}{}
}()

select {
case <-donec:
t.Fatalf("renew finished before demotion")
case <-time.After(50 * time.Millisecond):
if _, err := le.Renew(l.ID); err != nil {
t.Errorf("unexpected renew: %v", err)
}

// demote will cause the renew request to fail with ErrNotPrimary
le.Demote()

select {
case <-donec:
case <-time.After(10 * time.Second):
t.Fatalf("renew has not returned after lessor demotion")
// renew should work after demote.
if _, err := le.Renew(l.ID); err != nil {
t.Errorf("unexpected renew: %v", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func TestV3LeaseFailover(t *testing.T) {

// send keep alive to old leader until the old leader starts
// to drop lease request.
var expectedExp time.Time
expectedExp := time.Now().Add(time.Duration(lresp.TTL) * time.Second)
for {
if err = lac.Send(lreq); err != nil {
break
Expand Down

0 comments on commit 96bf559

Please sign in to comment.