From 10cb9f64be67a0818db17c2df1cdbb8ae31c1a2a Mon Sep 17 00:00:00 2001
From: ahrtr
Date: Sat, 12 Feb 2022 08:13:48 +0800
Subject: [PATCH 1/2] support linearizable lease renewal

When etcdserver receives a LeaseRenew request, it may still be in the
middle of processing the LeaseGrantRequest on exactly the same leaseID,
so it may return TTL=0 to the client because the lease is not found
yet. The leader should therefore wait for its applied index to catch up
with the committed index before processing such client requests.
---
 CHANGELOG/CHANGELOG-3.6.md             |  1 +
 server/etcdserver/v3_server.go         | 35 +++++++++++++++++-----
 tests/framework/integration/cluster.go | 17 ++++++++---
 tests/integration/v3_lease_test.go     | 41 +++++++++++++++++++++-----
 4 files changed, 75 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md
index 06c5c6d6200b..6c0c89b9b6c6 100644
--- a/CHANGELOG/CHANGELOG-3.6.md
+++ b/CHANGELOG/CHANGELOG-3.6.md
@@ -56,6 +56,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
 - Fix [etcd gateway doesn't format the endpoint of IPv6 address correctly](https://github.com/etcd-io/etcd/pull/13551)
 - Fix [A client can cause a nil dereference in etcd by passing an invalid SortTarget](https://github.com/etcd-io/etcd/pull/13555)
 - Fix [Grant lease with negative ID can possibly cause db out of sync](https://github.com/etcd-io/etcd/pull/13676)
+- Fix [Etcdserver is still in progress of processing LeaseGrantRequest when it receives a LeaseKeepAliveRequest on the same leaseID](https://github.com/etcd-io/etcd/pull/13690)
 
 ### tools/benchmark
 
diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go
index 3e868bebda98..961a5fc37025 100644
--- a/server/etcdserver/v3_server.go
+++ b/server/etcdserver/v3_server.go
@@ -271,6 +271,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
 	return resp.(*pb.LeaseGrantResponse), nil
 }
 
+func (s *EtcdServer) waitAppliedIndex(ctx context.Context) error {
+	select {
+	case <-s.ApplyWait():
+	case <-s.stopping:
+		return ErrStopped
+	case <-ctx.Done():
+		return ErrTimeout
+	}
+
+	return nil
+}
+
 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
 	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
 	if err != nil {
@@ -280,6 +292,15 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 }
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
+	if s.isLeader() {
+		if err := s.waitAppliedIndex(cctx); err != nil {
+			return 0, err
+		}
+	}
+
 	ttl, err := s.lessor.Renew(id)
 	if err == nil { // already requested to primary lessor(leader)
 		return ttl, nil
@@ -288,9 +309,6 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 		return -1, err
 	}
 
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	// renewals don't go through raft; forward to leader manually
 	for cctx.Err() == nil {
 		leader, lerr := s.waitLeader(cctx)
@@ -315,7 +333,13 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 }
 
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
-	if s.Leader() == s.ID() {
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
+	if s.isLeader() {
+		if err := s.waitAppliedIndex(cctx); err != nil {
+			return nil, err
+		}
 		// primary; timetolive directly from leader
 		le := s.lessor.Lookup(lease.LeaseID(r.ID))
 		if le == nil {
@@ -334,9 +358,6 @@ func (s *EtcdServer) LeaseTimeToLiveR
 		return resp, nil
 	}
 
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	// forward to leader
 	for cctx.Err() == nil {
 		leader, err := s.waitLeader(cctx)
diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go
index 197f87991ce8..2c9547105895 100644
--- a/tests/framework/integration/cluster.go
+++ b/tests/framework/integration/cluster.go
@@ -1368,14 +1368,23 @@ func (c *Cluster) Endpoints() []string {
 
 func (c *Cluster) ClusterClient() (client *clientv3.Client, err error) {
 	if c.clusterClient == nil {
-		endpoints := []string{}
+		var endpoints []string
 		for _, m := range c.Members {
 			endpoints = append(endpoints, m.GrpcURL)
 		}
 		cfg := clientv3.Config{
-			Endpoints:   endpoints,
-			DialTimeout: 5 * time.Second,
-			DialOptions: []grpc.DialOption{grpc.WithBlock()},
+			Endpoints:          endpoints,
+			DialTimeout:        5 * time.Second,
+			DialOptions:        []grpc.DialOption{grpc.WithBlock()},
+			MaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize,
+			MaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize,
+		}
+		if c.Cfg.ClientTLS != nil {
+			tls, err := c.Cfg.ClientTLS.ClientConfig()
+			if err != nil {
+				return nil, err
+			}
+			cfg.TLS = tls
 		}
 		c.clusterClient, err = newClientV3(cfg)
 		if err != nil {
diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go
index 520c983a30ea..aba4df267e5f 100644
--- a/tests/integration/v3_lease_test.go
+++ b/tests/integration/v3_lease_test.go
@@ -16,6 +16,7 @@ package integration
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"testing"
@@ -487,17 +488,31 @@ func TestV3LeaseLeases(t *testing.T) {
 // it was observed that the immediate lease renewal after granting a lease from a follower resulted in lease not found.
 // related issue https://github.com/etcd-io/etcd/issues/6978
 func TestV3LeaseRenewStress(t *testing.T) {
-	testLeaseStress(t, stressLeaseRenew)
+	testLeaseStress(t, stressLeaseRenew, false)
+}
+
+// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress,
+// but it uses a cluster client instead of a specific member's client.
+// The related issue is https://github.com/etcd-io/etcd/issues/13675.
+func TestV3LeaseRenewStressWithClusterClient(t *testing.T) {
+	testLeaseStress(t, stressLeaseRenew, true)
 }
 
 // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
 // it was observed that the immediate lease retrieval after granting a lease from a follower resulted in lease not found.
 // related issue https://github.com/etcd-io/etcd/issues/6978
 func TestV3LeaseTimeToLiveStress(t *testing.T) {
-	testLeaseStress(t, stressLeaseTimeToLive)
+	testLeaseStress(t, stressLeaseTimeToLive, false)
 }
 
-func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
+// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress,
+// but it uses a cluster client instead of a specific member's client.
+// The related issue is https://github.com/etcd-io/etcd/issues/13675.
+func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) {
+	testLeaseStress(t, stressLeaseTimeToLive, true)
+}
+
+func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) {
 	integration.BeforeTest(t)
 	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
@@ -506,13 +521,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
 	defer cancel()
 
 	errc := make(chan error)
-	for i := 0; i < 30; i++ {
-		for j := 0; j < 3; j++ {
-			go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j)
+	if useClusterClient {
+		for i := 0; i < 300; i++ {
+			clusterClient, err := clus.ClusterClient()
+			if err != nil {
+				t.Fatal(err)
+			}
+			go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clusterClient).Lease) }(i)
+		}
+	} else {
+		for i := 0; i < 100; i++ {
+			for j := 0; j < 3; j++ {
+				go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j)
+			}
 		}
 	}
 
-	for i := 0; i < 90; i++ {
+	for i := 0; i < 300; i++ {
 		if err := <-errc; err != nil {
 			t.Fatal(err)
 		}
@@ -543,7 +568,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
 			continue
 		}
 		if rresp.TTL == 0 {
-			return fmt.Errorf("TTL shouldn't be 0 so soon")
+			return errors.New("TTL shouldn't be 0 so soon")
 		}
 	}
 	return nil
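The race the first patch closes is easiest to see from the client side. Below is a minimal, hypothetical reproduction sketch, not part of the patch: it assumes a running multi-member cluster behind the placeholder endpoints and uses the public clientv3 API. Before the fix, the immediate KeepAliveOnce could come back as lease-not-found (the server answering TTL=0) because the leader had not yet applied the grant.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Placeholder endpoints; a real reproduction points these at a
	// multi-member cluster so the grant and the renewal can land on
	// different members.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	for i := 0; i < 100; i++ {
		// Grant a lease and renew it immediately. Before this patch, the
		// renewal could reach the leader while the grant was still being
		// applied, and the client saw a lease-not-found error (TTL=0).
		grant, err := cli.Grant(ctx, 60)
		if err != nil {
			log.Fatal(err)
		}
		if _, err := cli.KeepAliveOnce(ctx, grant.ID); err != nil {
			log.Fatalf("immediate renewal of lease %x failed: %v", grant.ID, err)
		}
	}
	fmt.Println("no renewal raced with its grant")
}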
From 536078024af37fbc73f1485505e98064dbe691b1 Mon Sep 17 00:00:00 2001
From: ahrtr
Date: Wed, 2 Mar 2022 10:47:45 +0800
Subject: [PATCH 2/2] set a separate applyTimeout for waitAppliedIndex

---
 api/v3rpc/rpctypes/error.go         |  2 +
 server/etcdserver/api/v3rpc/util.go |  1 +
 server/etcdserver/errors.go         |  1 +
 server/etcdserver/v3_server.go      | 42 +++++++++-------
 server/etcdserver/v3_server_test.go | 77 +++++++++++++++++++++++++++++
 5 files changed, 104 insertions(+), 19 deletions(-)
 create mode 100644 server/etcdserver/v3_server_test.go

diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go
index 163e63b22c2e..50a859282b30 100644
--- a/api/v3rpc/rpctypes/error.go
+++ b/api/v3rpc/rpctypes/error.go
@@ -77,6 +77,7 @@ var (
 	ErrGRPCTimeout                    = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
 	ErrGRPCTimeoutDueToLeaderFail     = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
 	ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
+	ErrGRPCTimeoutWaitAppliedIndex    = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err()
 	ErrGRPCUnhealthy                  = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
 	ErrGRPCCorrupt                    = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
 	ErrGRPCNotSupportedForLearner     = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err()
@@ -212,6 +213,7 @@ var (
 	ErrTimeout                    = Error(ErrGRPCTimeout)
 	ErrTimeoutDueToLeaderFail     = Error(ErrGRPCTimeoutDueToLeaderFail)
 	ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
+	ErrTimeoutWaitAppliedIndex    = Error(ErrGRPCTimeoutWaitAppliedIndex)
 	ErrUnhealthy                  = Error(ErrGRPCUnhealthy)
 	ErrCorrupt                    = Error(ErrGRPCCorrupt)
 	ErrBadLeaderTransferee        = Error(ErrGRPCBadLeaderTransferee)
diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go
index cef6476bc415..a4ddbe58e386 100644
--- a/server/etcdserver/api/v3rpc/util.go
+++ b/server/etcdserver/api/v3rpc/util.go
@@ -54,6 +54,7 @@ var toGRPCErrorMap = map[error]error{
 	etcdserver.ErrTimeout:                    rpctypes.ErrGRPCTimeout,
 	etcdserver.ErrTimeoutDueToLeaderFail:     rpctypes.ErrGRPCTimeoutDueToLeaderFail,
 	etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
+	etcdserver.ErrTimeoutWaitAppliedIndex:    rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
 	etcdserver.ErrUnhealthy:                  rpctypes.ErrGRPCUnhealthy,
 	etcdserver.ErrKeyNotFound:                rpctypes.ErrGRPCKeyNotFound,
 	etcdserver.ErrCorrupt:                    rpctypes.ErrGRPCCorrupt,
diff --git a/server/etcdserver/errors.go b/server/etcdserver/errors.go
index 9d9b07e13afb..e28f49c17789 100644
--- a/server/etcdserver/errors.go
+++ b/server/etcdserver/errors.go
@@ -27,6 +27,7 @@ var (
 	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
 	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
 	ErrTimeoutLeaderTransfer      = errors.New("etcdserver: request timed out, leader transfer took too long")
+	ErrTimeoutWaitAppliedIndex    = errors.New("etcdserver: request timed out, waiting for the applied index took too long")
 	ErrLeaderChanged              = errors.New("etcdserver: leader changed")
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
 	ErrLearnerNotReady            = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
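Taken together, the hunks above thread the new failure mode through each layer: errors.go defines the internal ErrTimeoutWaitAppliedIndex, rpctypes defines the gRPC-facing ErrGRPCTimeoutWaitAppliedIndex with code Unavailable, and util.go maps one to the other at the RPC boundary. Since the error is Unavailable, a caller can reasonably treat it as retryable. The helper below is an illustrative sketch of such client-side handling, not part of the patch; the retry count and delay are arbitrary.

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// renewOnce retries a single keep-alive when the server reports that it
// timed out waiting for its applied index. Equality against the canonical
// rpctypes error value is how clientv3 surfaces known server errors;
// adjust if your client version wraps errors differently.
func renewOnce(cli *clientv3.Client, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) {
	var resp *clientv3.LeaseKeepAliveResponse
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		resp, err = cli.KeepAliveOnce(ctx, id)
		cancel()
		if err != rpctypes.ErrTimeoutWaitAppliedIndex {
			return resp, err
		}
		// The leader was still catching up on its applied index; back off
		// briefly and retry.
		time.Sleep(500 * time.Millisecond)
	}
	return resp, err
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	grant, err := cli.Grant(context.Background(), 60)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := renewOnce(cli, grant.ID); err != nil {
		log.Fatal(err)
	}
}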
diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go
index 961a5fc37025..781b0f456b22 100644
--- a/server/etcdserver/v3_server.go
+++ b/server/etcdserver/v3_server.go
@@ -45,6 +45,10 @@ const (
 	maxGapBetweenApplyAndCommitIndex = 5000
 	traceThreshold                   = 100 * time.Millisecond
 	readIndexRetryTime               = 500 * time.Millisecond
+
+	// applyTimeout is the timeout for a node to catch up on its applied index,
+	// used by lease-related operations such as LeaseRenew and LeaseTimeToLive.
+	applyTimeout = time.Second
 )
 
 type RaftKV interface {
@@ -271,13 +275,13 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
 	return resp.(*pb.LeaseGrantResponse), nil
 }
 
-func (s *EtcdServer) waitAppliedIndex(ctx context.Context) error {
+func (s *EtcdServer) waitAppliedIndex() error {
 	select {
 	case <-s.ApplyWait():
 	case <-s.stopping:
 		return ErrStopped
-	case <-ctx.Done():
-		return ErrTimeout
+	case <-time.After(applyTimeout):
+		return ErrTimeoutWaitAppliedIndex
 	}
 
 	return nil
@@ -292,23 +296,23 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 }
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	if s.isLeader() {
-		if err := s.waitAppliedIndex(cctx); err != nil {
+		if err := s.waitAppliedIndex(); err != nil {
 			return 0, err
 		}
-	}
 
-	ttl, err := s.lessor.Renew(id)
-	if err == nil { // already requested to primary lessor(leader)
-		return ttl, nil
-	}
-	if err != lease.ErrNotPrimary {
-		return -1, err
+		ttl, err := s.lessor.Renew(id)
+		if err == nil { // already requested to primary lessor(leader)
+			return ttl, nil
+		}
+		if err != lease.ErrNotPrimary {
+			return -1, err
+		}
 	}
 
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
 	// renewals don't go through raft; forward to leader manually
 	for cctx.Err() == nil {
 		leader, lerr := s.waitLeader(cctx)
@@ -317,7 +321,7 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 		}
 		for _, url := range leader.PeerURLs {
 			lurl := url + leasehttp.LeasePrefix
-			ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
+			ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
 			if err == nil || err == lease.ErrLeaseNotFound {
 				return ttl, err
 			}
@@ -333,11 +337,8 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 }
 
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	if s.isLeader() {
-		if err := s.waitAppliedIndex(cctx); err != nil {
+		if err := s.waitAppliedIndex(); err != nil {
 			return nil, err
 		}
 		// primary; timetolive directly from leader
@@ -358,6 +359,9 @@ func (s *EtcdServer) LeaseTimeToLiveR
 		return resp, nil
 	}
 
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
 	// forward to leader
 	for cctx.Err() == nil {
 		leader, err := s.waitLeader(cctx)
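The rework above replaces the request-scoped context with a fixed server-side bound: waitAppliedIndex no longer consumes the caller's entire ReqTimeout while the apply loop catches up, and a slow apply now fails fast with the dedicated ErrTimeoutWaitAppliedIndex. Below is a standalone sketch of the resulting select pattern; the names are illustrative stand-ins for s.ApplyWait() and s.stopping, not the server's actual fields.

package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errStopped = errors.New("server stopped")
	errTimeout = errors.New("timed out waiting for the applied index")
)

// waitSignal blocks until the applied channel fires, the server begins
// stopping, or a fixed timeout elapses, mirroring the shape of the new
// waitAppliedIndex. Decoupling the bound from the caller's context keeps
// one slow apply loop from eating the whole request deadline.
func waitSignal(applied, stopping <-chan struct{}, timeout time.Duration) error {
	select {
	case <-applied:
		return nil
	case <-stopping:
		return errStopped
	case <-time.After(timeout):
		return errTimeout
	}
}

func main() {
	applied := make(chan struct{})
	// Simulate the apply loop catching up after 100ms.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(applied)
	}()
	fmt.Println(waitSignal(applied, make(chan struct{}), time.Second)) // <nil>
}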
diff --git a/server/etcdserver/v3_server_test.go b/server/etcdserver/v3_server_test.go
new file mode 100644
index 000000000000..400da0b618fa
--- /dev/null
+++ b/server/etcdserver/v3_server_test.go
@@ -0,0 +1,77 @@
+// Copyright 2022 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"testing"
+
+	"go.etcd.io/etcd/pkg/v3/wait"
+)
+
+func TestEtcdServer_WaitAppliedIndex(t *testing.T) {
+	cases := []struct {
+		name           string
+		appliedIndex   uint64
+		committedIndex uint64
+		action         func(s *EtcdServer)
+		ExpectedError  error
+	}{
+		{
+			name:           "The applied index is already equal to the committed index",
+			appliedIndex:   10,
+			committedIndex: 10,
+			action: func(s *EtcdServer) {
+				s.applyWait.Trigger(10)
+			},
+			ExpectedError: nil,
+		},
+		{
+			name:           "The etcd server has already stopped",
+			appliedIndex:   10,
+			committedIndex: 12,
+			action: func(s *EtcdServer) {
+				s.stopping <- struct{}{}
+			},
+			ExpectedError: ErrStopped,
+		},
+		{
+			name:           "Timed out waiting for the applied index",
+			appliedIndex:   10,
+			committedIndex: 12,
+			action:         nil,
+			ExpectedError:  ErrTimeoutWaitAppliedIndex,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			s := &EtcdServer{
+				appliedIndex:   tc.appliedIndex,
+				committedIndex: tc.committedIndex,
+				stopping:       make(chan struct{}, 1),
+				applyWait:      wait.NewTimeList(),
+			}
+
+			if tc.action != nil {
+				go tc.action(s)
+			}
+
+			err := s.waitAppliedIndex()
+
+			if err != tc.ExpectedError {
+				t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
+			}
+		})
+	}
+}
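For readers unfamiliar with the applyWait plumbing the new unit test relies on: wait.NewTimeList, from go.etcd.io/etcd/pkg/v3/wait (the same package the test imports), hands out channels keyed by an index, and Trigger(deadline) closes every channel waiting on an index at or below that deadline. That is why Trigger(10) satisfies the first test case, while the third case, with no trigger at all, runs into applyTimeout. A small sketch of that behavior:

package main

import (
	"fmt"

	"go.etcd.io/etcd/pkg/v3/wait"
)

func main() {
	w := wait.NewTimeList()

	// Wait for "index 10": the channel stays open until Trigger is called
	// with a deadline >= 10.
	ch := w.Wait(10)
	select {
	case <-ch:
		fmt.Println("unexpected: triggered too early")
	default:
		fmt.Println("still waiting for index 10")
	}

	// Trigger closes every channel waiting on a deadline <= 10. This is
	// what s.applyWait.Trigger(10) does in the first test case above.
	w.Trigger(10)
	<-ch
	fmt.Println("index 10 applied")
}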