diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index 163e63b22c2..50a859282b3 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -77,6 +77,7 @@ var ( ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err() ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err() + ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err() ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() ErrGRPCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err() @@ -212,6 +213,7 @@ var ( ErrTimeout = Error(ErrGRPCTimeout) ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail) ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost) + ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex) ErrUnhealthy = Error(ErrGRPCUnhealthy) ErrCorrupt = Error(ErrGRPCCorrupt) ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee) diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go index cef6476bc41..a4ddbe58e38 100644 --- a/server/etcdserver/api/v3rpc/util.go +++ b/server/etcdserver/api/v3rpc/util.go @@ -54,6 +54,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost, + etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex, etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy, etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound, etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, diff --git a/server/etcdserver/errors.go b/server/etcdserver/errors.go index 9d9b07e13af..e28f49c1778 100644 --- a/server/etcdserver/errors.go +++ b/server/etcdserver/errors.go @@ -27,6 +27,7 @@ var ( ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long") ErrLeaderChanged = errors.New("etcdserver: leader changed") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader") diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 706c7854952..d63de8e8f2c 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -1906,3 +1906,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) { } s.sendC <- send } + +func TestWaitAppliedIndex(t *testing.T) { + cases := []struct { + name string + appliedIndex uint64 + committedIndex uint64 + action func(s *EtcdServer) + ExpectedError error + }{ + { + name: "The applied Id is already equal to the commitId", + appliedIndex: 10, + committedIndex: 10, + action: func(s *EtcdServer) { + s.applyWait.Trigger(10) + }, + ExpectedError: nil, + }, + { + name: "The etcd server has already stopped", + appliedIndex: 10, + committedIndex: 12, + action: func(s *EtcdServer) { + s.stopping <- struct{}{} + }, + ExpectedError: ErrStopped, + }, + { + name: "Timed out waiting for the applied index", + appliedIndex: 10, + committedIndex: 12, + action: nil, + ExpectedError: ErrTimeoutWaitAppliedIndex, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s := &EtcdServer{ + appliedIndex: tc.appliedIndex, + committedIndex: tc.committedIndex, + stopping: make(chan struct{}, 1), + applyWait: wait.NewTimeList(), + } + + if tc.action != nil { + go tc.action(s) + } + + err := s.waitAppliedIndex() + + if err != tc.ExpectedError { + t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err) + } + }) + } +} diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index f2bfa758142..154cbee2357 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -45,6 +45,10 @@ const ( maxGapBetweenApplyAndCommitIndex = 5000 traceThreshold = 100 * time.Millisecond readIndexRetryTime = 500 * time.Millisecond + + // The timeout for the node to catch up its applied index, and is used in + // lease related operations, such as LeaseRenew and LeaseTimeToLive. + applyTimeout = time.Second ) type RaftKV interface { @@ -275,13 +279,13 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (* return resp.(*pb.LeaseGrantResponse), nil } -func (s *EtcdServer) waitAppliedIndex(ctx context.Context) error { +func (s *EtcdServer) waitAppliedIndex() error { select { case <-s.ApplyWait(): case <-s.stopping: return ErrStopped - case <-ctx.Done(): - return ErrTimeout + case <-time.After(applyTimeout): + return ErrTimeoutWaitAppliedIndex } return nil @@ -296,23 +300,23 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) } func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { - cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) - defer cancel() - if s.isLeader() { - if err := s.waitAppliedIndex(cctx); err != nil { + if err := s.waitAppliedIndex(); err != nil { return 0, err } - } - ttl, err := s.lessor.Renew(id) - if err == nil { // already requested to primary lessor(leader) - return ttl, nil - } - if err != lease.ErrNotPrimary { - return -1, err + ttl, err := s.lessor.Renew(id) + if err == nil { // already requested to primary lessor(leader) + return ttl, nil + } + if err != lease.ErrNotPrimary { + return -1, err + } } + cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) + defer cancel() + // renewals don't go through raft; forward to leader manually for cctx.Err() == nil { leader, lerr := s.waitLeader(cctx) @@ -321,7 +325,7 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e } for _, url := range leader.PeerURLs { lurl := url + leasehttp.LeasePrefix - ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) + ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) if err == nil || err == lease.ErrLeaseNotFound { return ttl, err } @@ -337,11 +341,8 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { - cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) - defer cancel() - if s.isLeader() { - if err := s.waitAppliedIndex(cctx); err != nil { + if err := s.waitAppliedIndex(); err != nil { return nil, err } // primary; timetolive directly from leader @@ -362,6 +363,9 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR return resp, nil } + cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) + defer cancel() + // forward to leader for cctx.Err() == nil { leader, err := s.waitLeader(cctx)