-
Notifications
You must be signed in to change notification settings - Fork 295
Fixed tests in distributed task #1577
Fixed tests in distributed task #1577
Conversation
scheduler/scheduler.go
Outdated
@@ -761,9 +761,6 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { | |||
"event-namespace": e.Namespace(), | |||
"task-id": v.TaskID, | |||
}).Debug("event received") | |||
// We need to unsubscribe from deps when a task has ended | |||
task, _ := s.getTask(v.TaskID) | |||
task.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reviewers:
Now this happens before emitting the event - see scheduler/task.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be nice if the HandleGomitEvent function handles the unsubscription of plugins when the task is ended. This way, we are ensuring that these happen in order: change state, emit the TaskEnded event, have the event handler call UnsubscribePlugins(), which seems more logical. Do you agree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will revert that
scheduler/scheduler.go
Outdated
@@ -773,9 +770,6 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { | |||
"task-id": v.TaskID, | |||
"disabled-reason": v.Why, | |||
}).Debug("event received") | |||
// We need to unsubscribe from deps when a task goes disabled | |||
task, _ := s.getTask(v.TaskID) | |||
task.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reviewers:
Now this happens before emitting the event - see scheduler/task.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above.
To keep it consistent, we should have all three different cases (Ended, Disabled and Stopped), follow the same pattern, which is to emit the event first and handle unsubscription of plugins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, reverted
return ErrTaskDisabledOnFailures | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reviewers
I see the reasoning of having checkTaskFailures()
to avoid duplication of the same code, but the error log loses its value because this function might be called everywhere, so "_block" does not point to the exact place. So, I changed it to the following:
if t.stopOnFailure >= 0 && consecutiveFailures >= t.stopOnFailure {
taskLogger.WithFields(log.Fields{
"_block": "stream",
"task-id": t.id,
"task-name": t.name,
"consecutive failures": consecutiveFailures,
"error": t.lastFailureMessage,
}).Error(ErrTaskDisabledOnFailures)
// disable the task
t.disable(t.lastFailureMessage)
return
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iza, probably "_block" input can be passed in so that code may be reused? Of course, it's everywhere already. Probably it's no big deal for reuse 1 0r 2. If we can refactor to standardized Snap's logs in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@candysmurf - yes I fully agree. There is still a way to make some small optimization is this area -
but if we decide to do such thing it should be done across all files I suppose. So, is it ok for you as it is now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
totally, 👍
35e8243
to
5c719dd
Compare
scheduler/scheduler.go
Outdated
@@ -761,9 +761,6 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { | |||
"event-namespace": e.Namespace(), | |||
"task-id": v.TaskID, | |||
}).Debug("event received") | |||
// We need to unsubscribe from deps when a task has ended | |||
task, _ := s.getTask(v.TaskID) | |||
task.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be nice if the HandleGomitEvent function handles the unsubscription of plugins when the task is ended. This way, we are ensuring that these happen in order: change state, emit the TaskEnded event, have the event handler call UnsubscribePlugins(), which seems more logical. Do you agree?
scheduler/scheduler.go
Outdated
@@ -773,9 +770,6 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { | |||
"task-id": v.TaskID, | |||
"disabled-reason": v.Why, | |||
}).Debug("event received") | |||
// We need to unsubscribe from deps when a task goes disabled | |||
task, _ := s.getTask(v.TaskID) | |||
task.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above.
To keep it consistent, we should have all three different cases (Ended, Disabled and Stopped), follow the same pattern, which is to emit the event first and handle unsubscription of plugins.
scheduler/task.go
Outdated
defer t.eventEmitter.Emit(event) | ||
|
||
// We need to unsubscribe from deps when a task has ended | ||
t.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we catch errors here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we allow ended task be restarted? If the answer is yes, do we subscribe plugins in the restart? If no impact, adding error handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
scheduler/task.go
Outdated
defer t.eventEmitter.Emit(event) | ||
|
||
// We need to unsubscribe from deps when a task has disabled | ||
t.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should catch the errors here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IzabellaRaulin, thanks for working on this one. The fundamental question is if this change will change the Snap task flow of subscribe/unsubscribe plugins. Otherwise, I like the change.
return ErrTaskDisabledOnFailures | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iza, probably "_block" input can be passed in so that code may be reused? Of course, it's everywhere already. Probably it's no big deal for reuse 1 0r 2. If we can refactor to standardized Snap's logs in the future.
scheduler/task.go
Outdated
defer t.eventEmitter.Emit(event) | ||
|
||
// We need to unsubscribe from deps when a task has disabled | ||
t.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iza, sounds a good change. Only concern is that the disabled task can be enabled. Do we need to subscribe it before enabling it? Could you double check. If no impact, adding error handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Answer to
Only concern is that the disabled task can be enabled. Do we need to subscribe it before enabling it?
The disabled task can be enabled and yes, we need to subscribe it before enabling it (exactly it is a part of enabling procedure). To be clear this PR does not impact on such behavior.
adding error handling
There was a discussion about what should happen first - emitting an event or unsubscribe deps. The decision is to keep it as it is what means emitting event as first, and then do unsubscribing in HandleGomitEvent. I will revert my changes corresponding to this aspect.
scheduler/task.go
Outdated
defer t.eventEmitter.Emit(event) | ||
|
||
// We need to unsubscribe from deps when a task has ended | ||
t.UnsubscribePlugins() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we allow ended task be restarted? If the answer is yes, do we subscribe plugins in the restart? If no impact, adding error handling.
@IzabellaRaulin, tried to test your fix on my laptop. It didn't fix the issue. Here are errors for medium-test.
|
- Adds event for when plugins are unsubscribed - Listens for unsubscription event before asserting
491c517
to
c96203d
Compare
Update done - please see also updated PR description. All identified flaky tests related with schedule fixed. cc: @intelsdi-x/snap-maintainers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Fixes #1576
The following failures are fixed in
scheduler/distributed_task_test.go
:The following failures are fixed in
pkg/schedule/windowed_schedule_medium_test.go
:Summary of changes:
added PluginsUnsubscribed event and listener to this event in distributed_workflow_test.go -> react on incoming PluginsUnsubscribed event fixes the flaky tests in
distributed_workflow_tests.go
added methods to proceed task disabling -> it's done couple times in different places, it's rather improving code readability, do not impact on how it behaves
increase an interval to be sure that calculated stop_timestamp (equals to interval multiplied by count) does not pass before test starts -> this fixes tests in windowed_schedule_medium_test.go
Testing done:
@intelsdi-x/snap-maintainers