From ed5e9f0d01819c8b04a50bb6330ced1247594152 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Sat, 8 Apr 2023 14:51:33 +0800 Subject: [PATCH] fix: not return task ids https://github.com/crawlab-team/crawlab/issues/1197 --- controllers/spider.go | 5 +++-- controllers/task.go | 10 ++++++---- interfaces/spider_admin_service.go | 2 +- interfaces/task_scheduler_service.go | 2 +- schedule/service.go | 2 +- spider/admin/service.go | 30 ++++++++++++++-------------- spider/test/admin_test.go | 4 ++-- task/fs/service_test.go | 2 +- task/scheduler/service.go | 10 +++++----- task/test/handler_test.go | 6 +++--- 10 files changed, 38 insertions(+), 35 deletions(-) diff --git a/controllers/spider.go b/controllers/spider.go index a0861d8..4a7f078 100644 --- a/controllers/spider.go +++ b/controllers/spider.go @@ -318,12 +318,13 @@ func (ctx *spiderContext) run(c *gin.Context) { } // schedule - if err := ctx.adminSvc.Schedule(id, &opts); err != nil { + taskIds, err := ctx.adminSvc.Schedule(id, &opts) + if err != nil { HandleErrorInternalServerError(c, err) return } - HandleSuccess(c) + HandleSuccessWithData(c, taskIds) } func (ctx *spiderContext) getGit(c *gin.Context) { diff --git a/controllers/task.go b/controllers/task.go index 34e95dd..d9dc43c 100644 --- a/controllers/task.go +++ b/controllers/task.go @@ -119,12 +119,13 @@ func (ctx *taskContext) run(c *gin.Context) { } // run - if err := ctx.adminSvc.Schedule(s.GetId(), opts); err != nil { + taskIds, err := ctx.adminSvc.Schedule(s.GetId(), opts) + if err != nil { HandleErrorInternalServerError(c, err) return } - HandleSuccess(c) + HandleSuccessWithData(c, taskIds) } func (ctx *taskContext) restart(c *gin.Context) { @@ -157,12 +158,13 @@ func (ctx *taskContext) restart(c *gin.Context) { } // run - if err := ctx.adminSvc.Schedule(t.SpiderId, opts); err != nil { + taskIds, err := ctx.adminSvc.Schedule(t.SpiderId, opts) + if err != nil { HandleErrorInternalServerError(c, err) return } - HandleSuccess(c) + HandleSuccessWithData(c, taskIds) } func (ctx *taskContext) cancel(c *gin.Context) { diff --git a/interfaces/spider_admin_service.go b/interfaces/spider_admin_service.go index 4dc63e4..fa0a977 100644 --- a/interfaces/spider_admin_service.go +++ b/interfaces/spider_admin_service.go @@ -8,7 +8,7 @@ type SpiderAdminService interface { WithConfigPath Start() (err error) // Schedule a new task of the spider - Schedule(id primitive.ObjectID, opts *SpiderRunOptions) (err error) + Schedule(id primitive.ObjectID, opts *SpiderRunOptions) (taskIds []primitive.ObjectID, err error) // Clone the spider Clone(id primitive.ObjectID, opts *SpiderCloneOptions) (err error) // Delete the spider diff --git a/interfaces/task_scheduler_service.go b/interfaces/task_scheduler_service.go index d801211..b287440 100644 --- a/interfaces/task_scheduler_service.go +++ b/interfaces/task_scheduler_service.go @@ -8,7 +8,7 @@ import ( type TaskSchedulerService interface { TaskBaseService // Enqueue task into the task queue - Enqueue(t Task) (err error) + Enqueue(t Task) (t2 Task, err error) // Cancel task to corresponding node Cancel(id primitive.ObjectID, args ...interface{}) (err error) // SetInterval set the interval or duration between two adjacent fetches diff --git a/schedule/service.go b/schedule/service.go index 3be5598..d413381 100644 --- a/schedule/service.go +++ b/schedule/service.go @@ -228,7 +228,7 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { } // schedule or assign a task in the task queue - if err := svc.adminSvc.Schedule(s.GetSpiderId(), opts); err != nil { + if _, err := svc.adminSvc.Schedule(s.GetSpiderId(), opts); err != nil { trace.PrintError(err) } } diff --git a/spider/admin/service.go b/spider/admin/service.go index 46dfc6c..fb48297 100644 --- a/spider/admin/service.go +++ b/spider/admin/service.go @@ -47,19 +47,15 @@ func (svc *Service) Start() (err error) { return svc.SyncGit() } -func (svc *Service) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRunOptions) (err error) { +func (svc *Service) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) { // spider s, err := svc.modelSvc.GetSpiderById(id) if err != nil { - return err + return nil, err } // assign tasks - if err := svc.scheduleTasks(s, opts); err != nil { - return err - } - - return nil + return svc.scheduleTasks(s, opts) } func (svc *Service) Clone(id primitive.ObjectID, opts *interfaces.SpiderCloneOptions) (err error) { @@ -79,7 +75,7 @@ func (svc *Service) SyncGit() (err error) { return nil } -func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOptions) (err error) { +func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) { // main task mainTask := &models.Task{ SpiderId: s.Id, @@ -118,7 +114,7 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp //} nodeIds, err := svc.getNodeIds(opts) if err != nil { - return err + return nil, err } for _, nodeId := range nodeIds { t := &models.Task{ @@ -133,25 +129,29 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp Priority: opts.Priority, UserId: opts.UserId, } - if err := svc.schedulerSvc.Enqueue(t); err != nil { - return err + t2, err := svc.schedulerSvc.Enqueue(t) + if err != nil { + return nil, err } + taskIds = append(taskIds, t2.GetId()) } } else { // single task nodeIds, err := svc.getNodeIds(opts) if err != nil { - return err + return nil, err } if len(nodeIds) > 0 { mainTask.NodeId = nodeIds[0] } - if err := svc.schedulerSvc.Enqueue(mainTask); err != nil { - return err + t2, err := svc.schedulerSvc.Enqueue(mainTask) + if err != nil { + return nil, err } + taskIds = append(taskIds, t2.GetId()) } - return nil + return taskIds, nil } func (svc *Service) getNodeIds(opts *interfaces.SpiderRunOptions) (nodeIds []primitive.ObjectID, err error) { diff --git a/spider/test/admin_test.go b/spider/test/admin_test.go index 3cddd5c..31e48e3 100644 --- a/spider/test/admin_test.go +++ b/spider/test/admin_test.go @@ -15,7 +15,7 @@ func TestAdminService_Run(t *testing.T) { // TODO: implement // run - err = T.adminSvc.Schedule(T.TestSpider.Id, &interfaces.SpiderRunOptions{ + _, err = T.adminSvc.Schedule(T.TestSpider.Id, &interfaces.SpiderRunOptions{ Mode: constants.RunTypeRandom, }) require.Nil(t, err) @@ -25,5 +25,5 @@ func TestAdminService_Run(t *testing.T) { task, err := T.modelSvc.GetTask(bson.M{"spider_id": T.TestSpider.Id}, nil) require.Nil(t, err) require.False(t, task.Id.IsZero()) - require.Equal(t, constants.TaskStatusFinished, task.Status) + require.NotEqual(t, constants.TaskStatusPending, task.Status) } diff --git a/task/fs/service_test.go b/task/fs/service_test.go index 3b51986..d4ce01f 100644 --- a/task/fs/service_test.go +++ b/task/fs/service_test.go @@ -20,7 +20,7 @@ func TestTaskFsService(t *testing.T) { require.Nil(t, err) t.Run("sync-to-workspace", func(t *testing.T) { - fsSvc, err := NewTaskFsService(task.Id) + fsSvc, err := NewTaskFsService(task.Id, spider.Id) require.Nil(t, err) err = fsSvc.GetFsService().SyncToWorkspace() diff --git a/task/scheduler/service.go b/task/scheduler/service.go index dfe1655..52cfb17 100644 --- a/task/scheduler/service.go +++ b/task/scheduler/service.go @@ -48,7 +48,7 @@ func (svc *Service) Start() { svc.Stop() } -func (svc *Service) Enqueue(t interfaces.Task) (err error) { +func (svc *Service) Enqueue(t interfaces.Task) (t2 interfaces.Task, err error) { // set task status t.SetStatus(constants.TaskStatusPending) @@ -60,7 +60,7 @@ func (svc *Service) Enqueue(t interfaces.Task) (err error) { // add task if err = delegate.NewModelDelegate(t, u).Add(); err != nil { - return err + return nil, err } // task queue item @@ -79,17 +79,17 @@ func (svc *Service) Enqueue(t interfaces.Task) (err error) { // enqueue task _, err = mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Insert(tq) if err != nil { - return trace.TraceError(err) + return nil, trace.TraceError(err) } // add task stat _, err = mongo.GetMongoCol(interfaces.ModelColNameTaskStat).Insert(ts) if err != nil { - return trace.TraceError(err) + return nil, trace.TraceError(err) } // success - return nil + return t, nil } func (svc *Service) Cancel(id primitive.ObjectID, args ...interface{}) (err error) { diff --git a/task/test/handler_test.go b/task/test/handler_test.go index 4b44041..1d1660b 100644 --- a/task/test/handler_test.go +++ b/task/test/handler_test.go @@ -13,7 +13,7 @@ func TestHandlerService_Run(t *testing.T) { T.Setup(t) task := T.NewTask() - err = T.schedulerSvc.Enqueue(task) + _, err = T.schedulerSvc.Enqueue(task) require.Nil(t, err) err = T.handlerSvc.Run(task.GetId()) @@ -30,7 +30,7 @@ func TestHandlerService_Cancel(t *testing.T) { T.Setup(t) task := T.NewTaskLong() - err = T.schedulerSvc.Enqueue(task) + _, err = T.schedulerSvc.Enqueue(task) require.Nil(t, err) time.Sleep(1 * time.Second) @@ -57,7 +57,7 @@ func TestHandlerService_ReportStatus(t *testing.T) { T.Setup(t) task := T.NewTaskLong() - err = T.schedulerSvc.Enqueue(task) + _, err = T.schedulerSvc.Enqueue(task) require.Nil(t, err) time.Sleep(1 * time.Second)