Skip to content

Commit

Permalink
mcs: fix the scatter error (tikv#7241)
Browse files Browse the repository at this point in the history
close tikv#7234

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Dec 1, 2023
1 parent 528071b commit 0c4819f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 8 deletions.
8 changes: 6 additions & 2 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,11 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat

opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
if err != nil {
return nil, err
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: err.Error(),
})
return &schedulingpb.ScatterRegionsResponse{Header: header}, nil
}
percentage := 100
if len(failures) > 0 {
Expand Down Expand Up @@ -243,7 +247,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper
if r == nil {
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: "Not Found",
Message: "region not found",
})
return &schedulingpb.GetOperatorResponse{Header: header}, nil
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var (
scatterUnnecessaryCounter = scatterCounter.WithLabelValues("unnecessary", "")
scatterFailCounter = scatterCounter.WithLabelValues("fail", "")
scatterSuccessCounter = scatterCounter.WithLabelValues("success", "")
errRegionNotFound = errors.New("region not found")
errEmptyRegion = errors.New("empty region")
)

const (
Expand Down Expand Up @@ -184,7 +186,7 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s
regions := r.cluster.ScanRegions(startKey, endKey, -1)
if len(regions) < 1 {
scatterSkipEmptyRegionCounter.Inc()
return 0, nil, errors.New("empty region")
return 0, nil, errEmptyRegion
}
failures := make(map[uint64]error, len(regions))
regionMap := make(map[uint64]*core.RegionInfo, len(regions))
Expand All @@ -203,7 +205,14 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s
func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int) (int, map[uint64]error, error) {
if len(regionsID) < 1 {
scatterSkipEmptyRegionCounter.Inc()
return 0, nil, errors.New("empty region")
return 0, nil, errEmptyRegion
}
if len(regionsID) == 1 {
region := r.cluster.GetRegion(regionsID[0])
if region == nil {
scatterSkipNoRegionCounter.Inc()
return 0, nil, errRegionNotFound
}
}
failures := make(map[uint64]error, len(regionsID))
regions := make([]*core.RegionInfo, 0, len(regionsID))
Expand Down Expand Up @@ -238,7 +247,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r
func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int) (int, error) {
if len(regions) < 1 {
scatterSkipEmptyRegionCounter.Inc()
return 0, errors.New("empty region")
return 0, errEmptyRegion
}
if retryLimit > maxRetryLimit {
retryLimit = maxRetryLimit
Expand Down
36 changes: 33 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) {
forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
pre := s.schedulingClient.Load()
// 1. forwardedHost is not empty and pre is empty, update the schedulingClient
// 2. forwardedHost is not empty and forwardedHost is not equal to pre, update the schedulingClient
// 3. forwardedHost is not empty and forwardedHost is equal to pre, return pre
// 4. forwardedHost is empty, return nil
if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
Expand All @@ -996,6 +1000,8 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S
}
s.schedulingClient.Store(forwardCli)
return forwardCli.getClient(), nil
} else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) {
return pre.(*schedulingClient).getClient(), nil
}
return nil, ErrNotFoundSchedulingAddr
}
Expand Down Expand Up @@ -1534,8 +1540,13 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest
// AskBatchSplit implements gRPC PDServer.
func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if s.IsAPIServiceMode() {
s.updateSchedulingClient(ctx)
if s.schedulingClient.Load() != nil {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.AskBatchSplitResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
if forwardCli != nil {
req := &schedulingpb.AskBatchSplitRequest{
Header: &schedulingpb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
Expand Down Expand Up @@ -1710,7 +1721,14 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
}, nil
}
if forwardCli != nil {
regionsID := request.GetRegionsId()
var regionsID []uint64
// nolint
if request.GetRegionId() != 0 {
// nolint
regionsID = []uint64{request.GetRegionId()}
} else {
regionsID = request.GetRegionsId()
}
if len(regionsID) == 0 {
return &pdpb.ScatterRegionResponse{
Header: s.invalidValue("regions id is required"),
Expand Down Expand Up @@ -2028,6 +2046,15 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader {
func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader {
switch header.GetError().GetType() {
case schedulingpb.ErrorType_UNKNOWN:
if strings.Contains(header.GetError().GetMessage(), "region not found") {
return &pdpb.ResponseHeader{
ClusterId: header.GetClusterId(),
Error: &pdpb.Error{
Type: pdpb.ErrorType_REGION_NOT_FOUND,
Message: header.GetError().GetMessage(),
},
}
}
return &pdpb.ResponseHeader{
ClusterId: header.GetClusterId(),
Error: &pdpb.Error{
Expand Down Expand Up @@ -2393,6 +2420,9 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC
}

func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) {
if client == nil {
return nil, nil, errors.New("connection is not set")
}
done := make(chan struct{})
ctx, cancel := context.WithCancel(s.ctx)
go grpcutil.CheckStream(ctx, cancel, done)
Expand Down

0 comments on commit 0c4819f

Please sign in to comment.