Skip to content

Commit

Permalink
Fixed intelsdi-x#1520 - Ended task can be removed
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Feb 17, 2017
1 parent b7c4674 commit fd032b8
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 5 deletions.
10 changes: 10 additions & 0 deletions core/scheduler_event/scheduler_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
TaskDeleted = "Scheduler.TaskDeleted"
TaskStarted = "Scheduler.TaskStarted"
TaskStopped = "Scheduler.TaskStopped"
TaskEnded = "Scheduler.TaskEnded"
TaskDisabled = "Scheduler.TaskDisabled"
MetricCollected = "Scheduler.MetricsCollected"
MetricCollectionFailed = "Scheduler.MetricCollectionFailed"
Expand Down Expand Up @@ -70,6 +71,15 @@ func (e TaskStoppedEvent) Namespace() string {
return TaskStopped
}

type TaskEndedEvent struct {
TaskID string
Source string
}

func (e TaskEndedEvent) Namespace() string {
return TaskEnded
}

type TaskDisabledEvent struct {
TaskID string
Why string
Expand Down
1 change: 1 addition & 0 deletions core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type TaskWatcherHandler interface {
CatchCollection([]Metric)
CatchTaskStarted()
CatchTaskStopped()
CatchTaskEnded()
CatchTaskDisabled(string)
}

Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *Client) WatchTask(id string) *WatchTasksResult {
case rbody.TaskWatchTaskDisabled:
r.EventChan <- ste
r.Close()
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.EventChan <- ste
}
}
Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/rest_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func watchTask(id string, port int) *watchTaskResult {
r.eventChan <- ste.EventType
r.close()
return
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
log.Info(ste.EventType)
r.eventChan <- ste.EventType
}
Expand Down
2 changes: 2 additions & 0 deletions mgmt/rest/v1/rbody/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
ScheduledTaskType = "scheduled_task"
ScheduledTaskStartedType = "scheduled_task_started"
ScheduledTaskStoppedType = "scheduled_task_stopped"
ScheduledTaskEndedType = "scheduled_task_ended"
ScheduledTaskRemovedType = "scheduled_task_removed"
ScheduledTaskWatchingEndedType = "schedule_task_watch_ended"
ScheduledTaskEnabledType = "scheduled_task_enabled"
Expand All @@ -46,6 +47,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

type ScheduledTaskListReturned struct {
Expand Down
8 changes: 7 additions & 1 deletion mgmt/rest/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped:
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -289,6 +289,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskDisabled,
Expand Down
9 changes: 8 additions & 1 deletion mgmt/rest/v2/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

// The amount of time to buffer streaming events before flushing in seconds
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case TaskWatchTaskDisabled, TaskWatchTaskStopped:
case TaskWatchTaskDisabled, TaskWatchTaskStopped, TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -165,6 +166,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskDisabled,
Expand Down
30 changes: 30 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ var (
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.")
// ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped
ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.")
// ErrTaskEndedNotRunnable - The error message for task is disabled and cannot be started
ErrTaskEndedNotRunnable = errors.New("Task is ended. Cannot be restarted.")
// ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped
ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.")
)

type schedulerState int
Expand Down Expand Up @@ -471,6 +475,16 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError {
serror.New(ErrTaskDisabledNotRunnable),
}
}

if t.state == core.TaskEnded {
logger.WithFields(log.Fields{
"task-id": t.ID(),
}).Error("Task is already ended (not resumable)")
return []serror.SnapError{
serror.New(ErrTaskEndedNotRunnable),
}
}

if t.state == core.TaskFiring || t.state == core.TaskSpinning {
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -559,6 +573,14 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
return []serror.SnapError{
serror.New(ErrTaskAlreadyStopped),
}
case core.TaskEnded:
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Error("task is already ended")
return []serror.SnapError{
serror.New(ErrTaskEndedNotStoppable),
}
case core.TaskDisabled:
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -768,6 +790,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskStopped(v.TaskID)
case *scheduler_event.TaskEndedEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
"_block": "handle-events",
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskEnded(v.TaskID)
case *scheduler_event.TaskDisabledEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
Expand Down
2 changes: 1 addition & 1 deletion scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (t *taskCollection) remove(task *task) error {
t.Lock()
defer t.Unlock()
if _, ok := t.table[task.id]; ok {
if task.state != core.TaskStopped && task.state != core.TaskDisabled {
if task.state != core.TaskStopped && task.state != core.TaskDisabled && task.state != core.TaskEnded {
taskLogger.WithFields(log.Fields{
"_block": "remove",
"task id": task.id,
Expand Down
23 changes: 23 additions & 0 deletions scheduler/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,29 @@ func (t *taskWatcherCollection) handleTaskStopped(taskID string) {
}
}

func (t *taskWatcherCollection) handleTaskEnded(taskID string) {
t.mutex.Lock()
defer t.mutex.Unlock()
// no taskID means no watches, early exit
if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 {
// Uncomment this debug line if needed. Otherwise this is too verbose for even debug level.
// watcherLog.WithFields(log.Fields{
// "task-id": taskID,
// }).Debug("no watchers")
return
}
// Walk all watchers for a task ID
for _, v := range t.coll[taskID] {
// Check if they have a catcher assigned
watcherLog.WithFields(log.Fields{
"task-id": taskID,
"task-watcher-id": v.id,
}).Debug("calling taskwatcher task ended func")
// Call the catcher
v.handler.CatchTaskEnded()
}
}

func (t *taskWatcherCollection) handleTaskDisabled(taskID string, why string) {
t.mutex.Lock()
defer t.mutex.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions scheduler/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (d *mockCatcher) CatchTaskStopped() {
sum++
}

func (d *mockCatcher) CatchTaskEnded() {
d.count++
sum++
}

func (d *mockCatcher) CatchTaskStarted() {
d.count++
sum++
Expand Down

0 comments on commit fd032b8

Please sign in to comment.