From 49dcdc998beb0f1ddf32ca66bdd013cfa158f45b Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 7 Jun 2022 09:58:16 +0800 Subject: [PATCH] lease: process renew request via raft Previously, the renew request can only be processed by the leader. If a follower receives the renew request, it just forwards the request to the leader via a internal http channel. This isn't accurate because the leader may change during the process. When a leader receives the renew request, the previous implementation follows a three stage workflow: pre-raft, raft and post-raft. It's too complicated and error prone, and the raft is more like just a network transport channel instead of a concensus mechanism in this case. So we process the renew request via raft directly, it can greatly simplify the code. --- server/etcdserver/api/v3rpc/lease.go | 11 +++-- server/etcdserver/apply/apply.go | 6 +++ server/etcdserver/apply/corrupt.go | 4 ++ server/etcdserver/apply/uber_applier.go | 3 ++ server/etcdserver/v3_server.go | 48 +++---------------- server/lease/lessor.go | 62 +++++++++---------------- server/lease/lessor_test.go | 45 ++++-------------- tests/integration/v3_lease_test.go | 2 +- 8 files changed, 58 insertions(+), 123 deletions(-) diff --git a/server/etcdserver/api/v3rpc/lease.go b/server/etcdserver/api/v3rpc/lease.go index e123dd2a37ca..d66b95318428 100644 --- a/server/etcdserver/api/v3rpc/lease.go +++ b/server/etcdserver/api/v3rpc/lease.go @@ -129,20 +129,21 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro // or remote leader. // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded // at rev 4. - resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} - ls.hdr.fill(resp.Header) + // todo(ahrtr): remove respForLeaseNotFound, we don't need to ErrLeaseNotFound separately. + respForLeaseNotFound := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} + ls.hdr.fill(respForLeaseNotFound.Header) - ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID)) + resp, err := ls.le.LeaseRenew(stream.Context(), req) if err == lease.ErrLeaseNotFound { err = nil - ttl = 0 + respForLeaseNotFound.TTL = 0 + resp = respForLeaseNotFound } if err != nil { return togRPCError(err) } - resp.TTL = ttl err = stream.Send(resp) if err != nil { if isClientCtxErr(stream.Context().Err(), err) { diff --git a/server/etcdserver/apply/apply.go b/server/etcdserver/apply/apply.go index 9fe77e91f4c7..d6e8c4363ee1 100644 --- a/server/etcdserver/apply/apply.go +++ b/server/etcdserver/apply/apply.go @@ -78,6 +78,7 @@ type applierV3 interface { LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) + LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) @@ -206,6 +207,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err } +func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) { + ttl, err := a.lessor.Renew(lease.LeaseID(lc.ID)) + return &pb.LeaseKeepAliveResponse{Header: a.newHeader(), ID: lc.ID, TTL: ttl}, err +} + func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) { for _, c := range lc.Checkpoints { err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL) diff --git a/server/etcdserver/apply/corrupt.go b/server/etcdserver/apply/corrupt.go index 040f294aebad..f81836701eab 100644 --- a/server/etcdserver/apply/corrupt.go +++ b/server/etcdserver/apply/corrupt.go @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { return nil, errors.ErrCorrupt } + +func (a *applierV3Corrupt) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) { + return nil, errors.ErrCorrupt +} diff --git a/server/etcdserver/apply/uber_applier.go b/server/etcdserver/apply/uber_applier.go index 50f8ba4b15eb..5e5c035b0128 100644 --- a/server/etcdserver/apply/uber_applier.go +++ b/server/etcdserver/apply/uber_applier.go @@ -174,6 +174,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s case r.LeaseCheckpoint != nil: op = "LeaseCheckpoint" ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) + case r.LeaseRenew != nil: + op = "LeaseRenew" + ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew) case r.Alarm != nil: op = "Alarm" ar.Resp, ar.Err = a.Alarm(r.Alarm) diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 63a190e6ed69..b8d31885b651 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -68,9 +68,8 @@ type Lessor interface { // LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed. LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) - // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error - // is returned. - LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) + // LeaseRenew renews the lease. + LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) // LeaseTimeToLive retrieves lease information. LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) @@ -276,45 +275,12 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) return resp.(*pb.LeaseRevokeResponse), nil } -func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { - if s.isLeader() { - if err := s.waitAppliedIndex(); err != nil { - return 0, err - } - - ttl, err := s.lessor.Renew(id) - if err == nil { // already requested to primary lessor(leader) - return ttl, nil - } - if err != lease.ErrNotPrimary { - return -1, err - } - } - - cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) - defer cancel() - - // renewals don't go through raft; forward to leader manually - for cctx.Err() == nil { - leader, lerr := s.waitLeader(cctx) - if lerr != nil { - return -1, lerr - } - for _, url := range leader.PeerURLs { - lurl := url + leasehttp.LeasePrefix - ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) - if err == nil || err == lease.ErrLeaseNotFound { - return ttl, err - } - } - // Throttle in case of e.g. connection problems. - time.Sleep(50 * time.Millisecond) - } - - if cctx.Err() == context.DeadlineExceeded { - return -1, errors.ErrTimeout +func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) { + resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r}) + if err != nil { + return nil, err } - return -1, errors.ErrCanceled + return resp.(*pb.LeaseKeepAliveResponse), err } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 78ed1472edd6..75720f8b32c0 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -286,6 +286,10 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { revokec: make(chan struct{}), } + if l.ttl < le.minLeaseTTL { + l.ttl = le.minLeaseTTL + } + le.mu.Lock() defer le.mu.Unlock() @@ -293,10 +297,6 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, ErrLeaseExists } - if l.ttl < le.minLeaseTTL { - l.ttl = le.minLeaseTTL - } - if le.isPrimary() { l.refresh(0) } else { @@ -326,6 +326,12 @@ func (le *lessor) Revoke(id LeaseID) error { le.mu.Unlock() return ErrLeaseNotFound } + + // We shouldn't delete the lease inside the transaction lock, otherwise + // it may lead to deadlock with Grant or Checkpoint operations, which + // acquire the le.mu firstly and then the batchTx lock. + delete(le.leaseMap, id) + defer close(l.revokec) // unlock before doing external work le.mu.Unlock() @@ -344,9 +350,6 @@ func (le *lessor) Revoke(id LeaseID) error { txn.DeleteRange([]byte(key), nil) } - le.mu.Lock() - defer le.mu.Unlock() - delete(le.leaseMap, l.ID) // lease deletion needs to be in the same backend transaction with the // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. @@ -362,6 +365,10 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { le.mu.Lock() defer le.mu.Unlock() + return le.checkpoint(id, remainingTTL) +} + +func (le *lessor) checkpoint(id LeaseID, remainingTTL int64) error { if l, ok := le.leaseMap[id]; ok { // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry l.remainingTTL = remainingTTL @@ -388,52 +395,25 @@ func greaterOrEqual(first, second semver.Version) bool { // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { - le.mu.RLock() - if !le.isPrimary() { - // forward renew request to primary instead of returning error. - le.mu.RUnlock() - return -1, ErrNotPrimary - } - - demotec := le.demotec + le.mu.Lock() + defer le.mu.Unlock() l := le.leaseMap[id] if l == nil { - le.mu.RUnlock() return -1, ErrLeaseNotFound } - // Clear remaining TTL when we renew if it is set - clearRemainingTTL := le.cp != nil && l.remainingTTL > 0 - le.mu.RUnlock() - if l.expired() { - select { - // A expired lease might be pending for revoking or going through - // quorum to be revoked. To be accurate, renew request must wait for the - // deletion to complete. - case <-l.revokec: - return -1, ErrLeaseNotFound - // The expired lease might fail to be revoked if the primary changes. - // The caller will retry on ErrNotPrimary. - case <-demotec: - return -1, ErrNotPrimary - case <-le.stopC: - return -1, ErrNotPrimary + if !le.isPrimary() { + if l.remainingTTL > 0 { + le.checkpoint(id, 0) } + return l.ttl, nil } - // Clear remaining TTL when we renew if it is set - // By applying a RAFT entry only when the remainingTTL is already set, we limit the number - // of RAFT entries written per lease to a max of 2 per checkpoint interval. - if clearRemainingTTL { - le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}) - } - - le.mu.Lock() + le.checkpoint(id, 0) l.refresh(0) item := &LeaseWithTime{id: l.ID, time: l.expiry} le.leaseExpiredNotifier.RegisterOrUpdate(item) - le.mu.Unlock() leaseRenewed.Inc() return l.ttl, nil diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 58a36e612323..9618fddc75db 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -434,19 +434,8 @@ func TestLessorExpire(t *testing.T) { t.Fatalf("failed to receive expired lease") } - donec := make(chan struct{}, 1) - go func() { - // expired lease cannot be renewed - if _, err := le.Renew(l.ID); err != ErrLeaseNotFound { - t.Errorf("unexpected renew") - } - donec <- struct{}{} - }() - - select { - case <-donec: - t.Fatalf("renew finished before lease revocation") - case <-time.After(50 * time.Millisecond): + if _, err := le.Renew(l.ID); err != nil { + t.Errorf("unexpected renew") } // expired lease can be revoked @@ -454,10 +443,9 @@ func TestLessorExpire(t *testing.T) { t.Fatalf("failed to revoke expired lease: %v", err) } - select { - case <-donec: - case <-time.After(10 * time.Second): - t.Fatalf("renew has not returned after lease revocation") + // revoked lease can't be renewed + if _, err := le.Renew(l.ID); err != ErrLeaseNotFound { + t.Errorf("unexpected renew") } } @@ -487,28 +475,15 @@ func TestLessorExpireAndDemote(t *testing.T) { t.Fatalf("failed to receive expired lease") } - donec := make(chan struct{}, 1) - go func() { - // expired lease cannot be renewed - if _, err := le.Renew(l.ID); err != ErrNotPrimary { - t.Errorf("unexpected renew: %v", err) - } - donec <- struct{}{} - }() - - select { - case <-donec: - t.Fatalf("renew finished before demotion") - case <-time.After(50 * time.Millisecond): + if _, err := le.Renew(l.ID); err != nil { + t.Errorf("unexpected renew: %v", err) } - // demote will cause the renew request to fail with ErrNotPrimary le.Demote() - select { - case <-donec: - case <-time.After(10 * time.Second): - t.Fatalf("renew has not returned after lessor demotion") + // renew should work after demote. + if _, err := le.Renew(l.ID); err != nil { + t.Errorf("unexpected renew: %v", err) } } diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index b2b7efbf6d9f..6ce597029b1b 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -746,7 +746,7 @@ func TestV3LeaseFailover(t *testing.T) { // send keep alive to old leader until the old leader starts // to drop lease request. - var expectedExp time.Time + expectedExp := time.Now().Add(time.Duration(lresp.TTL) * time.Second) for { if err = lac.Send(lreq); err != nil { break