Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixed flaky test in windowed schedule and added method to proceed tas…
Browse files Browse the repository at this point in the history
…k disabling
  • Loading branch information
IzabellaRaulin committed Apr 5, 2017
1 parent 9a32c87 commit c96203d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 45 deletions.
5 changes: 1 addition & 4 deletions pkg/schedule/windowed_schedule_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestWindowedSchedule(t *testing.T) {
}) // the end of `Nominal windowed Schedule`

Convey("Windowed Schedule with determined the count of runs", t, func() {
interval := time.Millisecond * 10
interval := time.Second

Convey("expected to start immediately", func() {
Convey("single run", func() {
Expand Down Expand Up @@ -523,8 +523,6 @@ func TestWindowedSchedule(t *testing.T) {
Convey("started in the past", func() {
startWait := time.Millisecond * -200
count := uint(1)
interval := time.Millisecond * 10

start := time.Now().Add(startWait)
w := NewWindowedSchedule(
interval,
Expand Down Expand Up @@ -564,7 +562,6 @@ func TestWindowedSchedule(t *testing.T) {
Convey("with determined stop", func() {
startWait := time.Millisecond * 50
windowSize := time.Millisecond * 200
interval := time.Millisecond * 10
count := uint(1)

start := time.Now().Add(startWait)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/managers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *managers) Add(key string, val managesMetrics) {

// Returns the managesMetric instance that maps to given
// string. If an empty string is given, will instead return
// the local instance passed in on initializiation.
// the local instance passed in on initialization.
func (m *managers) Get(key string) (managesMetrics, error) {
if key == "" {
return m.local, nil
Expand Down
7 changes: 7 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,13 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
task, _ := s.getTask(v.TaskID)
task.UnsubscribePlugins()
s.taskWatcherColl.handleTaskDisabled(v.TaskID, v.Why)
case *scheduler_event.PluginsUnsubscribedEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
"_block": "handle-events",
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
default:
log.WithFields(log.Fields{
"_module": "scheduler-events",
Expand Down
90 changes: 50 additions & 40 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,17 @@ func (t *task) stream() {
t.maxMetricsBuffer)
if err != nil {
consecutiveFailures++
e := checkTaskFailures(t, consecutiveFailures)
if e != nil {
// check task failures
if t.stopOnFailure >= 0 && consecutiveFailures >= t.stopOnFailure {
taskLogger.WithFields(log.Fields{
"_block": "stream",
"task-id": t.id,
"task-name": t.name,
"consecutive failures": consecutiveFailures,
"error": t.lastFailureMessage,
}).Error(ErrTaskDisabledOnFailures)
// disable the task
t.disable(t.lastFailureMessage)
return
}
// If we are unsuccessful at setting up the stream
Expand Down Expand Up @@ -321,37 +330,24 @@ func (t *task) stream() {
time.Sleep(resetTime)
done = true
}
e := checkTaskFailures(t, consecutiveFailures)
if e != nil {
// check task failures
if t.stopOnFailure >= 0 && consecutiveFailures >= t.stopOnFailure {
taskLogger.WithFields(log.Fields{
"_block": "stream",
"task-id": t.id,
"task-name": t.name,
"consecutive failures": consecutiveFailures,
"error": t.lastFailureMessage,
}).Error(ErrTaskDisabledOnFailures)
// disable the task
t.disable(t.lastFailureMessage)
return
}
}
}
}
}

func checkTaskFailures(t *task, consecutiveFailures int) error {
if t.stopOnFailure >= 0 && consecutiveFailures >= t.stopOnFailure {
taskLogger.WithFields(log.Fields{
"_block": "spin",
"task-id": t.id,
"task-name": t.name,
"consecutive failures": consecutiveFailures,
"error": t.lastFailureMessage,
}).Error(ErrTaskDisabledOnFailures)
// You must lock on state change for tasks
t.Lock()
t.state = core.TaskDisabled
t.Unlock()
// Send task disabled event
event := new(scheduler_event.TaskDisabledEvent)
event.TaskID = t.id
event.Why = fmt.Sprintf("Task disabled with error: %s", t.lastFailureMessage)
defer t.eventEmitter.Emit(event)
return ErrTaskDisabledOnFailures
}
return nil
}
func (t *task) Stop() {
t.Lock()
defer t.Unlock()
Expand All @@ -366,7 +362,7 @@ func (t *task) UnsubscribePlugins() []serror.SnapError {
depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics)
var errs []serror.SnapError
for k := range depGroups {
event := scheduler_event.PluginsUnsubscribedEvent{
event := &scheduler_event.PluginsUnsubscribedEvent{
TaskID: t.ID(),
Plugins: depGroups[k].subscribedPlugins,
}
Expand All @@ -381,6 +377,14 @@ func (t *task) UnsubscribePlugins() []serror.SnapError {
}
}
}
for _, err := range errs {
taskLogger.WithFields(log.Fields{
"_block": "UnsubscribePlugins",
"task-id": t.id,
"task-name": t.name,
"task-state": t.state,
}).Error(err)
}
return errs
}

Expand Down Expand Up @@ -489,15 +493,9 @@ func (t *task) spin() {
"consecutive failures": consecutiveFailures,
"error": t.lastFailureMessage,
}).Error(ErrTaskDisabledOnFailures)
// You must lock on state change for tasks
t.Lock()
t.state = core.TaskDisabled
t.Unlock()
// Send task disabled event
event := new(scheduler_event.TaskDisabledEvent)
event.TaskID = t.id
event.Why = fmt.Sprintf("Task disabled with error: %s", t.lastFailureMessage)
defer t.eventEmitter.Emit(event)

// disable the task
t.disable(t.lastFailureMessage)
return
}

Expand All @@ -515,10 +513,9 @@ func (t *task) spin() {

// Schedule has errored
case schedule.Error:
// You must lock task to change state
t.Lock()
t.state = core.TaskDisabled
t.Unlock()
// disable the task
failureMessage := sr.Error().Error()
t.disable(failureMessage)
return //spin

}
Expand All @@ -542,6 +539,19 @@ func (t *task) fire() {
t.state = core.TaskSpinning
}

// disable proceeds disabling a task which consists of changing task state to disabled and emitting an appropriate event
func (t *task) disable(failureMsg string) {
t.Lock()
t.state = core.TaskDisabled
t.Unlock()

// Send task disabled event
event := new(scheduler_event.TaskDisabledEvent)
event.TaskID = t.id
event.Why = fmt.Sprintf("Task disabled with error: %s", failureMsg)
defer t.eventEmitter.Emit(event)
}

func (t *task) waitForSchedule() {
select {
case <-t.killChan:
Expand Down

0 comments on commit c96203d

Please sign in to comment.