Skip to content

Commit

Permalink
Merge pull request #61 from holdno/feat/improving_task_executer
Browse files Browse the repository at this point in the history
Feat/improving task executer
  • Loading branch information
holdno authored Aug 24, 2024
2 parents e2181d0 + a361ae2 commit f5188d7
Show file tree
Hide file tree
Showing 26 changed files with 195 additions and 103 deletions.
15 changes: 9 additions & 6 deletions agent/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,14 +460,17 @@ func (a *client) TryStartTask(plan common.TaskSchedulePlan) error {
})
return err
}, retry.RetryIf(func(err error) bool {
if gerr, _ := status.FromError(err); gerr.Code() != codes.Aborted {
a.logger.Debug("task aborted", zap.String("task_id", plan.Task.TaskID), zap.Int64("project_id", plan.Task.ProjectID),
zap.String("tmp_id", plan.TmpID), zap.Error(err))
if gerr, _ := status.FromError(err); gerr.Code() == codes.Aborted || gerr.Code() == codes.Unauthenticated {
return false
}
return true
}), retry.Attempts(3), retry.DelayType(retry.BackOffDelay),
retry.MaxJitter(time.Second*30), retry.LastErrorOnly(true)); err != nil {
if gerr, _ := status.FromError(err); gerr.Code() == codes.Aborted {
a.logger.Debug("task aborted", zap.String("task_id", plan.Task.TaskID), zap.Int64("project_id", plan.Task.ProjectID),
zap.String("tmp_id", plan.TmpID), zap.Error(err))
return
}
taskExecuteInfo.CancelFunc()
a.metrics.SystemErrInc("agent_status_report_failure")
a.logger.Error(fmt.Sprintf("task: %s, id: %s, tmp_id: %s, change running status error, %v", plan.Task.Name,
Expand Down Expand Up @@ -708,9 +711,9 @@ func tryLockUntilCtxIsDone(cli cronpb.CenterClient, execInfo *common.TaskExecuti
safe.Run(func() {
// 任务执行后锁最少保持5s
// 防止分布式部署下多台机器共同执行
// if time.Since(execInfo.RealTime).Seconds() < 5 {
// time.Sleep(5*time.Second - time.Since(execInfo.RealTime))
// }
if time.Since(execInfo.RealTime).Seconds() < 5 {
time.Sleep(5*time.Second - time.Since(execInfo.RealTime))
}
locker.Send(&cronpb.TryLockRequest{
ProjectId: execInfo.Task.ProjectID,
TaskId: execInfo.Task.TaskID,
Expand Down
26 changes: 17 additions & 9 deletions cmd/service/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,14 @@ Here:
return nil
}

type registerInfo struct {
reqID string
info *cronpb.RegisterInfo
}

// watchAgentResponse watch agent register request or event handle response
func watchAgentResponse(ctx context.Context, receive func() (*cronpb.ClientEvent, error), callback func(*cronpb.ClientEvent)) <-chan *cronpb.RegisterInfo {
newRegisterInfo := make(chan *cronpb.RegisterInfo)
func watchAgentResponse(ctx context.Context, receive func() (*cronpb.ClientEvent, error), callback func(*cronpb.ClientEvent)) <-chan *registerInfo {
newRegisterInfo := make(chan *registerInfo)
go safe.Run(func() {
defer close(newRegisterInfo)
for {
Expand All @@ -425,7 +430,10 @@ func watchAgentResponse(ctx context.Context, receive func() (*cronpb.ClientEvent
}

if info.Type == cronpb.EventType_EVENT_REGISTER_REQUEST {
newRegisterInfo <- info.GetRegisterInfo()
newRegisterInfo <- &registerInfo{
reqID: info.Id,
info: info.GetRegisterInfo(),
}
} else {
callback(info)
}
Expand Down Expand Up @@ -527,15 +535,15 @@ func (s *cronRpc) buildAgentRegister(ctx context.Context) (registerFunc func(req
return registerFunc, deRegisterFunc
}

type dispatcher func(meta infra.NodeMeta) app.JobDispatcher
type dispatcher func(reqID string, meta infra.NodeMeta) app.JobDispatcher

func buildDispatchJobsV2Handler(sendEvent func(ctx context.Context, e *cronpb.ServiceEvent) error) dispatcher {
return func(meta infra.NodeMeta) app.JobDispatcher {
return func(reqID string, meta infra.NodeMeta) app.JobDispatcher {
return func(taskRaw []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
if err := sendEvent(ctx, &cronpb.ServiceEvent{
Id: utils.GetStrID(),
Id: reqID,
Type: cronpb.EventType_EVENT_REGISTER_REPLY,
EventTime: time.Now().Unix(),
Event: &cronpb.ServiceEvent_RegisterReply{
Expand Down Expand Up @@ -599,7 +607,7 @@ func (s *cronRpc) RegisterAgentV2(req cronpb.Center_RegisterAgentV2Server) error
}
})

// 注册成功后像agent下发任务的处理方法
// 注册成功后向agent下发任务的处理方法
dispatchHandler := buildDispatchJobsV2Handler(func(ctx context.Context, e *cronpb.ServiceEvent) error {
_, err := s.app.StreamManagerV2().SendEventWaitResponse(ctx, req, e)
return err
Expand All @@ -619,12 +627,12 @@ func (s *cronRpc) RegisterAgentV2(req cronpb.Center_RegisterAgentV2Server) error
return nil
}
// 将agent信息进行注册
err := register(multiService, func(nm []infra.NodeMeta) error {
err := register(multiService.info, func(nm []infra.NodeMeta) error {
// 完成注册后将stream缓存至内存中,方便后续中心与agent通信时使用
for _, meta := range nm {
s.app.StreamManagerV2().SaveStream(meta, req, cancel)
// 下发对应项目的任务列表
if err := s.app.DispatchAgentJob(meta.System, dispatchHandler(meta)); err != nil {
if err := s.app.DispatchAgentJob(meta.System, dispatchHandler(multiService.reqID, meta)); err != nil {
return err
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f5188d7

Please sign in to comment.