support linearizable renew lease
ahrtr committed Feb 15, 2022 · 1 parent e814f6f · commit 68f90a6
Showing 5 changed files with 81 additions and 24 deletions.
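
Editorial context: before this commit, a LeaseRenew or LeaseTimeToLive served by the leader read lessor state directly, so a request that arrived while an earlier LeaseGrant on the same lease ID was still being applied could fail with "lease not found". The commit makes these reads linearizable: on the leader, the request first waits for the applied index to catch up. A minimal reproduction sketch against the public clientv3 API (the endpoint is a placeholder; this program is illustrative, not part of the commit):

	package main

	import (
		"context"
		"log"
		"time"

		clientv3 "go.etcd.io/etcd/client/v3"
	)

	func main() {
		// Placeholder endpoint; point this at a real etcd member.
		cli, err := clientv3.New(clientv3.Config{
			Endpoints:   []string{"127.0.0.1:2379"},
			DialTimeout: 5 * time.Second,
		})
		if err != nil {
			log.Fatal(err)
		}
		defer cli.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// Grant a lease, then renew it immediately. Before the fix the
		// renewal could race with the still-in-flight grant apply; with
		// it, the leader waits for its applied index to catch up first.
		grant, err := cli.Grant(ctx, 60)
		if err != nil {
			log.Fatal(err)
		}
		if _, err := cli.KeepAliveOnce(ctx, grant.ID); err != nil {
			log.Fatal(err)
		}
	}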
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-3.6.md
@@ -46,6 +46,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
 - Fix [etcd gateway doesn't format the endpoint of IPv6 address correctly](https://github.com/etcd-io/etcd/pull/13551)
 - Fix [A client can cause a nil dereference in etcd by passing an invalid SortTarget](https://github.com/etcd-io/etcd/pull/13555)
 - Fix [Grant lease with negative ID can possibly cause db out of sync](https://github.com/etcd-io/etcd/pull/13676)
+- Fix [Etcdserver is still in progress of processing LeaseGrantRequest when it receives a LeaseKeepAliveRequest on the same leaseID](https://github.com/etcd-io/etcd/pull/13690)
 
 ### tools/benchmark
 
35 changes: 28 additions & 7 deletions server/etcdserver/v3_server.go
@@ -270,6 +270,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
 	return resp.(*pb.LeaseGrantResponse), nil
 }
 
+func (s *EtcdServer) waitAppliedIndex(ctx context.Context) error {
+	select {
+	case <-s.ApplyWait():
+	case <-s.stopping:
+		return ErrStopped
+	case <-ctx.Done():
+		return ErrTimeout
+	}
+
+	return nil
+}
+
 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
 	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
 	if err != nil {
@@ -279,6 +291,15 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 }
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
+	if s.isLeader() {
+		if err := s.waitAppliedIndex(cctx); err != nil {
+			return 0, err
+		}
+	}
+
 	ttl, err := s.lessor.Renew(id)
 	if err == nil { // already requested to primary lessor(leader)
 		return ttl, nil
@@ -287,9 +308,6 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 		return -1, err
 	}
 
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	// renewals don't go through raft; forward to leader manually
 	for cctx.Err() == nil {
 		leader, lerr := s.waitLeader(cctx)
@@ -314,7 +332,13 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 }
 
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
-	if s.Leader() == s.ID() {
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
+	if s.isLeader() {
+		if err := s.waitAppliedIndex(cctx); err != nil {
+			return nil, err
+		}
 		// primary; timetolive directly from leader
 		le := s.lessor.Lookup(lease.LeaseID(r.ID))
 		if le == nil {
@@ -333,9 +357,6 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
 		return resp, nil
 	}
 
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	// forward to leader
 	for cctx.Err() == nil {
 		leader, err := s.waitLeader(cctx)
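
LeaseTimeToLive gets the same applied-index guard as LeaseRenew. Continuing the client-side sketch from the top of this page (reusing its cli, ctx, and grant variables; illustrative only, not part of the commit):

	// Read the lease's remaining TTL immediately after the grant. This
	// lookup on the leader had the same stale-read race before the fix.
	ttlResp, err := cli.TimeToLive(ctx, grant.ID, clientv3.WithAttachedKeys())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("lease %x: %ds remaining", int64(ttlResp.ID), ttlResp.TTL)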
6 changes: 3 additions & 3 deletions server/lease/leasehttp/http.go
@@ -32,7 +32,7 @@ import (
 var (
 	LeasePrefix         = "/leases"
 	LeaseInternalPrefix = "/leases/internal"
-	applyTimeout        = time.Second
+	ApplyTimeout        = time.Second
 	ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
 )
 
@@ -69,7 +69,7 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	select {
 	case <-h.waitch():
-	case <-time.After(applyTimeout):
+	case <-time.After(ApplyTimeout):
 		http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
 		return
 	}
@@ -99,7 +99,7 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	select {
 	case <-h.waitch():
-	case <-time.After(applyTimeout):
+	case <-time.After(ApplyTimeout):
 		http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
 		return
 	}
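
The only change in this file is exporting applyTimeout as ApplyTimeout, which makes the bound reachable from outside the leasehttp package. A hedged illustration of an external caller (the surrounding code is hypothetical; only ApplyTimeout and ErrLeaseHTTPTimeout come from the package):

	// Hypothetical caller in another package: bound a wait on apply
	// catch-up by the same timeout the lease HTTP handler uses.
	select {
	case <-waitCh:
	case <-time.After(leasehttp.ApplyTimeout):
		return leasehttp.ErrLeaseHTTPTimeout
	}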
22 changes: 16 additions & 6 deletions tests/framework/integration/cluster.go
@@ -409,7 +409,7 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
 	for _, m := range membs {
 		possibleLead[uint64(m.Server.ID())] = true
 	}
-	cc, err := c.ClusterClient()
+	cc, err := c.ClusterClient(t)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1372,16 +1372,26 @@ func (c *Cluster) Client(i int) *clientv3.Client {
 	return c.Members[i].Client
 }
 
-func (c *Cluster) ClusterClient() (client *clientv3.Client, err error) {
+func (c *Cluster) ClusterClient(t testutil.TB) (client *clientv3.Client, err error) {
 	if c.clusterClient == nil {
-		endpoints := []string{}
+		var endpoints []string
 		for _, m := range c.Members {
 			endpoints = append(endpoints, m.GrpcURL)
 		}
 		cfg := clientv3.Config{
-			Endpoints:   endpoints,
-			DialTimeout: 5 * time.Second,
-			DialOptions: []grpc.DialOption{grpc.WithBlock()},
+			Endpoints:          endpoints,
+			DialTimeout:        5 * time.Second,
+			DialOptions:        []grpc.DialOption{grpc.WithBlock()},
+			MaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize,
+			MaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize,
+			Logger:             memberLogger(t, "cluster"),
 		}
+		if c.Cfg.ClientTLS != nil {
+			tls, err := c.Cfg.ClientTLS.ClientConfig()
+			if err != nil {
+				return nil, err
+			}
+			cfg.TLS = tls
+		}
 		c.clusterClient, err = newClientV3(cfg)
 		if err != nil {
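
The new ClusterClient(t) signature threads the test handle through so the shared client picks up a per-test logger plus the cluster's TLS and message-size settings. A hedged sketch of a caller (test name and body are illustrative, not part of the commit):

	func TestLeaseViaClusterClient(t *testing.T) {
		integration.BeforeTest(t)
		clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
		defer clus.Terminate(t)

		// The cluster client balances requests over all member endpoints.
		cc, err := clus.ClusterClient(t)
		if err != nil {
			t.Fatal(err)
		}
		if _, err := cc.Grant(context.Background(), 10); err != nil {
			t.Fatal(err)
		}
	}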
41 changes: 33 additions & 8 deletions tests/integration/v3_lease_test.go
@@ -16,6 +16,7 @@ package integration
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"testing"
@@ -487,17 +488,31 @@ func TestV3LeaseLeases(t *testing.T) {
 // it was observed that an immediate lease renewal after granting a lease from a follower resulted in lease not found.
 // related issue https://github.com/etcd-io/etcd/issues/6978
 func TestV3LeaseRenewStress(t *testing.T) {
-	testLeaseStress(t, stressLeaseRenew)
+	testLeaseStress(t, stressLeaseRenew, false)
 }
 
+// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress,
+// but it uses a cluster client instead of a specific member's client.
+// The related issue is https://github.com/etcd-io/etcd/issues/13675.
+func TestV3LeaseRenewStressWithClusterClient(t *testing.T) {
+	testLeaseStress(t, stressLeaseRenew, true)
+}
+
 // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
 // it was observed that an immediate lease retrieval after granting a lease from a follower resulted in lease not found.
 // related issue https://github.com/etcd-io/etcd/issues/6978
 func TestV3LeaseTimeToLiveStress(t *testing.T) {
-	testLeaseStress(t, stressLeaseTimeToLive)
+	testLeaseStress(t, stressLeaseTimeToLive, false)
 }
 
-func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
+// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress,
+// but it uses a cluster client instead of a specific member's client.
+// The related issue is https://github.com/etcd-io/etcd/issues/13675.
+func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) {
+	testLeaseStress(t, stressLeaseTimeToLive, true)
+}
+
+func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) {
 	integration.BeforeTest(t)
 	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
@@ -506,13 +521,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
 	defer cancel()
 	errc := make(chan error)
 
-	for i := 0; i < 30; i++ {
-		for j := 0; j < 3; j++ {
-			go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j)
+	if useClusterClient {
+		for i := 0; i < 300; i++ {
+			clusterClient, err := clus.ClusterClient(t)
+			if err != nil {
+				t.Fatal(err)
+			}
+			go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clusterClient).Lease) }(i)
+		}
+	} else {
+		for i := 0; i < 100; i++ {
+			for j := 0; j < 3; j++ {
+				go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j)
+			}
 		}
 	}
 
-	for i := 0; i < 90; i++ {
+	for i := 0; i < 300; i++ {
 		if err := <-errc; err != nil {
 			t.Fatal(err)
 		}
@@ -543,7 +568,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
 			continue
 		}
 		if rresp.TTL == 0 {
-			return fmt.Errorf("TTL shouldn't be 0 so soon")
+			return errors.New("TTL shouldn't be 0 so soon")
 		}
 	}
 	return nil
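
To exercise the new cluster-client paths, the added stress tests can be run from the tests module with a standard go test invocation (module layout assumed from the file paths above):

	cd tests
	go test -v -run 'TestV3LeaseRenewStressWithClusterClient|TestV3LeaseTimeToLiveStressWithClusterClient' ./integration/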
