Skip to content

Commit

Permalink
etcdserver: process lease Renew request via raft when clusterVersion …
Browse files Browse the repository at this point in the history
…>= 3.6

Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Mar 7, 2023
1 parent 4c2a5bc commit eb63556
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,64 @@ func (s *EtcdServer) LeaseRenewV2(ctx context.Context, id lease.LeaseID) (int64,
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
cv := s.cluster.Version()
if version.LessThan(*cv, version.V3_6) {
resp := &pb.LeaseKeepAliveResponse{ID: r.ID, Header: s.newHeader()}

var err error
resp.TTL, err = s.leaseRenewV2(ctx, lease.LeaseID(r.ID))

return resp, err
}

resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r})
if err != nil {
return nil, err
}
return resp.(*pb.LeaseKeepAliveResponse), err
}

func (s *EtcdServer) leaseRenewV2(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
// Throttle in case of e.g. connection problems.
time.Sleep(50 * time.Millisecond)
}

if cctx.Err() == context.DeadlineExceeded {
return -1, errors.ErrTimeout
}
return -1, errors.ErrCanceled
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
Expand Down

0 comments on commit eb63556

Please sign in to comment.