diff --git a/spider/admin/service.go b/spider/admin/service.go index 9734f53..fc0f323 100644 --- a/spider/admin/service.go +++ b/spider/admin/service.go @@ -132,20 +132,13 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp if svc.isMultiTask(opts) { // multi tasks - // TODO: implement associated tasks - //mainTask.HasSub = true - //if err := delegate.NewModelDelegate(mainTask).Add(); err != nil { - // return err - //} nodeIds, err := svc.getNodeIds(opts) if err != nil { return nil, err } for _, nodeId := range nodeIds { t := &models.Task{ - SpiderId: s.Id, - // TODO: implement associated tasks - //ParentId: mainTask.Id, + SpiderId: s.Id, Mode: opts.Mode, Cmd: opts.Cmd, Param: opts.Param, diff --git a/task/handler/service.go b/task/handler/service.go index 660d3bf..9f0ccc0 100644 --- a/task/handler/service.go +++ b/task/handler/service.go @@ -421,7 +421,7 @@ func NewTaskHandlerService(opts ...Option) (svc2 interfaces.TaskHandlerService, svc := &Service{ TaskBaseService: baseSvc, exitWatchDuration: 60 * time.Second, - fetchInterval: 5 * time.Second, + fetchInterval: 1 * time.Second, fetchTimeout: 15 * time.Second, reportInterval: 5 * time.Second, cancelTimeout: 5 * time.Second, diff --git a/task/scheduler/service.go b/task/scheduler/service.go index 52cfb17..5a16656 100644 --- a/task/scheduler/service.go +++ b/task/scheduler/service.go @@ -1,14 +1,11 @@ package scheduler import ( - "fmt" - "github.com/apex/log" config2 "github.com/crawlab-team/crawlab-core/config" "github.com/crawlab-team/crawlab-core/constants" "github.com/crawlab-team/crawlab-core/errors" "github.com/crawlab-team/crawlab-core/grpc/server" "github.com/crawlab-team/crawlab-core/interfaces" - "github.com/crawlab-team/crawlab-core/models/client" "github.com/crawlab-team/crawlab-core/models/delegate" "github.com/crawlab-team/crawlab-core/models/models" "github.com/crawlab-team/crawlab-core/models/service" @@ -24,7 +21,6 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" mongo2 "go.mongodb.org/mongo-driver/mongo" "go.uber.org/dig" - "math/rand" "sync" "time" ) @@ -158,157 +154,6 @@ func (svc *Service) SetInterval(interval time.Duration) { svc.interval = interval } -func (svc *Service) getTaskQueueItems() (tqList []models.TaskQueueItem, err error) { - opts := &mongo.FindOptions{ - Sort: bson.D{ - {"p", 1}, - {"_id", 1}, - }, - } - if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Find(nil, opts).All(&tqList); err != nil { - if err == mongo2.ErrNoDocuments { - return nil, nil - } - return nil, err - } - return tqList, nil -} - -func (svc *Service) getResourcesAndNodesMap() (resources map[string]models.Node, nodesMap map[primitive.ObjectID]models.Node, err error) { - nodesMap = map[primitive.ObjectID]models.Node{} - resources = map[string]models.Node{} - query := bson.M{ - // enabled: true - "enabled": true, - // active: true - "active": true, - // available_runners > 0 - "available_runners": bson.M{ - "$gt": 0, - }, - } - nodes, err := svc.modelSvc.GetNodeList(query, nil) - if err != nil { - if err == mongo2.ErrNoDocuments { - return nil, nil, nil - } - return nil, nil, err - } - for _, n := range nodes { - nodesMap[n.Id] = n - for i := 0; i < n.AvailableRunners; i++ { - key := fmt.Sprintf("%s:%d", n.Id.Hex(), i) - resources[key] = n - } - } - return resources, nodesMap, nil -} - -func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []interfaces.Task, nodesMap map[primitive.ObjectID]models.Node, err error) { - // get resources and nodes map - resources, nodesMap, err := svc.getResourcesAndNodesMap() - if err != nil { - return nil, nil, err - } - if resources == nil || len(resources) == 0 { - return nil, nil, nil - } - - // resources list - var resourcesList []models.Node - for _, r := range resources { - resourcesList = append(resourcesList, r) - } - - // shuffle resources list - rand.Seed(time.Now().Unix()) - rand.Shuffle(len(resourcesList), func(i, j int) { - resourcesList[i], resourcesList[j] = resourcesList[j], resourcesList[i] - }) - - // iterate task queue items - for _, tq := range tqList { - // task - t, err := svc.modelSvc.GetTaskById(tq.GetId()) - if err != nil { - // remove task queue item if it is not found in tasks - _ = mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).DeleteId(tq.GetId()) - - // set task status as abnormal - t.Status = constants.TaskStatusAbnormal - t.Error = err.Error() - _ = delegate.NewModelDelegate(t, nil).Save() - continue - } - - // iterate shuffled resources to match a resource - for i, r := range resourcesList { - // If node id is unset or node id of task matches with resource id (node id), - // assign corresponding resource id to the task - if t.GetNodeId().IsZero() || - t.GetNodeId() == r.GetId() { - // assign resource id - t.NodeId = r.GetId() - - // append to tasks - tasks = append(tasks, t) - - // delete from resources list - resourcesList = append(resourcesList[:i], resourcesList[(i+1):]...) - - // decrement available runners - n := nodesMap[r.GetId()] - n.DecrementAvailableRunners() - - // break loop - break - } - } - } - - return tasks, nodesMap, nil -} - -func (svc *Service) updateResources(nodesMap map[primitive.ObjectID]models.Node) (err error) { - for _, n := range nodesMap { - if err := delegate.NewModelNodeDelegate(&n).Save(); err != nil { - return err - } - } - return nil -} - -func (svc *Service) updateTasks(tasks []interfaces.Task) (err error) { - for _, t := range tasks { - // save task with node id - if err := delegate.NewModelDelegate(t).Save(); err != nil { - return err - } - } - return nil -} - -func (svc *Service) deleteTaskQueueItems(tasks []interfaces.Task) (err error) { - for _, t := range tasks { - if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).DeleteId(t.GetId()); err != nil { - log.Warnf("task[id: %s] missing task queue: %s", t.GetId(), err.Error()) - continue - } - } - return nil -} - -func (svc *Service) handleTaskError(n interfaces.Node, t interfaces.Task, err error) { - trace.PrintError(err) - t.SetStatus(constants.TaskStatusError) - t.SetError(err.Error()) - if n.GetIsMaster() { - _ = delegate.NewModelDelegate(t).Save() - } else { - _ = client.NewModelDelegate(t).Save() - } -} - // initTaskStatus initialize task status of existing tasks func (svc *Service) initTaskStatus() { // set status of running tasks as TaskStatusAbnormal