Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Lease] Refactor lease renew request via raft #14094

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 122 additions & 67 deletions api/etcdserverpb/raft_internal.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/etcdserverpb/raft_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message InternalRaftRequest {
AlarmRequest alarm = 10;

LeaseCheckpointRequest lease_checkpoint = 11 [(versionpb.etcd_version_field) = "3.4"];
LeaseKeepAliveRequest lease_renew = 12 [(versionpb.etcd_version_field) = "3.6"];

AuthEnableRequest auth_enable = 1000;
AuthDisableRequest auth_disable = 1011;
Expand Down
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ etcdserverpb.InternalRaftRequest.downgrade_info_set: "3.5"
etcdserverpb.InternalRaftRequest.header: ""
etcdserverpb.InternalRaftRequest.lease_checkpoint: "3.4"
etcdserverpb.InternalRaftRequest.lease_grant: ""
etcdserverpb.InternalRaftRequest.lease_renew: "3.6"
etcdserverpb.InternalRaftRequest.lease_revoke: ""
etcdserverpb.InternalRaftRequest.put: ""
etcdserverpb.InternalRaftRequest.range: ""
Expand Down
16 changes: 4 additions & 12 deletions server/etcdserver/api/v3rpc/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,18 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
return err
}

// Create header before we sent out the renew request.
// This can make sure that the revision is strictly smaller or equal to
// when the keepalive happened at the local server (when the local server is the leader)
// or remote leader.
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
// at rev 4.
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)

ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
resp, err := ls.le.LeaseRenew(stream.Context(), req)
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
resp = &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
resp.TTL = 0
}

if err != nil {
return togRPCError(err)
}

resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type applierV3 interface {

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

Expand Down Expand Up @@ -207,6 +208,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err
}

func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
Copy link
Contributor

@mitake mitake May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's still possible to apply LeaseRenew entries issued by a stale leader unless it provides a mechanism like comparing term #15247 (comment) ?
I think all Raft messages issued by etcd itself might have similar problems potentially.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a simpler approach is to not use MsgProp and instead issue lease-related requests as MsgApp (related discussion: #15944 (comment)). With this approach we might be able to solve the issue without changing the WAL format. Its implementation will be tricky though. I'll try this idea this weekend if I have time.

ttl, err := a.lessor.Renew(lease.LeaseID(lc.ID))
return &pb.LeaseKeepAliveResponse{Header: a.newHeader(), ID: lc.ID, TTL: ttl}, err
}

func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe
func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
return nil, errors.ErrCorrupt
}

// LeaseRenew rejects the lease renew request with ErrCorrupt, mirroring
// LeaseRevoke on the same applier. The request is ignored, so its parameter
// is left unnamed like in the sibling methods.
func (a *applierV3Corrupt) LeaseRenew(_ *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	return nil, errors.ErrCorrupt
}
3 changes: 3 additions & 0 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
case r.LeaseRevoke != nil:
op = "LeaseRevoke"
ar.Resp, ar.Err = a.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseRenew != nil:
op = "LeaseRenew"
ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew)
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
Expand Down
69 changes: 64 additions & 5 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ type Lessor interface {
// LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed.
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
// LeaseRenew renews the lease.
LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
Expand Down Expand Up @@ -276,13 +275,73 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
func (s *EtcdServer) LeaseRenewV2(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
ttl, err := s.lessor.RenewV2(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
// Throttle in case of e.g. connection problems.
time.Sleep(50 * time.Millisecond)
}

if cctx.Err() == context.DeadlineExceeded {
return -1, errors.ErrTimeout
}
return -1, errors.ErrCanceled
}

// LeaseRenew renews the lease identified by r.ID and returns the keep-alive
// response for it.
//
// While the cluster version is below 3.6 the request is served by the legacy
// leaseRenewV2 path; otherwise it is submitted as an InternalRaftRequest via
// raftRequestOnce. On any error the returned response is nil, on both paths.
func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	// TODO(ahrtr): remove the legacy `leaseRenewV2` in 3.7.
	// NOTE(review): Version() returns a pointer; presumably it is nil until the
	// cluster version has been decided. Guard the dereference and stay on the
	// legacy path in that case — confirm against the cluster implementation.
	if cv := s.cluster.Version(); cv == nil || version.LessThan(*cv, version.V3_6) {
		// Keep the original ordering: the header is created before the renew
		// is issued.
		resp := &pb.LeaseKeepAliveResponse{ID: r.ID, Header: s.newHeader()}

		ttl, err := s.leaseRenewV2(ctx, lease.LeaseID(r.ID))
		if err != nil {
			// Do not hand back a half-populated response with an error; the
			// raft path below returns nil on error as well.
			return nil, err
		}
		resp.TTL = ttl
		return resp, nil
	}

	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.LeaseKeepAliveResponse), nil
}

