Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixed #1520 - Ended task can be removed
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Feb 27, 2017
1 parent b7c4674 commit a7819e0
Show file tree
Hide file tree
Showing 14 changed files with 446 additions and 56 deletions.
10 changes: 10 additions & 0 deletions core/scheduler_event/scheduler_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
TaskDeleted = "Scheduler.TaskDeleted"
TaskStarted = "Scheduler.TaskStarted"
TaskStopped = "Scheduler.TaskStopped"
TaskEnded = "Scheduler.TaskEnded"
TaskDisabled = "Scheduler.TaskDisabled"
MetricCollected = "Scheduler.MetricsCollected"
MetricCollectionFailed = "Scheduler.MetricCollectionFailed"
Expand Down Expand Up @@ -70,6 +71,15 @@ func (e TaskStoppedEvent) Namespace() string {
return TaskStopped
}

type TaskEndedEvent struct {
TaskID string
Source string
}

func (e TaskEndedEvent) Namespace() string {
return TaskEnded
}

type TaskDisabledEvent struct {
TaskID string
Why string
Expand Down
3 changes: 2 additions & 1 deletion core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
TaskStopped: "Stopped", // stopped but resumable
TaskSpinning: "Running", // running
TaskFiring: "Running", // running (firing can happen so briefly we don't want to try and render it as a string state)
TaskEnded: "Ended", // ended, not resumable because the schedule will not fire again
TaskEnded: "Ended", // ended, but resumable if the schedule is still valid and might fire again
TaskStopping: "Stopping", // channel has been closed, wait for TaskStopped state
}
)
Expand All @@ -65,6 +65,7 @@ type TaskWatcherHandler interface {
CatchCollection([]Metric)
CatchTaskStarted()
CatchTaskStopped()
CatchTaskEnded()
CatchTaskDisabled(string)
}

Expand Down
5 changes: 2 additions & 3 deletions docs/TASKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ A task can be in the following states:
- **running:** a running task
- **stopped:** a task that is not running
- **disabled:** a task in a state not allowed to start. This happens when the task produces consecutive errors. A disabled task must be re-enabled before it can be started again.
- **ended:** a task for which the schedule is ended. At present this happens only for windowed schedule with defined _stop_timestamp_. An ended task is resumable if the schedule is still valid.


![newtaskstatediagram2](https://cloud.githubusercontent.com/assets/21182867/19282545/a4179520-8fa3-11e6-9056-4fc3aa610983.png)

![statediagram](https://cloud.githubusercontent.com/assets/11335874/23362447/0f0b9f74-fcf6-11e6-93d7-889a7ccdc45f.png)

How To | Command
----------------------------------------|------------------------
Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *Client) WatchTask(id string) *WatchTasksResult {
case rbody.TaskWatchTaskDisabled:
r.EventChan <- ste
r.Close()
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.EventChan <- ste
}
}
Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/rest_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func watchTask(id string, port int) *watchTaskResult {
r.eventChan <- ste.EventType
r.close()
return
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
log.Info(ste.EventType)
r.eventChan <- ste.EventType
}
Expand Down
2 changes: 2 additions & 0 deletions mgmt/rest/v1/rbody/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
ScheduledTaskType = "scheduled_task"
ScheduledTaskStartedType = "scheduled_task_started"
ScheduledTaskStoppedType = "scheduled_task_stopped"
ScheduledTaskEndedType = "scheduled_task_ended"
ScheduledTaskRemovedType = "scheduled_task_removed"
ScheduledTaskWatchingEndedType = "schedule_task_watch_ended"
ScheduledTaskEnabledType = "scheduled_task_enabled"
Expand All @@ -46,6 +47,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

type ScheduledTaskListReturned struct {
Expand Down
8 changes: 7 additions & 1 deletion mgmt/rest/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped:
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -289,6 +289,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskDisabled,
Expand Down
9 changes: 8 additions & 1 deletion mgmt/rest/v2/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

// The amount of time to buffer streaming events before flushing in seconds
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case TaskWatchTaskDisabled, TaskWatchTaskStopped:
case TaskWatchTaskDisabled, TaskWatchTaskStopped, TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -165,6 +166,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskDisabled,
Expand Down
29 changes: 29 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.")
// ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped
ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.")
// ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped
ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.")
)

type schedulerState int
Expand Down Expand Up @@ -471,6 +473,7 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError {
serror.New(ErrTaskDisabledNotRunnable),
}
}

if t.state == core.TaskFiring || t.state == core.TaskSpinning {
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand All @@ -481,6 +484,16 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError {
}
}

// Ensure the schedule is valid at this point and time.
if err := t.schedule.Validate(); err != nil {
errs := []serror.SnapError{
serror.New(err),
}
f := buildErrorsLog(errs, logger)
f.Error("schedule passed not valid")
return errs
}

// Group dependencies by the node they live on
// and subscribe to them.
depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics)
Expand Down Expand Up @@ -559,6 +572,14 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
return []serror.SnapError{
serror.New(ErrTaskAlreadyStopped),
}
case core.TaskEnded:
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Error("task is already ended")
return []serror.SnapError{
serror.New(ErrTaskEndedNotStoppable),
}
case core.TaskDisabled:
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -768,6 +789,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskStopped(v.TaskID)
case *scheduler_event.TaskEndedEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
"_block": "handle-events",
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskEnded(v.TaskID)
case *scheduler_event.TaskDisabledEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
Expand Down
Loading

0 comments on commit a7819e0

Please sign in to comment.