From 6092356a4fb3f72d73d548fa2fd8cf0e5d294517 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Sat, 12 Feb 2022 08:13:48 +0800 Subject: [PATCH] support linearizable renew lease --- server/etcdserver/v3_server.go | 21 ++++++++++++- server/lease/leasehttp/http.go | 6 ++-- tests/framework/integration/cluster.go | 22 ++++++++++---- tests/integration/v3_lease_test.go | 41 +++++++++++++++++++++----- 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 9885fc01c031..619ba1ed7a2b 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -270,6 +270,16 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (* return resp.(*pb.LeaseGrantResponse), nil } +func (s *EtcdServer) waitApplyPendingCommits() error { + select { + case <-s.ApplyWait(): + case <-time.After(leasehttp.ApplyTimeout): + return ErrTimeout + } + + return nil +} + func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) if err != nil { @@ -279,6 +289,12 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) } func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { + if s.isLeader() { + if err := s.waitApplyPendingCommits(); err != nil { + return 0, err + } + } + ttl, err := s.lessor.Renew(id) if err == nil { // already requested to primary lessor(leader) return ttl, nil @@ -314,7 +330,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { - if s.Leader() == s.ID() { + if s.isLeader() { + if err := s.waitApplyPendingCommits(); err != nil { + return nil, err + } // primary; timetolive directly from leader le := s.lessor.Lookup(lease.LeaseID(r.ID)) if le == nil { diff --git a/server/lease/leasehttp/http.go b/server/lease/leasehttp/http.go index 542c3a82a0cb..f0672b8c2058 100644 --- a/server/lease/leasehttp/http.go +++ b/server/lease/leasehttp/http.go @@ -32,7 +32,7 @@ import ( var ( LeasePrefix = "/leases" LeaseInternalPrefix = "/leases/internal" - applyTimeout = time.Second + ApplyTimeout = time.Second ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out") ) @@ -69,7 +69,7 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } select { case <-h.waitch(): - case <-time.After(applyTimeout): + case <-time.After(ApplyTimeout): http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) return } @@ -99,7 +99,7 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } select { case <-h.waitch(): - case <-time.After(applyTimeout): + case <-time.After(ApplyTimeout): http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) return } diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 4600e092d84d..3b78fdb155be 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -407,7 +407,7 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int { for _, m := range membs { possibleLead[uint64(m.Server.ID())] = true } - cc, err := c.ClusterClient() + cc, err := c.ClusterClient(t) if err != nil { t.Fatal(err) } @@ -1366,16 +1366,26 @@ func (c *Cluster) Client(i int) *clientv3.Client { return c.Members[i].Client } -func (c *Cluster) ClusterClient() (client *clientv3.Client, err error) { +func (c *Cluster) ClusterClient(t testutil.TB) (client *clientv3.Client, err error) { if c.clusterClient == nil { - endpoints := []string{} + var endpoints []string for _, m := range c.Members { endpoints = append(endpoints, m.GrpcURL) } cfg := clientv3.Config{ - Endpoints: endpoints, - DialTimeout: 5 * time.Second, - DialOptions: []grpc.DialOption{grpc.WithBlock()}, + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + MaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize, + MaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize, + Logger: memberLogger(t, "cluster"), + } + if c.Cfg.ClientTLS != nil { + tls, err := c.Cfg.ClientTLS.ClientConfig() + if err != nil { + return nil, err + } + cfg.TLS = tls } c.clusterClient, err = newClientV3(cfg) if err != nil { diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 412dd7899e65..d56817212d37 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -16,6 +16,7 @@ package integration import ( "context" + "errors" "fmt" "testing" "time" @@ -401,17 +402,31 @@ func TestV3LeaseLeases(t *testing.T) { // it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found. // related issue https://github.com/etcd-io/etcd/issues/6978 func TestV3LeaseRenewStress(t *testing.T) { - testLeaseStress(t, stressLeaseRenew) + testLeaseStress(t, stressLeaseRenew, false) +} + +// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress, +// but it uses a cluster client instead of a specific member's client. +// The related issue is https://github.com/etcd-io/etcd/issues/13675. +func TestV3LeaseRenewStressWithClusterClient(t *testing.T) { + testLeaseStress(t, stressLeaseRenew, true) } // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved. // it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found. // related issue https://github.com/etcd-io/etcd/issues/6978 func TestV3LeaseTimeToLiveStress(t *testing.T) { - testLeaseStress(t, stressLeaseTimeToLive) + testLeaseStress(t, stressLeaseTimeToLive, false) } -func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) { +// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress, +// but it uses a cluster client instead of a specific member's client. +// The related issue is https://github.com/etcd-io/etcd/issues/13675. +func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) { + testLeaseStress(t, stressLeaseTimeToLive, true) +} + +func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) { integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -420,13 +435,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient defer cancel() errc := make(chan error) - for i := 0; i < 30; i++ { - for j := 0; j < 3; j++ { - go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j) + if useClusterClient { + for i := 0; i < 300; i++ { + clusterClient, err := clus.ClusterClient(t) + if err != nil { + t.Fatal(err) + } + go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clusterClient).Lease) }(i) + } + } else { + for i := 0; i < 100; i++ { + for j := 0; j < 3; j++ { + go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j) + } } } - for i := 0; i < 90; i++ { + for i := 0; i < 300; i++ { if err := <-errc; err != nil { t.Fatal(err) } @@ -457,7 +482,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) { continue } if rresp.TTL == 0 { - return fmt.Errorf("TTL shouldn't be 0 so soon") + return errors.New("TTL shouldn't be 0 so soon") } } return nil