Skip to content

Commit

Permalink
*: remove the subtask operation queue in DM-worker (pingcap#432)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Dec 17, 2019
1 parent bdf102a commit f049e46
Show file tree
Hide file tree
Showing 35 changed files with 5,417 additions and 10,432 deletions.
128 changes: 39 additions & 89 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
wg.Add(1)
go s.ap.Emit(ctx, 0, func(args ...interface{}) {
defer wg.Done()
cli, worker, stCfgToml, taskName, err := s.taskConfigArgsExtractor(args...)
cli, worker, stCfgToml, _, err := s.taskConfigArgsExtractor(args...)
if err != nil {
workerRespCh <- errorCommonWorkerResponse(err.Error(), worker)
return
Expand All @@ -353,9 +353,14 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
StartSubTask: &pb.StartSubTaskRequest{Task: stCfgToml},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := s.handleOperationResult(ctx, cli, taskName, worker, err, resp)
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta
if err != nil {
resp = &workerrpc.Response{
Type: workerrpc.CmdStartSubTask,
StartSubTask: errorCommonWorkerResponse(err.Error(), worker),
}
}
resp.StartSubTask.Worker = worker
workerRespCh <- resp.StartSubTask
}, func(args ...interface{}) {
defer wg.Done()
_, worker, _, _, err := s.taskConfigArgsExtractor(args...)
Expand Down Expand Up @@ -420,8 +425,10 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
handleErr := func(err error, worker string) {
log.L().Error("response error", zap.Error(err))
workerResp := &pb.OperateSubTaskResponse{
Meta: errorCommonWorkerResponse(err.Error(), worker),
Op: req.Op,
Op: req.Op,
Result: false,
Worker: worker,
Msg: err.Error(),
}
workerRespCh <- workerResp
}
Expand All @@ -445,10 +452,18 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
return
}
resp, err := cli.SendRequest(ctx, subReq, s.cfg.RPCTimeout)
workerResp := s.handleOperationResult(ctx, cli, req.Name, worker1, err, resp)
workerResp.Op = req.Op
workerResp.Meta.Worker = worker1
workerRespCh <- workerResp
if err != nil {
resp = &workerrpc.Response{
Type: workerrpc.CmdOperateSubTask,
OperateSubTask: &pb.OperateSubTaskResponse{
Op: req.Op,
Result: false,
Msg: err.Error(),
},
}
}
resp.OperateSubTask.Worker = worker1
workerRespCh <- resp.OperateSubTask
}, func(args ...interface{}) {
defer wg.Done()
_, worker1, err := s.workerArgsExtractor(args...)
Expand All @@ -465,9 +480,9 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
workerRespMap := make(map[string]*pb.OperateSubTaskResponse, len(workers))
for len(workerRespCh) > 0 {
workerResp := <-workerRespCh
workerRespMap[workerResp.Meta.Worker] = workerResp
if len(workerResp.Meta.Msg) == 0 { // no error occurred
validWorkers = append(validWorkers, workerResp.Meta.Worker)
workerRespMap[workerResp.Worker] = workerResp
if len(workerResp.Msg) == 0 { // no error occurred
validWorkers = append(validWorkers, workerResp.Worker)
}
}

Expand Down Expand Up @@ -527,7 +542,7 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb
wg.Add(1)
go s.ap.Emit(ctx, 0, func(args ...interface{}) {
defer wg.Done()
cli, worker, stCfgToml, taskName, err := s.taskConfigArgsExtractor(args...)
cli, worker, stCfgToml, _, err := s.taskConfigArgsExtractor(args...)
if err != nil {
workerRespCh <- errorCommonWorkerResponse(err.Error(), worker)
return
Expand All @@ -537,9 +552,14 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb
UpdateSubTask: &pb.UpdateSubTaskRequest{Task: stCfgToml},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := s.handleOperationResult(ctx, cli, taskName, worker, err, resp)
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta
if err != nil {
resp = &workerrpc.Response{
Type: workerrpc.CmdUpdateSubTask,
UpdateSubTask: errorCommonWorkerResponse(err.Error(), worker),
}
}
resp.UpdateSubTask.Worker = worker
workerRespCh <- resp.UpdateSubTask
}, func(args ...interface{}) {
defer wg.Done()
_, worker, _, _, err := s.taskConfigArgsExtractor(args...)
Expand Down Expand Up @@ -927,11 +947,7 @@ func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWork

handleErr := func(err error, worker string) {
log.L().Error("response error", zap.Error(err))
resp := &pb.CommonWorkerResponse{
Result: false,
Msg: errors.ErrorStack(err),
Worker: worker,
}
resp := errorCommonWorkerResponse(errors.ErrorStack(err), worker)
workerRespCh <- resp
}

Expand Down Expand Up @@ -1961,75 +1977,9 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
}

var (
maxRetryNum = 30
retryInterval = time.Second
maxRetryNum = 30
)

// waitOperationOk polls the worker through cli for the result of the operation
// identified by opLogID on task taskName.
//
// It sends a CmdQueryTaskOperation request at most maxRetryNum times, pausing
// retryInterval after every attempt that did not produce a final answer:
//   - an RPC failure is logged and then retried;
//   - a response without an operation log yields ErrMasterOperNotFound;
//   - a log marked Success yields nil;
//   - a non-successful log carrying a message yields ErrMasterOperRespNotSuccess;
//   - a non-successful log without a message is treated as still pending.
//
// It returns ctx.Err() if ctx is done while pausing between attempts, and
// ErrMasterOperRequestTimeout once all attempts are used up.
func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, taskName, workerID string, opLogID int64) error {
	query := &workerrpc.Request{
		Type: workerrpc.CmdQueryTaskOperation,
		QueryTaskOperation: &pb.QueryTaskOperationRequest{
			Name:  taskName,
			LogID: opLogID,
		},
	}

	for attempt := 0; attempt < maxRetryNum; attempt++ {
		resp, err := cli.SendRequest(ctx, query, s.cfg.RPCTimeout)
		if err == nil {
			opLog := resp.QueryTaskOperation.Log
			switch {
			case opLog == nil:
				return terror.ErrMasterOperNotFound.Generate(opLogID, taskName, workerID)
			case opLog.Success:
				return nil
			case len(opLog.Message) != 0:
				return terror.ErrMasterOperRespNotSuccess.Generate(opLogID, taskName, workerID, opLog.Message)
			}
			// neither succeeded nor failed with a message: the operation is still in progress.
			log.L().Info("wait op log result", zap.String("task", taskName), zap.String("worker", workerID), zap.Int64("operation log ID", opLogID), zap.Stringer("result", resp.QueryTaskOperation))
		} else {
			log.L().Error("fail to query task operation", zap.String("task", taskName), zap.String("worker", workerID), zap.Int64("operation log ID", opLogID), log.ShortError(err))
		}

		// pause before the next attempt, but give up at once if the caller cancels.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryInterval):
		}
	}

	return terror.ErrMasterOperRequestTimeout.Generate(workerID)
}

// handleOperationResult turns the outcome of a start/operate/update subtask
// RPC (err and resp) into an OperateSubTaskResponse.
//
// If the RPC itself failed, or resp.Type is not one of the three subtask
// commands, a response whose Meta describes the error is returned. Otherwise
// the matching payload of resp is taken and waitOperationOk is called with its
// LogID; if that wait fails, the payload's Meta is replaced by an error Meta.
//
// Every error Meta is built with an empty worker argument.
func (s *Server) handleOperationResult(ctx context.Context, cli workerrpc.Client, taskName, workerID string, err error, resp *workerrpc.Response) *pb.OperateSubTaskResponse {
	if err != nil {
		return &pb.OperateSubTaskResponse{
			Meta: errorCommonWorkerResponse(errors.ErrorStack(err), ""),
		}
	}

	var subTaskResp *pb.OperateSubTaskResponse
	switch resp.Type {
	case workerrpc.CmdStartSubTask:
		subTaskResp = resp.StartSubTask
	case workerrpc.CmdOperateSubTask:
		subTaskResp = resp.OperateSubTask
	case workerrpc.CmdUpdateSubTask:
		subTaskResp = resp.UpdateSubTask
	default:
		// this should not happen
		return &pb.OperateSubTaskResponse{
			Meta: errorCommonWorkerResponse(fmt.Sprintf("invalid operate task type %v", resp.Type), ""),
		}
	}

	if waitErr := s.waitOperationOk(ctx, cli, taskName, workerID, subTaskResp.LogID); waitErr != nil {
		subTaskResp.Meta = errorCommonWorkerResponse(errors.ErrorStack(waitErr), "")
	}

	return subTaskResp
}

// taskConfigArgsExtractor extracts the SubTaskConfig from args and returns the
// corresponding gRPC client, worker ID (host:port), subtask config in TOML, task name, and error.
func (s *Server) taskConfigArgsExtractor(args ...interface{}) (workerrpc.Client, string, string, string, error) {
Expand Down
Loading

0 comments on commit f049e46

Please sign in to comment.