func (s *EtcdServer) leaseRenewV2(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.RenewV2(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
ttl, rerr := h.l.RenewV2(lease.LeaseID(lreq.ID))
if rerr != nil {
if rerr == lease.ErrLeaseNotFound {
http.Error(w, rerr.Error(), http.StatusNotFound)
Expand Down
44 changes: 40 additions & 4 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,13 @@ type Lessor interface {
// Demote demotes the lessor from being the primary lessor.
Demote()

// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
// an error will be returned.
// RenewV2 renews a lease with given ID. It returns the renewed TTL.
// If the ID does not exist, an error will be returned.
// TODO(ahrtr): remove this legacy method in 3.7.
RenewV2(id LeaseID) (int64, error)

// Renew renews a lease with given ID. It returns the renewed TTL.
// If the given lease does not exist, an error will be returned.
Renew(id LeaseID) (int64, error)

// Lookup gives the lease at a given lease id, if any
Expand Down Expand Up @@ -364,7 +369,10 @@ func (le *lessor) Revoke(id LeaseID) error {
// Checkpoint takes le.mu for writing and delegates to checkpoint, passing the
// lease id and remainingTTL through unchanged. The lock is released on return.
func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
le.mu.Lock()
defer le.mu.Unlock()
return le.checkpoint(id, remainingTTL)
}

func (le *lessor) checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
Expand All @@ -388,9 +396,10 @@ func greaterOrEqual(first, second semver.Version) bool {
return !version.LessThan(first, second)
}

// Renew renews an existing lease. If the given lease does not exist or
// RenewV2 renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
// TODO(ahrtr): remove the legacy method in 3.7
func (le *lessor) RenewV2(id LeaseID) (int64, error) {
le.mu.RLock()
if !le.isPrimary() {
// forward renew request to primary instead of returning error.
Expand Down Expand Up @@ -442,6 +451,31 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
return l.ttl, nil
}

// Renew renews the lease with the given ID and returns its TTL. It returns -1
// and ErrLeaseNotFound when the ID has no entry in leaseMap.
//
// The primary lessor always resets the remaining TTL to 0 through checkpoint,
// calls refresh on the lease, re-registers it with the expired-lease notifier
// using the expiry read after the refresh, and increments leaseRenewed.
// A non-primary lessor only resets a non-zero remaining TTL.
func (le *lessor) Renew(id LeaseID) (int64, error) {
	le.mu.Lock()
	defer le.mu.Unlock()

	target := le.leaseMap[id]
	if target == nil {
		return -1, ErrLeaseNotFound
	}

	if le.isPrimary() {
		le.checkpoint(id, 0)
		target.refresh(0)
		le.leaseExpiredNotifier.RegisterOrUpdate(&LeaseWithTime{id: target.ID, time: target.expiry})

		leaseRenewed.Inc()
		return target.ttl, nil
	}

	// Not the primary: only clear a pending remaining TTL, nothing else.
	if target.remainingTTL > 0 {
		le.checkpoint(id, 0)
	}
	return target.ttl, nil
}

func (le *lessor) Lookup(id LeaseID) *Lease {
le.mu.RLock()
defer le.mu.RUnlock()
Expand Down Expand Up @@ -842,6 +876,8 @@ func (fl *FakeLessor) Promote(extend time.Duration) {}

// Demote is a no-op on the fake lessor.
func (fl *FakeLessor) Demote() {}

// RenewV2 is a stub that always returns a TTL of 10 and a nil error.
func (fl *FakeLessor) RenewV2(id LeaseID) (int64, error) { return 10, nil }

// Renew is a stub that always returns a TTL of 10 and a nil error.
func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }

// Lookup is a stub that always returns nil.
func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }
Expand Down
93 changes: 88 additions & 5 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,9 @@ func TestLessorRecover(t *testing.T) {
}
}

func TestLessorExpire(t *testing.T) {
// TestLessorExpireV2 tests the legacy `RenewV2`.
// TODO(ahrtr): remove this test case when the legacy `RenewV2` is removed.
func TestLessorExpireV2(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -455,7 +457,7 @@ func TestLessorExpire(t *testing.T) {
donec := make(chan struct{}, 1)
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
if _, err := le.RenewV2(l.ID); err != ErrLeaseNotFound {
t.Errorf("unexpected renew")
}
donec <- struct{}{}
Expand All @@ -479,7 +481,50 @@ func TestLessorExpire(t *testing.T) {
}
}

func TestLessorExpireAndDemote(t *testing.T) {
// TestLessorExpire checks that a lease reported on ExpiredLeasesC can still be
// renewed through Renew until it is revoked, and that Renew returns
// ErrLeaseNotFound once the lease has been revoked.
func TestLessorExpire(t *testing.T) {
	lg := zap.NewNop()
	dir, be := NewTestBackend(t)
	defer os.RemoveAll(dir)
	defer be.Close()

	testMinTTL := int64(1)

	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
	defer le.Stop()

	le.Promote(1 * time.Second)
	l, err := le.Grant(1, testMinTTL)
	if err != nil {
		t.Fatalf("failed to create lease: %v", err)
	}

	select {
	case el := <-le.ExpiredLeasesC():
		if el[0].ID != l.ID {
			t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
		}
	case <-time.After(10 * time.Second):
		t.Fatalf("failed to receive expired lease")
	}

	// expired (but not yet revoked) lease can still be renewed
	if _, err := le.Renew(l.ID); err != nil {
		t.Errorf("failed to renew expired lease before revoke: %v", err)
	}

	// expired lease can be revoked
	if err := le.Revoke(l.ID); err != nil {
		t.Fatalf("failed to revoke expired lease: %v", err)
	}

	// revoked lease can't be renewed
	if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
		t.Errorf("renew of revoked lease: err = %v, want %v", err, ErrLeaseNotFound)
	}
}

// TestLessorExpireAndDemoteV2 tests the legacy `RenewV2`.
// TODO(ahrtr): remove this test case when the legacy `RenewV2` is removed.
func TestLessorExpireAndDemoteV2(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -508,7 +553,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
donec := make(chan struct{}, 1)
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrNotPrimary {
if _, err := le.RenewV2(l.ID); err != ErrNotPrimary {
t.Errorf("unexpected renew: %v", err)
}
donec <- struct{}{}
Expand All @@ -520,7 +565,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
case <-time.After(50 * time.Millisecond):
}

// demote will cause the renew request to fail with ErrNotPrimary
// demote will cause the renewV2 request to fail with ErrNotPrimary
le.Demote()

select {
Expand All @@ -530,6 +575,44 @@ func TestLessorExpireAndDemote(t *testing.T) {
}
}

func TestLessorExpireAndDemote(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

testMinTTL := int64(1)

le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()

le.Promote(1 * time.Second)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

select {
case el := <-le.ExpiredLeasesC():
if el[0].ID != l.ID {
t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
}
case <-time.After(10 * time.Second):
t.Fatalf("failed to receive expired lease")
}

if _, err := le.Renew(l.ID); err != nil {
t.Errorf("unexpected renew: %v", err)
}

le.Demote()

// renew should work after demote.
if _, err := le.Renew(l.ID); err != nil {
t.Errorf("unexpected renew: %v", err)
}
}

func TestLessorMaxTTL(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
Expand Down
Loading