Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #1576 - flakey tests scheduler
Browse files Browse the repository at this point in the history
- Adds event for when plugins are unsubscribed
- Listens for unsubscription event before asserting
  • Loading branch information
jcooklin committed Apr 5, 2017
1 parent 3580ad2 commit 9a32c87
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 11 deletions.
10 changes: 10 additions & 0 deletions core/scheduler_event/scheduler_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

const (
PluginsUnsubscribed = "Scheduler.PluginUnsubscribed"
TaskCreated = "Scheduler.TaskCreated"
TaskDeleted = "Scheduler.TaskDeleted"
TaskStarted = "Scheduler.TaskStarted"
Expand All @@ -34,6 +35,15 @@ const (
MetricCollectionFailed = "Scheduler.MetricCollectionFailed"
)

type PluginsUnsubscribedEvent struct {
TaskID string
Plugins []core.SubscribedPlugin
}

func (e PluginsUnsubscribedEvent) Namespace() string {
return PluginsUnsubscribed
}

type TaskStartedEvent struct {
TaskID string
Source string
Expand Down
18 changes: 9 additions & 9 deletions scheduler/distributed_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,19 @@ func TestDistributedSubscriptions(t *testing.T) {
schTask.RemoteManagers.Add(fmt.Sprintf("127.0.0.1:%v", port1), remoteMockManager)
terrs := s.StartTask(t.ID())
So(terrs, ShouldBeNil)
// wait for the task to stop and plugins to be unsubscribed (or timeout)
select {
case event := <-lse.UnsubscribedPluginEvents:
So(event.TaskID, ShouldEqual, t.ID())
case <-time.After(time.Duration(int64(count)*interval.Nanoseconds()) + 1*time.Second):
}

Convey("So all dependencies should have been subscribed to", func() {
So(localMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0)
So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0)
})
Convey("Task should be ended after one interval", func() {
// wait for the end of the task (or timeout)
select {
case <-lse.Ended:
case <-time.After(time.Duration(int64(count)*interval.Nanoseconds()) + 1*time.Second):
}

So(t.State(), ShouldEqual, core.TaskEnded)

Convey("So all dependencies should have been usubscribed", func() {
So(remoteMockManager.UnsubscribeCallCount, ShouldEqual, remoteMockManager.SubscribeCallCount)
So(localMockManager.UnsubscribeCallCount, ShouldEqual, localMockManager.SubscribeCallCount)
Expand Down Expand Up @@ -370,9 +369,10 @@ func TestDistributedSubscriptions(t *testing.T) {
So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0)
})
Convey("Task should have been ended after reaching the end of window", func() {
// wait for the end of the task (or timeout)
// wait for the task to stop and plugins to be unsubscribed (or timeout)
select {
case <-lse.Ended:
case event := <-lse.UnsubscribedPluginEvents:
So(event.TaskID, ShouldEqual, t.ID())
case <-time.After(stop.Add(interval + 1*time.Second).Sub(start)):
}

Expand Down
8 changes: 6 additions & 2 deletions scheduler/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@ import "github.com/intelsdi-x/gomit"
import "github.com/intelsdi-x/snap/core/scheduler_event"

type listenToSchedulerEvent struct {
Ended chan struct{}
Ended chan struct{}
UnsubscribedPluginEvents chan *scheduler_event.PluginsUnsubscribedEvent
}

// NewListenToSchedulerEvent
func NewListenToSchedulerEvent() *listenToSchedulerEvent {
return &listenToSchedulerEvent{
Ended: make(chan struct{}),
UnsubscribedPluginEvents: make(chan *scheduler_event.PluginsUnsubscribedEvent),
}
}

func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) {
switch e.Body.(type) {
switch msg := e.Body.(type) {
case *scheduler_event.TaskEndedEvent:
l.Ended <- struct{}{}
case *scheduler_event.PluginsUnsubscribedEvent:
l.UnsubscribedPluginEvents <- msg
}
}
5 changes: 5 additions & 0 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ func (t *task) UnsubscribePlugins() []serror.SnapError {
depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics)
var errs []serror.SnapError
for k := range depGroups {
event := scheduler_event.PluginsUnsubscribedEvent{
TaskID: t.ID(),
Plugins: depGroups[k].subscribedPlugins,
}
defer t.eventEmitter.Emit(event)
mgr, err := t.RemoteManagers.Get(k)
if err != nil {
errs = append(errs, serror.New(err))
Expand Down

0 comments on commit 9a32c87

Please sign in to comment.