diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 8e47e7380f9..79c5c293ee7 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -211,7 +211,11 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) if err != nil { - return nil, err + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: err.Error(), + }) + return &schedulingpb.ScatterRegionsResponse{Header: header}, nil } percentage := 100 if len(failures) > 0 { @@ -243,7 +247,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper if r == nil { header := s.errorHeader(&schedulingpb.Error{ Type: schedulingpb.ErrorType_UNKNOWN, - Message: "Not Found", + Message: "region not found", }) return &schedulingpb.GetOperatorResponse{Header: header}, nil } diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 68d868750e8..6086655b0c3 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -192,6 +192,9 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r region := r.cluster.GetRegion(id) if region == nil { scatterSkipNoRegionCounter.Inc() + if len(regionsID) == 1 { + return 0, nil, errors.New("region not found") + } log.Warn("failed to find region during scatter", zap.Uint64("region-id", id)) failures[id] = errors.New(fmt.Sprintf("failed to find region %v", id)) continue diff --git a/server/grpc_service.go b/server/grpc_service.go index da9f71170c3..80e08dc6c7b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1034,6 +1034,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() + // 1. forwardedHost is not empty and pre is empty, update the schedulingClient + // 2. forwardedHost is not empty and forwardedHost is not equal to pre, update the schedulingClient + // 3. forwardedHost is not empty and forwardedHost is equal to pre, return pre + // 4. forwardedHost is empty, return nil if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { client, err := s.getDelegateClient(ctx, forwardedHost) if err != nil { @@ -1045,6 +1049,8 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S } s.schedulingClient.Store(forwardCli) return forwardCli.getClient(), nil + } else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) { + return pre.(*schedulingClient).getClient(), nil } return nil, ErrNotFoundSchedulingAddr } @@ -1627,8 +1633,13 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.AskBatchSplitRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1802,7 +1813,14 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg }, nil } if forwardCli != nil { - regionsID := request.GetRegionsId() + var regionsID []uint64 + // nolint + if request.GetRegionId() != 0 { + // nolint + regionsID = []uint64{request.GetRegionId()} + } else { + regionsID = request.GetRegionsId() + } if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ Header: s.invalidValue("regions id is required"), @@ -2134,6 +2152,15 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader { func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { switch header.GetError().GetType() { case schedulingpb.ErrorType_UNKNOWN: + if strings.Contains(header.GetError().GetMessage(), "region not found") { + return &pdpb.ResponseHeader{ + ClusterId: header.GetClusterId(), + Error: &pdpb.Error{ + Type: pdpb.ErrorType_REGION_NOT_FOUND, + Message: header.GetError().GetMessage(), + }, + } + } return &pdpb.ResponseHeader{ ClusterId: header.GetClusterId(), Error: &pdpb.Error{