diff --git a/core/scheduler_event/scheduler_event.go b/core/scheduler_event/scheduler_event.go index 3c478c166..db80e1d93 100644 --- a/core/scheduler_event/scheduler_event.go +++ b/core/scheduler_event/scheduler_event.go @@ -28,6 +28,7 @@ const ( TaskDeleted = "Scheduler.TaskDeleted" TaskStarted = "Scheduler.TaskStarted" TaskStopped = "Scheduler.TaskStopped" + TaskEnded = "Scheduler.TaskEnded" TaskDisabled = "Scheduler.TaskDisabled" MetricCollected = "Scheduler.MetricsCollected" MetricCollectionFailed = "Scheduler.MetricCollectionFailed" @@ -70,6 +71,15 @@ func (e TaskStoppedEvent) Namespace() string { return TaskStopped } +type TaskEndedEvent struct { + TaskID string + Source string +} + +func (e TaskEndedEvent) Namespace() string { + return TaskEnded +} + type TaskDisabledEvent struct { TaskID string Why string diff --git a/core/task.go b/core/task.go index 215f00602..1bce3fc35 100644 --- a/core/task.go +++ b/core/task.go @@ -65,6 +65,7 @@ type TaskWatcherHandler interface { CatchCollection([]Metric) CatchTaskStarted() CatchTaskStopped() + CatchTaskEnded() CatchTaskDisabled(string) } diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index 9bcd529b9..fa94f6fe5 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -157,7 +157,7 @@ func (c *Client) WatchTask(id string) *WatchTasksResult { case rbody.TaskWatchTaskDisabled: r.EventChan <- ste r.Close() - case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: + case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: r.EventChan <- ste } } diff --git a/mgmt/rest/rest_v1_test.go b/mgmt/rest/rest_v1_test.go index 6d01b55d2..d907fc143 100644 --- a/mgmt/rest/rest_v1_test.go +++ b/mgmt/rest/rest_v1_test.go @@ -119,7 +119,7 @@ func watchTask(id string, port int) *watchTaskResult { r.eventChan <- ste.EventType r.close() return - case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: + case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: log.Info(ste.EventType) r.eventChan <- ste.EventType } diff --git a/mgmt/rest/v1/rbody/task.go b/mgmt/rest/v1/rbody/task.go index f4cf4fded..ea1eb1ac2 100644 --- a/mgmt/rest/v1/rbody/task.go +++ b/mgmt/rest/v1/rbody/task.go @@ -36,6 +36,7 @@ const ( ScheduledTaskType = "scheduled_task" ScheduledTaskStartedType = "scheduled_task_started" ScheduledTaskStoppedType = "scheduled_task_stopped" + ScheduledTaskEndedType = "scheduled_task_ended" ScheduledTaskRemovedType = "scheduled_task_removed" ScheduledTaskWatchingEndedType = "schedule_task_watch_ended" ScheduledTaskEnabledType = "scheduled_task_enabled" @@ -46,6 +47,7 @@ const ( TaskWatchTaskDisabled = "task-disabled" TaskWatchTaskStarted = "task-started" TaskWatchTaskStopped = "task-stopped" + TaskWatchTaskEnded = "task-ended" ) type ScheduledTaskListReturned struct { diff --git a/mgmt/rest/v1/task.go b/mgmt/rest/v1/task.go index 90da2182d..54e33d67c 100644 --- a/mgmt/rest/v1/task.go +++ b/mgmt/rest/v1/task.go @@ -149,7 +149,7 @@ func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P // The client can decide to stop receiving on the stream on Task Stopped. // We write the event to the buffer fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) - case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped: + case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded: // A disabled task should end the streaming and close the connection fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) // Flush since we are sending nothing new @@ -289,6 +289,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() { } } +func (t *TaskWatchHandler) CatchTaskEnded() { + t.mChan <- rbody.StreamedTaskEvent{ + EventType: rbody.TaskWatchTaskEnded, + } +} + func (t *TaskWatchHandler) CatchTaskDisabled(why string) { t.mChan <- rbody.StreamedTaskEvent{ EventType: rbody.TaskWatchTaskDisabled, diff --git a/mgmt/rest/v2/watch.go b/mgmt/rest/v2/watch.go index 671e0f6a2..c6eb8c98b 100644 --- a/mgmt/rest/v2/watch.go +++ b/mgmt/rest/v2/watch.go @@ -37,6 +37,7 @@ const ( TaskWatchTaskDisabled = "task-disabled" TaskWatchTaskStarted = "task-started" TaskWatchTaskStopped = "task-stopped" + TaskWatchTaskEnded = "task-ended" ) // The amount of time to buffer streaming events before flushing in seconds @@ -95,7 +96,7 @@ func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P // The client can decide to stop receiving on the stream on Task Stopped. // We write the event to the buffer fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) - case TaskWatchTaskDisabled, TaskWatchTaskStopped: + case TaskWatchTaskDisabled, TaskWatchTaskStopped, TaskWatchTaskEnded: // A disabled task should end the streaming and close the connection fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) // Flush since we are sending nothing new @@ -165,6 +166,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() { } } +func (t *TaskWatchHandler) CatchTaskEnded() { + t.mChan <- StreamedTaskEvent{ + EventType: TaskWatchTaskEnded, + } +} + func (t *TaskWatchHandler) CatchTaskDisabled(why string) { t.mChan <- StreamedTaskEvent{ EventType: TaskWatchTaskDisabled, diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1288555cf..6c3e7a293 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -64,6 +64,10 @@ var ( ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.") // ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.") + // ErrTaskEndedNotRunnable - The error message for task is disabled and cannot be started + ErrTaskEndedNotRunnable = errors.New("Task is ended. Cannot be restarted.") + // ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped + ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.") ) type schedulerState int @@ -471,6 +475,16 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError { serror.New(ErrTaskDisabledNotRunnable), } } + + if t.state == core.TaskEnded { + logger.WithFields(log.Fields{ + "task-id": t.ID(), + }).Error("Task is already ended (not resumable)") + return []serror.SnapError{ + serror.New(ErrTaskEndedNotRunnable), + } + } + if t.state == core.TaskFiring || t.state == core.TaskSpinning { logger.WithFields(log.Fields{ "task-id": t.ID(), @@ -559,6 +573,14 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError { return []serror.SnapError{ serror.New(ErrTaskAlreadyStopped), } + case core.TaskEnded: + logger.WithFields(log.Fields{ + "task-id": t.ID(), + "task-state": t.State(), + }).Error("task is already ended") + return []serror.SnapError{ + serror.New(ErrTaskEndedNotStoppable), + } case core.TaskDisabled: logger.WithFields(log.Fields{ "task-id": t.ID(), @@ -768,6 +790,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { "task-id": v.TaskID, }).Debug("event received") s.taskWatcherColl.handleTaskStopped(v.TaskID) + case *scheduler_event.TaskEndedEvent: + log.WithFields(log.Fields{ + "_module": "scheduler-events", + "_block": "handle-events", + "event-namespace": e.Namespace(), + "task-id": v.TaskID, + }).Debug("event received") + s.taskWatcherColl.handleTaskEnded(v.TaskID) case *scheduler_event.TaskDisabledEvent: log.WithFields(log.Fields{ "_module": "scheduler-events", diff --git a/scheduler/task.go b/scheduler/task.go index 28b672395..ddc476766 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -428,7 +428,7 @@ func (t *taskCollection) remove(task *task) error { t.Lock() defer t.Unlock() if _, ok := t.table[task.id]; ok { - if task.state != core.TaskStopped && task.state != core.TaskDisabled { + if task.state != core.TaskStopped && task.state != core.TaskDisabled && task.state != core.TaskEnded { taskLogger.WithFields(log.Fields{ "_block": "remove", "task id": task.id, diff --git a/scheduler/watcher.go b/scheduler/watcher.go index b74570a0e..734061be7 100644 --- a/scheduler/watcher.go +++ b/scheduler/watcher.go @@ -179,6 +179,29 @@ func (t *taskWatcherCollection) handleTaskStopped(taskID string) { } } +func (t *taskWatcherCollection) handleTaskEnded(taskID string) { + t.mutex.Lock() + defer t.mutex.Unlock() + // no taskID means no watches, early exit + if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 { + // Uncomment this debug line if needed. Otherwise this is too verbose for even debug level. + // watcherLog.WithFields(log.Fields{ + // "task-id": taskID, + // }).Debug("no watchers") + return + } + // Walk all watchers for a task ID + for _, v := range t.coll[taskID] { + // Check if they have a catcher assigned + watcherLog.WithFields(log.Fields{ + "task-id": taskID, + "task-watcher-id": v.id, + }).Debug("calling taskwatcher task ended func") + // Call the catcher + v.handler.CatchTaskEnded() + } +} + func (t *taskWatcherCollection) handleTaskDisabled(taskID string, why string) { t.mutex.Lock() defer t.mutex.Unlock() diff --git a/scheduler/watcher_test.go b/scheduler/watcher_test.go index ecb1ea19b..a475d3881 100644 --- a/scheduler/watcher_test.go +++ b/scheduler/watcher_test.go @@ -53,6 +53,11 @@ func (d *mockCatcher) CatchTaskStopped() { sum++ } +func (d *mockCatcher) CatchTaskEnded() { + d.count++ + sum++ +} + func (d *mockCatcher) CatchTaskStarted() { d.count++ sum++