Skip to content

Commit

Permalink
fix the scatter error
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Oct 23, 2023
1 parent 1899f41 commit e19265f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
8 changes: 6 additions & 2 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,11 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat

opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
if err != nil {
return nil, err
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: err.Error(),
})
return &schedulingpb.ScatterRegionsResponse{Header: header}, nil
}
percentage := 100
if len(failures) > 0 {
Expand Down Expand Up @@ -243,7 +247,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper
if r == nil {
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: "Not Found",
Message: "region not found",
})
return &schedulingpb.GetOperatorResponse{Header: header}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r
region := r.cluster.GetRegion(id)
if region == nil {
scatterSkipNoRegionCounter.Inc()
if len(regionsID) == 1 {
return 0, nil, errors.New("region not found")
}
log.Warn("failed to find region during scatter", zap.Uint64("region-id", id))
failures[id] = errors.New(fmt.Sprintf("failed to find region %v", id))
continue
Expand Down
33 changes: 30 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) {
forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
pre := s.schedulingClient.Load()
// 1. forwardedHost is not empty and pre is empty, update the schedulingClient
// 2. forwardedHost is not empty and forwardedHost is not equal to pre, update the schedulingClient
// 3. forwardedHost is not empty and forwardedHost is equal to pre, return pre
// 4. forwardedHost is empty, return nil
if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
Expand All @@ -1045,6 +1049,8 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S
}
s.schedulingClient.Store(forwardCli)
return forwardCli.getClient(), nil
} else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) {
return pre.(*schedulingClient).getClient(), nil
}
return nil, ErrNotFoundSchedulingAddr
}
Expand Down Expand Up @@ -1627,8 +1633,13 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest
// AskBatchSplit implements gRPC PDServer.
func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if s.IsAPIServiceMode() {
s.updateSchedulingClient(ctx)
if s.schedulingClient.Load() != nil {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.AskBatchSplitResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
if forwardCli != nil {
req := &schedulingpb.AskBatchSplitRequest{
Header: &schedulingpb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
Expand Down Expand Up @@ -1802,7 +1813,14 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
}, nil
}
if forwardCli != nil {
regionsID := request.GetRegionsId()
var regionsID []uint64
// nolint
if request.GetRegionId() != 0 {
// nolint
regionsID = []uint64{request.GetRegionId()}
} else {
regionsID = request.GetRegionsId()
}
if len(regionsID) == 0 {
return &pdpb.ScatterRegionResponse{
Header: s.invalidValue("regions id is required"),
Expand Down Expand Up @@ -2134,6 +2152,15 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader {
func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader {
switch header.GetError().GetType() {
case schedulingpb.ErrorType_UNKNOWN:
if strings.Contains(header.GetError().GetMessage(), "region not found") {
return &pdpb.ResponseHeader{
ClusterId: header.GetClusterId(),
Error: &pdpb.Error{
Type: pdpb.ErrorType_REGION_NOT_FOUND,
Message: header.GetError().GetMessage(),
},
}
}
return &pdpb.ResponseHeader{
ClusterId: header.GetClusterId(),
Error: &pdpb.Error{
Expand Down

0 comments on commit e19265f

Please sign in to comment.