From 35b2719a3487788e087d627248d3bcc85ddb53d1 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 25 Oct 2023 12:14:02 +0800 Subject: [PATCH] mcs: fix the scatter error (#7241) close tikv/pd#7234 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/grpc_service.go | 8 +++-- pkg/schedule/scatter/region_scatterer.go | 15 ++++++++-- server/grpc_service.go | 36 +++++++++++++++++++++-- 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 8e47e7380f9..79c5c293ee7 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -211,7 +211,11 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) if err != nil { - return nil, err + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: err.Error(), + }) + return &schedulingpb.ScatterRegionsResponse{Header: header}, nil } percentage := 100 if len(failures) > 0 { @@ -243,7 +247,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper if r == nil { header := s.errorHeader(&schedulingpb.Error{ Type: schedulingpb.ErrorType_UNKNOWN, - Message: "Not Found", + Message: "region not found", }) return &schedulingpb.GetOperatorResponse{Header: header}, nil } diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 68d868750e8..898c4d052a7 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -53,6 +53,8 @@ var ( scatterUnnecessaryCounter = scatterCounter.WithLabelValues("unnecessary", "") scatterFailCounter = scatterCounter.WithLabelValues("fail", "") scatterSuccessCounter = scatterCounter.WithLabelValues("success", "") + errRegionNotFound = errors.New("region not found") + errEmptyRegion = errors.New("empty region") ) const ( @@ -165,7 +167,7 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s regions := r.cluster.ScanRegions(startKey, endKey, -1) if len(regions) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, nil, errors.New("empty region") + return 0, nil, errEmptyRegion } failures := make(map[uint64]error, len(regions)) regionMap := make(map[uint64]*core.RegionInfo, len(regions)) @@ -184,7 +186,14 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, map[uint64]error, error) { if len(regionsID) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, nil, errors.New("empty region") + return 0, nil, errEmptyRegion + } + if len(regionsID) == 1 { + region := r.cluster.GetRegion(regionsID[0]) + if region == nil { + scatterSkipNoRegionCounter.Inc() + return 0, nil, errRegionNotFound + } } failures := make(map[uint64]error, len(regionsID)) regions := make([]*core.RegionInfo, 0, len(regionsID)) @@ -219,7 +228,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int, skipStoreLimit bool) (int, error) { if len(regions) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, errors.New("empty region") + return 0, errEmptyRegion } if retryLimit > maxRetryLimit { retryLimit = maxRetryLimit diff --git a/server/grpc_service.go b/server/grpc_service.go index da9f71170c3..2e59bdaf742 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1034,6 +1034,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() + // 1. forwardedHost is not empty and pre is empty, update the schedulingClient + // 2. forwardedHost is not empty and forwardedHost is not equal to pre, update the schedulingClient + // 3. forwardedHost is not empty and forwardedHost is equal to pre, return pre + // 4. forwardedHost is empty, return nil if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { client, err := s.getDelegateClient(ctx, forwardedHost) if err != nil { @@ -1045,6 +1049,8 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S } s.schedulingClient.Store(forwardCli) return forwardCli.getClient(), nil + } else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) { + return pre.(*schedulingClient).getClient(), nil } return nil, ErrNotFoundSchedulingAddr } @@ -1627,8 +1633,13 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.AskBatchSplitRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1802,7 +1813,14 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg }, nil } if forwardCli != nil { - regionsID := request.GetRegionsId() + var regionsID []uint64 + // nolint + if request.GetRegionId() != 0 { + // nolint + regionsID = []uint64{request.GetRegionId()} + } else { + regionsID = request.GetRegionsId() + } if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ Header: s.invalidValue("regions id is required"), @@ -2134,6 +2152,15 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader { func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { switch header.GetError().GetType() { case schedulingpb.ErrorType_UNKNOWN: + if strings.Contains(header.GetError().GetMessage(), "region not found") { + return &pdpb.ResponseHeader{ + ClusterId: header.GetClusterId(), + Error: &pdpb.Error{ + Type: pdpb.ErrorType_REGION_NOT_FOUND, + Message: header.GetError().GetMessage(), + }, + } + } return &pdpb.ResponseHeader{ ClusterId: header.GetClusterId(), Error: &pdpb.Error{ @@ -2502,6 +2529,9 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { + if client == nil { + return nil, nil, errors.New("connection is not set") + } done := make(chan struct{}) ctx, cancel := context.WithCancel(s.ctx) go grpcutil.CheckStream(ctx, cancel, done)