From 899a60af9b18ca5f34a1d472e06590430bf59a0d Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 7 Mar 2023 14:52:16 +0800 Subject: [PATCH] lease: process renew request via raft Previously, the renew request can only be processed by the leader. If a follower receives the renew request, it just forwards the request to the leader via a internal http channel. This isn't accurate because the leader may change during the process. When a leader receives the renew request, the previous implementation follows a three stage workflow: pre-raft, raft and post-raft. It's too complicated and error prone, and the raft is more like just a network transport channel instead of a concensus mechanism in this case. So we process the renew request via raft directly, it can greatly simplify the code. Signed-off-by: Benjamin Wang --- server/etcdserver/api/v3rpc/lease.go | 17 +---- server/etcdserver/apply/apply.go | 6 ++ server/etcdserver/apply/corrupt.go | 4 ++ server/etcdserver/apply/uber_applier.go | 3 + server/etcdserver/v3_server.go | 17 +++-- server/lease/lessor.go | 44 ++++++++++-- server/lease/lessor_test.go | 91 +++++++++++++++++++++++-- tests/integration/v3_lease_test.go | 2 +- 8 files changed, 153 insertions(+), 31 deletions(-) diff --git a/server/etcdserver/api/v3rpc/lease.go b/server/etcdserver/api/v3rpc/lease.go index e123dd2a37c..71d58ddefc1 100644 --- a/server/etcdserver/api/v3rpc/lease.go +++ b/server/etcdserver/api/v3rpc/lease.go @@ -123,26 +123,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro return err } - // Create header before we sent out the renew request. - // This can make sure that the revision is strictly smaller or equal to - // when the keepalive happened at the local server (when the local server is the leader) - // or remote leader. - // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded - // at rev 4. - resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} - ls.hdr.fill(resp.Header) - - ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID)) - if err == lease.ErrLeaseNotFound { - err = nil - ttl = 0 - } - + resp, err := ls.le.LeaseRenew(stream.Context(), req) if err != nil { return togRPCError(err) } - resp.TTL = ttl err = stream.Send(resp) if err != nil { if isClientCtxErr(stream.Context().Err(), err) { diff --git a/server/etcdserver/apply/apply.go b/server/etcdserver/apply/apply.go index 058870b1dc2..242d9d3334a 100644 --- a/server/etcdserver/apply/apply.go +++ b/server/etcdserver/apply/apply.go @@ -79,6 +79,7 @@ type applierV3 interface { LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) + LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) @@ -207,6 +208,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err } +func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) { + ttl, err := a.lessor.Renew(lease.LeaseID(lc.ID)) + return &pb.LeaseKeepAliveResponse{Header: a.newHeader(), ID: lc.ID, TTL: ttl}, err +} + func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) { for _, c := range lc.Checkpoints { err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL) diff --git a/server/etcdserver/apply/corrupt.go b/server/etcdserver/apply/corrupt.go index 040f294aeba..f81836701ea 100644 --- a/server/etcdserver/apply/corrupt.go +++ b/server/etcdserver/apply/corrupt.go @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { return nil, errors.ErrCorrupt } + +func (a *applierV3Corrupt) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) { + return nil, errors.ErrCorrupt +} diff --git a/server/etcdserver/apply/uber_applier.go b/server/etcdserver/apply/uber_applier.go index 201defa385b..7074fd67e60 100644 --- a/server/etcdserver/apply/uber_applier.go +++ b/server/etcdserver/apply/uber_applier.go @@ -172,6 +172,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s case r.LeaseRevoke != nil: op = "LeaseRevoke" ar.Resp, ar.Err = a.applyV3.LeaseRevoke(r.LeaseRevoke) + case r.LeaseRenew != nil: + op = "LeaseRenew" + ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew) case r.LeaseCheckpoint != nil: op = "LeaseCheckpoint" ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 4f1cd6b13ee..209682fcaba 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -68,9 +68,8 @@ type Lessor interface { // LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed. LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) - // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error - // is returned. - LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) + // LeaseRenew renews the lease. + LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) // LeaseTimeToLive retrieves lease information. LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) @@ -276,13 +275,13 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) return resp.(*pb.LeaseRevokeResponse), nil } -func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { +func (s *EtcdServer) LeaseRenewV2(ctx context.Context, id lease.LeaseID) (int64, error) { if s.isLeader() { if err := s.waitAppliedIndex(); err != nil { return 0, err } - ttl, err := s.lessor.Renew(id) + ttl, err := s.lessor.RenewV2(id) if err == nil { // already requested to primary lessor(leader) return ttl, nil } @@ -317,6 +316,14 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e return -1, errors.ErrCanceled } +func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) { + resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r}) + if err != nil { + return nil, err + } + return resp.(*pb.LeaseKeepAliveResponse), err +} + func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { if s.isLeader() { if err := s.waitAppliedIndex(); err != nil { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 860de54f45f..85e69eb7940 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -119,8 +119,13 @@ type Lessor interface { // Demote demotes the lessor from being the primary lessor. Demote() - // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, - // an error will be returned. + // RenewV2 renews a lease with given ID. It returns the renewed TTL. + // If the ID does not exist, an error will be returned. + // TODO(ahrtr): remove this legacy method in 3.7. + RenewV2(id LeaseID) (int64, error) + + // Renew renews a lease with given ID. It returns the renewed TTL. + // If the given lease does not exist, an error will be returned. Renew(id LeaseID) (int64, error) // Lookup gives the lease at a given lease id, if any @@ -364,7 +369,10 @@ func (le *lessor) Revoke(id LeaseID) error { func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { le.mu.Lock() defer le.mu.Unlock() + return le.checkpoint(id, remainingTTL) +} +func (le *lessor) checkpoint(id LeaseID, remainingTTL int64) error { if l, ok := le.leaseMap[id]; ok { // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry l.remainingTTL = remainingTTL @@ -388,9 +396,10 @@ func greaterOrEqual(first, second semver.Version) bool { return !version.LessThan(first, second) } -// Renew renews an existing lease. If the given lease does not exist or +// RenewV2 renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. -func (le *lessor) Renew(id LeaseID) (int64, error) { +// TODO(ahrtr): remove the legacy method in 3.7 +func (le *lessor) RenewV2(id LeaseID) (int64, error) { le.mu.RLock() if !le.isPrimary() { // forward renew request to primary instead of returning error. @@ -442,6 +451,31 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { return l.ttl, nil } +func (le *lessor) Renew(id LeaseID) (int64, error) { + le.mu.Lock() + defer le.mu.Unlock() + + l := le.leaseMap[id] + if l == nil { + return -1, ErrLeaseNotFound + } + + if !le.isPrimary() { + if l.remainingTTL > 0 { + le.checkpoint(id, 0) + } + return l.ttl, nil + } + + le.checkpoint(id, 0) + l.refresh(0) + item := &LeaseWithTime{id: l.ID, time: l.expiry} + le.leaseExpiredNotifier.RegisterOrUpdate(item) + + leaseRenewed.Inc() + return l.ttl, nil +} + func (le *lessor) Lookup(id LeaseID) *Lease { le.mu.RLock() defer le.mu.RUnlock() @@ -842,6 +876,8 @@ func (fl *FakeLessor) Promote(extend time.Duration) {} func (fl *FakeLessor) Demote() {} +func (fl *FakeLessor) RenewV2(id LeaseID) (int64, error) { return 10, nil } + func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil } func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil } diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index ae9ad52e820..871c7427eb4 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -426,7 +426,8 @@ func TestLessorRecover(t *testing.T) { } } -func TestLessorExpire(t *testing.T) { +// TODO(ahrtr): remove this test case when the legacy `RenewV2` is removed. +func TestLessorExpireV2(t *testing.T) { lg := zap.NewNop() dir, be := NewTestBackend(t) defer os.RemoveAll(dir) @@ -455,7 +456,7 @@ func TestLessorExpire(t *testing.T) { donec := make(chan struct{}, 1) go func() { // expired lease cannot be renewed - if _, err := le.Renew(l.ID); err != ErrLeaseNotFound { + if _, err := le.RenewV2(l.ID); err != ErrLeaseNotFound { t.Errorf("unexpected renew") } donec <- struct{}{} @@ -479,7 +480,49 @@ func TestLessorExpire(t *testing.T) { } } -func TestLessorExpireAndDemote(t *testing.T) { +func TestLessorExpire(t *testing.T) { + lg := zap.NewNop() + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + testMinTTL := int64(1) + + le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL}) + defer le.Stop() + + le.Promote(1 * time.Second) + l, err := le.Grant(1, testMinTTL) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + select { + case el := <-le.ExpiredLeasesC(): + if el[0].ID != l.ID { + t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to receive expired lease") + } + + if _, err := le.Renew(l.ID); err != nil { + t.Errorf("unexpected renew") + } + + // expired lease can be revoked + if err := le.Revoke(l.ID); err != nil { + t.Fatalf("failed to revoke expired lease: %v", err) + } + + // revoked lease can't be renewed + if _, err := le.Renew(l.ID); err != ErrLeaseNotFound { + t.Errorf("unexpected renew") + } +} + +// TODO(ahrtr): remove this test case when the legacy `RenewV2` is removed. +func TestLessorExpireAndDemoteV2(t *testing.T) { lg := zap.NewNop() dir, be := NewTestBackend(t) defer os.RemoveAll(dir) @@ -508,7 +551,7 @@ func TestLessorExpireAndDemote(t *testing.T) { donec := make(chan struct{}, 1) go func() { // expired lease cannot be renewed - if _, err := le.Renew(l.ID); err != ErrNotPrimary { + if _, err := le.RenewV2(l.ID); err != ErrNotPrimary { t.Errorf("unexpected renew: %v", err) } donec <- struct{}{} @@ -520,7 +563,7 @@ func TestLessorExpireAndDemote(t *testing.T) { case <-time.After(50 * time.Millisecond): } - // demote will cause the renew request to fail with ErrNotPrimary + // demote will cause the renewV2 request to fail with ErrNotPrimary le.Demote() select { @@ -530,6 +573,44 @@ func TestLessorExpireAndDemote(t *testing.T) { } } +func TestLessorExpireAndDemote(t *testing.T) { + lg := zap.NewNop() + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + testMinTTL := int64(1) + + le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL}) + defer le.Stop() + + le.Promote(1 * time.Second) + l, err := le.Grant(1, testMinTTL) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + select { + case el := <-le.ExpiredLeasesC(): + if el[0].ID != l.ID { + t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to receive expired lease") + } + + if _, err := le.Renew(l.ID); err != nil { + t.Errorf("unexpected renew: %v", err) + } + + le.Demote() + + // renew should work after demote. + if _, err := le.Renew(l.ID); err != nil { + t.Errorf("unexpected renew: %v", err) + } +} + func TestLessorMaxTTL(t *testing.T) { lg := zap.NewNop() dir, be := NewTestBackend(t) diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 8518b17879e..17121f53937 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -747,7 +747,7 @@ func TestV3LeaseFailover(t *testing.T) { // send keep alive to old leader until the old leader starts // to drop lease request. - var expectedExp time.Time + expectedExp := time.Now().Add(time.Duration(lresp.TTL) * time.Second) for { if err = lac.Send(lreq); err != nil { break