From a7819e01225a00e963caef47ee8f0b1251d156c3 Mon Sep 17 00:00:00 2001 From: Izabella Raulin Date: Mon, 27 Feb 2017 14:27:38 +0100 Subject: [PATCH] Fixed #1520 - Ended task can be removed --- core/scheduler_event/scheduler_event.go | 10 + core/task.go | 3 +- docs/TASKS.md | 5 +- mgmt/rest/client/task.go | 2 +- mgmt/rest/rest_v1_test.go | 2 +- mgmt/rest/v1/rbody/task.go | 2 + mgmt/rest/v1/task.go | 8 +- mgmt/rest/v2/watch.go | 9 +- scheduler/scheduler.go | 29 ++ scheduler/scheduler_medium_test.go | 372 ++++++++++++++++++++++-- scheduler/scheduler_test.go | 30 -- scheduler/task.go | 2 +- scheduler/watcher.go | 23 ++ scheduler/watcher_test.go | 5 + 14 files changed, 446 insertions(+), 56 deletions(-) diff --git a/core/scheduler_event/scheduler_event.go b/core/scheduler_event/scheduler_event.go index 3c478c166..db80e1d93 100644 --- a/core/scheduler_event/scheduler_event.go +++ b/core/scheduler_event/scheduler_event.go @@ -28,6 +28,7 @@ const ( TaskDeleted = "Scheduler.TaskDeleted" TaskStarted = "Scheduler.TaskStarted" TaskStopped = "Scheduler.TaskStopped" + TaskEnded = "Scheduler.TaskEnded" TaskDisabled = "Scheduler.TaskDisabled" MetricCollected = "Scheduler.MetricsCollected" MetricCollectionFailed = "Scheduler.MetricCollectionFailed" @@ -70,6 +71,15 @@ func (e TaskStoppedEvent) Namespace() string { return TaskStopped } +type TaskEndedEvent struct { + TaskID string + Source string +} + +func (e TaskEndedEvent) Namespace() string { + return TaskEnded +} + type TaskDisabledEvent struct { TaskID string Why string diff --git a/core/task.go b/core/task.go index 215f00602..657cdda4e 100644 --- a/core/task.go +++ b/core/task.go @@ -52,7 +52,7 @@ var ( TaskStopped: "Stopped", // stopped but resumable TaskSpinning: "Running", // running TaskFiring: "Running", // running (firing can happen so briefly we don't want to try and render it as a string state) - TaskEnded: "Ended", // ended, not resumable because the schedule will not fire again + TaskEnded: "Ended", // ended, but resumable if the schedule is still valid and might fire again TaskStopping: "Stopping", // channel has been closed, wait for TaskStopped state } ) @@ -65,6 +65,7 @@ type TaskWatcherHandler interface { CatchCollection([]Metric) CatchTaskStarted() CatchTaskStopped() + CatchTaskEnded() CatchTaskDisabled(string) } diff --git a/docs/TASKS.md b/docs/TASKS.md index 8760a895e..c99d7f264 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -12,10 +12,9 @@ A task can be in the following states: - **running:** a running task - **stopped:** a task that is not running - **disabled:** a task in a state not allowed to start. This happens when the task produces consecutive errors. A disabled task must be re-enabled before it can be started again. +- **ended:** a task for which the schedule is ended. At present this happens only for windowed schedule with defined _stop_timestamp_. An ended task is resumable if the schedule is still valid. - -![newtaskstatediagram2](https://cloud.githubusercontent.com/assets/21182867/19282545/a4179520-8fa3-11e6-9056-4fc3aa610983.png) - +![statediagram](https://cloud.githubusercontent.com/assets/11335874/23362447/0f0b9f74-fcf6-11e6-93d7-889a7ccdc45f.png) How To | Command ----------------------------------------|------------------------ diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index 9bcd529b9..fa94f6fe5 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -157,7 +157,7 @@ func (c *Client) WatchTask(id string) *WatchTasksResult { case rbody.TaskWatchTaskDisabled: r.EventChan <- ste r.Close() - case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: + case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: r.EventChan <- ste } } diff --git a/mgmt/rest/rest_v1_test.go b/mgmt/rest/rest_v1_test.go index 6d01b55d2..d907fc143 100644 --- a/mgmt/rest/rest_v1_test.go +++ b/mgmt/rest/rest_v1_test.go @@ -119,7 +119,7 @@ func watchTask(id string, port int) *watchTaskResult { r.eventChan <- ste.EventType r.close() return - case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: + case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: log.Info(ste.EventType) r.eventChan <- ste.EventType } diff --git a/mgmt/rest/v1/rbody/task.go b/mgmt/rest/v1/rbody/task.go index f4cf4fded..ea1eb1ac2 100644 --- a/mgmt/rest/v1/rbody/task.go +++ b/mgmt/rest/v1/rbody/task.go @@ -36,6 +36,7 @@ const ( ScheduledTaskType = "scheduled_task" ScheduledTaskStartedType = "scheduled_task_started" ScheduledTaskStoppedType = "scheduled_task_stopped" + ScheduledTaskEndedType = "scheduled_task_ended" ScheduledTaskRemovedType = "scheduled_task_removed" ScheduledTaskWatchingEndedType = "schedule_task_watch_ended" ScheduledTaskEnabledType = "scheduled_task_enabled" @@ -46,6 +47,7 @@ const ( TaskWatchTaskDisabled = "task-disabled" TaskWatchTaskStarted = "task-started" TaskWatchTaskStopped = "task-stopped" + TaskWatchTaskEnded = "task-ended" ) type ScheduledTaskListReturned struct { diff --git a/mgmt/rest/v1/task.go b/mgmt/rest/v1/task.go index 90da2182d..54e33d67c 100644 --- a/mgmt/rest/v1/task.go +++ b/mgmt/rest/v1/task.go @@ -149,7 +149,7 @@ func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P // The client can decide to stop receiving on the stream on Task Stopped. // We write the event to the buffer fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) - case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped: + case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded: // A disabled task should end the streaming and close the connection fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) // Flush since we are sending nothing new @@ -289,6 +289,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() { } } +func (t *TaskWatchHandler) CatchTaskEnded() { + t.mChan <- rbody.StreamedTaskEvent{ + EventType: rbody.TaskWatchTaskEnded, + } +} + func (t *TaskWatchHandler) CatchTaskDisabled(why string) { t.mChan <- rbody.StreamedTaskEvent{ EventType: rbody.TaskWatchTaskDisabled, diff --git a/mgmt/rest/v2/watch.go b/mgmt/rest/v2/watch.go index 671e0f6a2..c6eb8c98b 100644 --- a/mgmt/rest/v2/watch.go +++ b/mgmt/rest/v2/watch.go @@ -37,6 +37,7 @@ const ( TaskWatchTaskDisabled = "task-disabled" TaskWatchTaskStarted = "task-started" TaskWatchTaskStopped = "task-stopped" + TaskWatchTaskEnded = "task-ended" ) // The amount of time to buffer streaming events before flushing in seconds @@ -95,7 +96,7 @@ func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P // The client can decide to stop receiving on the stream on Task Stopped. // We write the event to the buffer fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) - case TaskWatchTaskDisabled, TaskWatchTaskStopped: + case TaskWatchTaskDisabled, TaskWatchTaskStopped, TaskWatchTaskEnded: // A disabled task should end the streaming and close the connection fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) // Flush since we are sending nothing new @@ -165,6 +166,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() { } } +func (t *TaskWatchHandler) CatchTaskEnded() { + t.mChan <- StreamedTaskEvent{ + EventType: TaskWatchTaskEnded, + } +} + func (t *TaskWatchHandler) CatchTaskDisabled(why string) { t.mChan <- StreamedTaskEvent{ EventType: TaskWatchTaskDisabled, diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1288555cf..c64e6d06f 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -64,6 +64,8 @@ var ( ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.") // ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.") + // ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped + ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.") ) type schedulerState int @@ -471,6 +473,7 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError { serror.New(ErrTaskDisabledNotRunnable), } } + if t.state == core.TaskFiring || t.state == core.TaskSpinning { logger.WithFields(log.Fields{ "task-id": t.ID(), @@ -481,6 +484,16 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError { } } + // Ensure the schedule is valid at this point and time. + if err := t.schedule.Validate(); err != nil { + errs := []serror.SnapError{ + serror.New(err), + } + f := buildErrorsLog(errs, logger) + f.Error("schedule passed not valid") + return errs + } + // Group dependencies by the node they live on // and subscribe to them. depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics) @@ -559,6 +572,14 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError { return []serror.SnapError{ serror.New(ErrTaskAlreadyStopped), } + case core.TaskEnded: + logger.WithFields(log.Fields{ + "task-id": t.ID(), + "task-state": t.State(), + }).Error("task is already ended") + return []serror.SnapError{ + serror.New(ErrTaskEndedNotStoppable), + } case core.TaskDisabled: logger.WithFields(log.Fields{ "task-id": t.ID(), @@ -768,6 +789,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { "task-id": v.TaskID, }).Debug("event received") s.taskWatcherColl.handleTaskStopped(v.TaskID) + case *scheduler_event.TaskEndedEvent: + log.WithFields(log.Fields{ + "_module": "scheduler-events", + "_block": "handle-events", + "event-namespace": e.Namespace(), + "task-id": v.TaskID, + }).Debug("event received") + s.taskWatcherColl.handleTaskEnded(v.TaskID) case *scheduler_event.TaskDisabledEvent: log.WithFields(log.Fields{ "_module": "scheduler-events", diff --git a/scheduler/scheduler_medium_test.go b/scheduler/scheduler_medium_test.go index cc1212e4c..5a9b08880 100644 --- a/scheduler/scheduler_medium_test.go +++ b/scheduler/scheduler_medium_test.go @@ -128,7 +128,7 @@ func (m mockScheduleResponse) missedIntervals() uint { return 0 } -// Helper constructor functions for resuse amongst tests +// Helper constructor functions for re-use amongst tests func newMockMetricManager() *mockMetricManager { m := new(mockMetricManager) return m @@ -178,48 +178,386 @@ func newMockWorkflowMap() *wmap.WorkflowMap { return w } +var ( + startWait = time.Millisecond * 50 + windowSize = time.Millisecond * 100 + interval = time.Millisecond * 10 +) + // ----------------------------- Medium Tests ---------------------------- + +func TestCreateTask(t *testing.T) { + s := newScheduler() + s.Start() + w := newMockWorkflowMap() + + Convey("Calling CreateTask for a simple schedule", t, func() { + Convey("returns an error when the schedule does not validate", func() { + Convey("the interval is invalid", func() { + Convey("the interval equals zero", func() { + tsk, errs := s.CreateTask(schedule.NewSimpleSchedule(0), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) + }) + Convey("the interval is less than zero", func() { + tsk, errs := s.CreateTask(schedule.NewSimpleSchedule((-1)*time.Millisecond), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) + }) + }) + }) + Convey("should not error when the schedule is valid", func() { + tsk, errs := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + }) + }) //end of tests for a simple scheduler + + Convey("Calling CreateTask for a windowed schedule", t, func() { + Convey("returns an error when the schedule does not validate", func() { + Convey("the interval is invalid", func() { + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + + Convey("the interval equals zero", func() { + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(0, &start, &stop), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) + }) + Convey("the interval is less than zero", func() { + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule((-1)*time.Millisecond, &start, &stop), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidInterval.Error()) + }) + }) + Convey("the stop time was set in the past", func() { + start := time.Now().Add(startWait) + stop := time.Now().Add(time.Second * -10) + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrInvalidStopTime.Error()) + }) + Convey("the stop time is before the start time", func() { + start := time.Now().Add(startWait * 2) + stop := time.Now().Add(startWait) + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrStopBeforeStart.Error()) + }) + }) + Convey("should not error when the schedule is valid", func() { + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + Convey("the task should be ended after reaching the end of window", func() { + // wait for the end of determined window + time.Sleep(startWait + windowSize) + // wait an interval to be sure that the task state has been updated + time.Sleep(interval) + // check if the task is ended + So(tsk.State(), ShouldEqual, core.TaskEnded) + }) + }) + }) //end of tests for a windowed scheduler + + Convey("Calling CreateTask for a cron schedule", t, func() { + Convey("returns an error when the schedule does not validate", func() { + Convey("the cron entry is empty", func() { + cronEntry := "" + tsk, errs := s.CreateTask(schedule.NewCronSchedule(cronEntry), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldEqual, schedule.ErrMissingCronEntry.Error()) + }) + Convey("the cron entry is invalid", func() { + cronEntry := "0 30" + tsk, errs := s.CreateTask(schedule.NewCronSchedule(cronEntry), w, false) + So(errs, ShouldNotBeEmpty) + So(tsk, ShouldBeNil) + So(errs.Errors()[0].Error(), ShouldStartWith, "Expected 5 or 6 fields") + }) + }) + Convey("should not error when the schedule is valid", func() { + cronEntry := "0 30 * * * *" + tsk, errs := s.CreateTask(schedule.NewCronSchedule(cronEntry), w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + }) + }) //end of tests for a cron scheduler + + s.Stop() +} + func TestStopTask(t *testing.T) { logrus.SetLevel(logrus.FatalLevel) s := newScheduler() s.Start() w := newMockWorkflowMap() - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false) - task := s.tasks.Get(tsk.ID()) - task.Spin() - err := s.StopTask(tsk.ID()) - Convey("Calling StopTask a running task", t, func() { + Convey("Calling StopTask on a running task", t, func() { + tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tsk, ShouldNotBeNil) + task := s.tasks.Get(tsk.ID()) + task.Spin() + // check if the task is running + So(core.TaskStateLookup[task.State()], ShouldEqual, "Running") + + // stop the running task + err := s.StopTask(tsk.ID()) Convey("Should not return an error", func() { So(err, ShouldBeNil) }) time.Sleep(100 * time.Millisecond) Convey("State of the task should be TaskStopped", func() { - So(task.state, ShouldEqual, core.TaskStopped) + So(tsk.State(), ShouldEqual, core.TaskStopped) }) }) - - tskStopped, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false) - err = s.StopTask(tskStopped.ID()) Convey("Calling StopTask on a stopped task", t, func() { + tskStopped, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tskStopped, ShouldNotBeNil) + // check if the task is already stopped + So(tskStopped.State(), ShouldEqual, core.TaskStopped) + + // try to stop the stopped task + err := s.StopTask(tskStopped.ID()) Convey("Should return an error", func() { So(err, ShouldNotBeNil) }) Convey("Error should read: Task is already stopped.", func() { - So(err[0].Error(), ShouldResemble, "Task is already stopped.") + So(err[0].Error(), ShouldEqual, ErrTaskAlreadyStopped.Error()) }) }) - - tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false) - taskDisabled := s.tasks.Get(tskDisabled.ID()) - taskDisabled.state = core.TaskDisabled - err = s.StopTask(tskDisabled.ID()) Convey("Calling StopTask on a disabled task", t, func() { + tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tskDisabled, ShouldNotBeNil) + taskDisabled := s.tasks.Get(tskDisabled.ID()) + taskDisabled.state = core.TaskDisabled + + // try to stop the disabled task + err := s.StopTask(tskDisabled.ID()) Convey("Should return an error", func() { So(err, ShouldNotBeNil) }) Convey("Error should read: Task is disabled. Only running tasks can be stopped.", func() { - So(err[0].Error(), ShouldResemble, "Task is disabled. Only running tasks can be stopped.") + So(err[0].Error(), ShouldEqual, ErrTaskDisabledNotStoppable.Error()) + }) + Convey("State of the task should be still TaskDisabled", func() { + So(tskDisabled.State(), ShouldEqual, core.TaskDisabled) + }) + }) + Convey("Calling StopTask on an ended task", t, func() { + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + + // create a task with windowed schedule + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + + // wait for the end of determined window + time.Sleep(startWait + windowSize) + // wait an interval to be sure that the task state has been updated + time.Sleep(interval) + + // check if the task is ended + So(tsk.State(), ShouldEqual, core.TaskEnded) + + // try to stop the ended task + err := s.StopTask(tsk.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + Convey("Error should read: Task is ended. Only running tasks can be stopped.", func() { + So(err[0].Error(), ShouldEqual, ErrTaskEndedNotStoppable.Error()) + }) + Convey("State of the task should be still TaskEnded", func() { + So(tsk.State(), ShouldEqual, core.TaskEnded) + }) + }) + + s.Stop() +} + +func TestStartTask(t *testing.T) { + logrus.SetLevel(logrus.FatalLevel) + s := newScheduler() + s.Start() + w := newMockWorkflowMap() + + Convey("Calling StartTask a running task", t, func() { + tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + // check if the task is running + So(core.TaskStateLookup[task.State()], ShouldEqual, "Running") + + // try to start the running task + err := s.StartTask(tsk.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + Convey("Error should read: Task is already running.", func() { + So(err[0].Error(), ShouldEqual, ErrTaskAlreadyRunning.Error()) + }) + Convey("State of the task should be still Running", func() { + So(core.TaskStateLookup[task.State()], ShouldEqual, "Running") + }) + + task.Stop() + }) + Convey("Calling StartTask on a disabled task", t, func() { + tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tskDisabled, ShouldNotBeNil) + taskDisabled := s.tasks.Get(tskDisabled.ID()) + taskDisabled.state = core.TaskDisabled + + // try to start the disabled task + err := s.StartTask(tskDisabled.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + Convey("Error should read: Task is disabled. Cannot be started", func() { + So(err[0].Error(), ShouldEqual, ErrTaskDisabledNotRunnable.Error()) + }) + Convey("State of the task should be still TaskDisabled", func() { + So(tskDisabled.State(), ShouldEqual, core.TaskDisabled) + }) + }) + Convey("Calling StartTask on an ended windowed task", t, func() { + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + + //create a task with windowed schedule + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + + // wait for the end of determined window + time.Sleep(startWait + windowSize) + // wait an interval to be sure that the task state has been updated + time.Sleep(interval) + + // check if the task is ended + So(tsk.State(), ShouldEqual, core.TaskEnded) + + // try to restart the ended windowed task for which the stop time is in the past + err := s.StartTask(tsk.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + // the schedule is not longer valid at this point of time + Convey("Error should read: Stop time is in the past", func() { + So(err[0].Error(), ShouldEqual, schedule.ErrInvalidStopTime.Error()) + }) + Convey("State of the task should be still TaskEnded", func() { + So(tsk.State(), ShouldEqual, core.TaskEnded) + }) + }) + + s.Stop() +} + +func TestEnableTask(t *testing.T) { + logrus.SetLevel(logrus.FatalLevel) + s := newScheduler() + s.Start() + w := newMockWorkflowMap() + + Convey("Calling EnableTask on a disabled task", t, func() { + tskDisabled, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tskDisabled, ShouldNotBeNil) + taskDisabled := s.tasks.Get(tskDisabled.ID()) + taskDisabled.state = core.TaskDisabled + + // enable the disabled task + tskEnabled, err := s.EnableTask(tskDisabled.ID()) + Convey("Should not return an error", func() { + So(err, ShouldBeNil) + }) + Convey("State of the task should be TaskStopped after enabling", func() { + So(tskEnabled, ShouldNotBeNil) + // EnableTask changes state from disabled to stopped + So(tskEnabled.State(), ShouldEqual, core.TaskStopped) + }) + }) + Convey("Calling EnableTask on a running task", t, func() { + tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tsk, ShouldNotBeNil) + task := s.tasks.Get(tsk.ID()) + task.Spin() + // check if the task is running + So(core.TaskStateLookup[task.State()], ShouldEqual, "Running") + + // try to enable the running task + _, err := s.EnableTask(tsk.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + Convey("Error should read: Task must be disabled.", func() { + So(err, ShouldEqual, ErrTaskNotDisabled) + }) + }) + Convey("Calling EnableTask on a stopped task", t, func() { + tskStopped, _ := s.CreateTask(schedule.NewSimpleSchedule(interval), w, false) + So(tskStopped, ShouldNotBeNil) + // check if the task is already stopped + So(tskStopped.State(), ShouldEqual, core.TaskStopped) + + // try to enable the stopped task + _, err := s.EnableTask(tskStopped.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + Convey("Error should read: Task must be disabled.", func() { + So(err, ShouldEqual, ErrTaskNotDisabled) + }) + }) + Convey("Calling EnableTask on an ended task", t, func() { + start := time.Now().Add(startWait) + stop := time.Now().Add(startWait + windowSize) + + //create a task with windowed schedule + tsk, errs := s.CreateTask(schedule.NewWindowedSchedule(interval, &start, &stop), w, false) + So(errs.Errors(), ShouldBeEmpty) + So(tsk, ShouldNotBeNil) + + task := s.tasks.Get(tsk.ID()) + task.Spin() + + // wait for the end of determined window + time.Sleep(startWait + windowSize) + // wait an interval to be sure that the task state has been updated + time.Sleep(interval) + + // check if the task is ended + So(tsk.State(), ShouldEqual, core.TaskEnded) + + // try to enable the ended task + _, err := s.EnableTask(tsk.ID()) + Convey("Should return an error", func() { + So(err, ShouldNotBeNil) + }) + Convey("Error should read: Task must be disabled.", func() { + So(err, ShouldEqual, ErrTaskNotDisabled) }) }) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index e42ceeb51..aebb88528 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -272,36 +272,6 @@ func TestScheduler(t *testing.T) { So(tsk.(*task).deadlineDuration, ShouldResemble, time.Duration(6*time.Second)) }) - Convey("Enable a stopped task", func() { - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false) - So(tsk, ShouldNotBeNil) - - _, err := s.EnableTask(tsk.ID()) - So(err, ShouldNotBeNil) - }) - - Convey("Enable a disabled task", func() { - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false) - So(tsk, ShouldNotBeNil) - - t := s.tasks.Get(tsk.ID()) - t.state = core.TaskDisabled - - etsk, err1 := s.EnableTask(tsk.ID()) - So(err1, ShouldBeNil) - So(etsk.State(), ShouldEqual, core.TaskStopped) - }) - Convey("Start disabled task", func() { - tsk, _ := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*100), w, false) - So(tsk, ShouldNotBeNil) - - t := s.tasks.Get(tsk.ID()) - t.state = core.TaskDisabled - - err := s.StartTask(tsk.ID()) - So(err[0].Error(), ShouldResemble, "Task is disabled. Cannot be started.") - So(t.state, ShouldEqual, core.TaskDisabled) - }) }) Convey("Stop()", t, func() { Convey("Should set scheduler state to SchedulerStopped", func() { diff --git a/scheduler/task.go b/scheduler/task.go index 28b672395..ddc476766 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -428,7 +428,7 @@ func (t *taskCollection) remove(task *task) error { t.Lock() defer t.Unlock() if _, ok := t.table[task.id]; ok { - if task.state != core.TaskStopped && task.state != core.TaskDisabled { + if task.state != core.TaskStopped && task.state != core.TaskDisabled && task.state != core.TaskEnded { taskLogger.WithFields(log.Fields{ "_block": "remove", "task id": task.id, diff --git a/scheduler/watcher.go b/scheduler/watcher.go index b74570a0e..734061be7 100644 --- a/scheduler/watcher.go +++ b/scheduler/watcher.go @@ -179,6 +179,29 @@ func (t *taskWatcherCollection) handleTaskStopped(taskID string) { } } +func (t *taskWatcherCollection) handleTaskEnded(taskID string) { + t.mutex.Lock() + defer t.mutex.Unlock() + // no taskID means no watches, early exit + if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 { + // Uncomment this debug line if needed. Otherwise this is too verbose for even debug level. + // watcherLog.WithFields(log.Fields{ + // "task-id": taskID, + // }).Debug("no watchers") + return + } + // Walk all watchers for a task ID + for _, v := range t.coll[taskID] { + // Check if they have a catcher assigned + watcherLog.WithFields(log.Fields{ + "task-id": taskID, + "task-watcher-id": v.id, + }).Debug("calling taskwatcher task ended func") + // Call the catcher + v.handler.CatchTaskEnded() + } +} + func (t *taskWatcherCollection) handleTaskDisabled(taskID string, why string) { t.mutex.Lock() defer t.mutex.Unlock() diff --git a/scheduler/watcher_test.go b/scheduler/watcher_test.go index ecb1ea19b..a475d3881 100644 --- a/scheduler/watcher_test.go +++ b/scheduler/watcher_test.go @@ -53,6 +53,11 @@ func (d *mockCatcher) CatchTaskStopped() { sum++ } +func (d *mockCatcher) CatchTaskEnded() { + d.count++ + sum++ +} + func (d *mockCatcher) CatchTaskStarted() { d.count++ sum++