From e19265fe5c8aff8c247c8853bcacafbbcb8f5573 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 20 Oct 2023 17:03:46 +0800 Subject: [PATCH 1/4] fix the scatter error Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/grpc_service.go | 8 ++++-- pkg/schedule/scatter/region_scatterer.go | 3 +++ server/grpc_service.go | 33 ++++++++++++++++++++--- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 8e47e7380f9..79c5c293ee7 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -211,7 +211,11 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) if err != nil { - return nil, err + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: err.Error(), + }) + return &schedulingpb.ScatterRegionsResponse{Header: header}, nil } percentage := 100 if len(failures) > 0 { @@ -243,7 +247,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper if r == nil { header := s.errorHeader(&schedulingpb.Error{ Type: schedulingpb.ErrorType_UNKNOWN, - Message: "Not Found", + Message: "region not found", }) return &schedulingpb.GetOperatorResponse{Header: header}, nil } diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 68d868750e8..6086655b0c3 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -192,6 +192,9 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r region := r.cluster.GetRegion(id) if region == nil { scatterSkipNoRegionCounter.Inc() + if len(regionsID) == 1 { + return 0, nil, errors.New("region not found") + } log.Warn("failed to find region during scatter", zap.Uint64("region-id", id)) failures[id] = errors.New(fmt.Sprintf("failed to find region %v", id)) continue diff --git a/server/grpc_service.go b/server/grpc_service.go index da9f71170c3..80e08dc6c7b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1034,6 +1034,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() + // 1. forwardedHost is not empty and pre is empty, update the schedulingClient + // 2. forwardedHost is not empty and forwardedHost is not equal to pre, update the schedulingClient + // 3. forwardedHost is not empty and forwardedHost is equal to pre, return pre + // 4. forwardedHost is empty, return nil if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { client, err := s.getDelegateClient(ctx, forwardedHost) if err != nil { @@ -1045,6 +1049,8 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S } s.schedulingClient.Store(forwardCli) return forwardCli.getClient(), nil + } else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) { + return pre.(*schedulingClient).getClient(), nil } return nil, ErrNotFoundSchedulingAddr } @@ -1627,8 +1633,13 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.AskBatchSplitRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1802,7 +1813,14 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg }, nil } if forwardCli != nil { - regionsID := request.GetRegionsId() + var regionsID []uint64 + // nolint + if request.GetRegionId() != 0 { + // nolint + regionsID = []uint64{request.GetRegionId()} + } else { + regionsID = request.GetRegionsId() + } if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ Header: s.invalidValue("regions id is required"), @@ -2134,6 +2152,15 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader { func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { switch header.GetError().GetType() { case schedulingpb.ErrorType_UNKNOWN: + if strings.Contains(header.GetError().GetMessage(), "region not found") { + return &pdpb.ResponseHeader{ + ClusterId: header.GetClusterId(), + Error: &pdpb.Error{ + Type: pdpb.ErrorType_REGION_NOT_FOUND, + Message: header.GetError().GetMessage(), + }, + } + } return &pdpb.ResponseHeader{ ClusterId: header.GetClusterId(), Error: &pdpb.Error{ From 55dd28e13854727bb4c35897d1e73364ea120229 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 23 Oct 2023 18:18:22 +0800 Subject: [PATCH 2/4] fix stream panic Signed-off-by: Ryan Leung --- server/grpc_service.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/grpc_service.go b/server/grpc_service.go index 80e08dc6c7b..2e59bdaf742 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -2529,6 +2529,9 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { + if client == nil { + return nil, nil, errors.New("connection is not set") + } done := make(chan struct{}) ctx, cancel := context.WithCancel(s.ctx) go grpcutil.CheckStream(ctx, cancel, done) From c209d34894d79aefe49e31623bcf6b3028519b1c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 24 Oct 2023 09:30:47 +0800 Subject: [PATCH 3/4] address comments Signed-off-by: Ryan Leung --- pkg/schedule/scatter/region_scatterer.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 6086655b0c3..0a83efb923f 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -53,6 +53,8 @@ var ( scatterUnnecessaryCounter = scatterCounter.WithLabelValues("unnecessary", "") scatterFailCounter = scatterCounter.WithLabelValues("fail", "") scatterSuccessCounter = scatterCounter.WithLabelValues("success", "") + errRegionNotFound = errors.New("region not found") + errEmptyRegion = errors.New("empty region") ) const ( @@ -165,7 +167,7 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s regions := r.cluster.ScanRegions(startKey, endKey, -1) if len(regions) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, nil, errors.New("empty region") + return 0, nil, errEmptyRegion } failures := make(map[uint64]error, len(regions)) regionMap := make(map[uint64]*core.RegionInfo, len(regions)) @@ -184,7 +186,11 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, map[uint64]error, error) { if len(regionsID) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, nil, errors.New("empty region") + return 0, nil, errEmptyRegion + } + if len(regionsID) == 1 { + scatterSkipNoRegionCounter.Inc() + return 0, nil, errRegionNotFound } failures := make(map[uint64]error, len(regionsID)) regions := make([]*core.RegionInfo, 0, len(regionsID)) @@ -192,9 +198,6 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r region := r.cluster.GetRegion(id) if region == nil { scatterSkipNoRegionCounter.Inc() - if len(regionsID) == 1 { - return 0, nil, errors.New("region not found") - } log.Warn("failed to find region during scatter", zap.Uint64("region-id", id)) failures[id] = errors.New(fmt.Sprintf("failed to find region %v", id)) continue @@ -222,7 +225,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int, skipStoreLimit bool) (int, error) { if len(regions) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, errors.New("empty region") + return 0, errEmptyRegion } if retryLimit > maxRetryLimit { retryLimit = maxRetryLimit From 7e28af70e9f581ab64c5f1e9db201af3309ca772 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 25 Oct 2023 10:37:59 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: Ryan Leung --- pkg/schedule/scatter/region_scatterer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 0a83efb923f..898c4d052a7 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -189,8 +189,11 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r return 0, nil, errEmptyRegion } if len(regionsID) == 1 { - scatterSkipNoRegionCounter.Inc() - return 0, nil, errRegionNotFound + region := r.cluster.GetRegion(regionsID[0]) + if region == nil { + scatterSkipNoRegionCounter.Inc() + return 0, nil, errRegionNotFound + } } failures := make(map[uint64]error, len(regionsID)) regions := make([]*core.RegionInfo, 0, len(regionsID))