From f8e3a1f74e3a8a236ecfe7bf3f92e0b5301aa8b3 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 3 Jan 2025 16:31:13 +0800 Subject: [PATCH] fix Signed-off-by: okJiang <819421878@qq.com> --- server/grpc_service.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index d5fd8ae3e32..f268ae122ad 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -345,6 +345,9 @@ func (s *GrpcServer) GetMinTS( // GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across // all keyspace groups. func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, error) { + if s.IsClosed() { + return nil, ErrNotStarted + } addrs := s.keyspaceGroupManager.GetTSOServiceAddrs() if len(addrs) == 0 { return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered") @@ -571,6 +574,11 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { return errors.WithStack(err) } + // TSO uses leader lease to determine validity. No need to check leader here. + if s.IsClosed() { + return ErrNotStarted + } + forwardedHost := grpcutil.GetForwardedHost(stream.Context()) if !s.isLocalRequest(forwardedHost) { clientConn, err := s.getDelegateClient(s.ctx, forwardedHost) @@ -605,10 +613,6 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { } start := time.Now() - // TSO uses leader lease to determine validity. No need to check leader here. - if s.IsClosed() { - return status.Errorf(codes.Unknown, "server not started") - } if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID { return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId()) @@ -754,6 +758,9 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapsho }, nil } } + if s.IsClosed() { + return nil, ErrNotStarted + } // recovering mark is stored in etcd directly, there's no need to forward. marked, err := s.Server.IsSnapshotRecovering(ctx) if err != nil {