Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Fixes #820 - Task fails when metrics come from different versions of … #822

Merged
merged 1 commit into from
Apr 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ func GetDefaultConfig() *Config {
}
}

// NewPluginsConfig returns a map of *pluginConfigItems where the key is the plugin name.
func NewPluginsConfig() map[string]*pluginConfigItem {
return map[string]*pluginConfigItem{}
}

// NewPluginConfigItem returns a *pluginConfigItem.
func NewPluginConfigItem() *pluginConfigItem {
return &pluginConfigItem{
cdata.NewNode(),
map[int]*cdata.ConfigDataNode{},
}
}

func newPluginTypeConfigItem() *pluginTypeConfigItem {
return &pluginTypeConfigItem{
make(map[string]*pluginConfigItem),
Expand Down
14 changes: 4 additions & 10 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
// types.
colPlugins := make(map[string]*loadedPlugin)
for _, mt := range mts {
// If the version provided is <1 we will get the latest
// plugin for the given metric.
m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
Expand All @@ -585,16 +587,8 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
}))
continue
}
// if the metric subscription is to version -1, we need to carry
// that forward in the subscription.
if mt.Version() < 1 {
// make a copy of the loadedPlugin and overwrite the version.
npl := *m.Plugin
npl.Meta.Version = -1
colPlugins[npl.Key()] = &npl
} else {
colPlugins[m.Plugin.Key()] = m.Plugin
}

colPlugins[m.Plugin.Key()] = m.Plugin
}
if len(serrs) > 0 {
return plugins, serrs
Expand Down
77 changes: 55 additions & 22 deletions scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/scheduler_event"
"github.com/intelsdi-x/snap/pkg/promise"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
Expand All @@ -43,9 +45,12 @@ import (

var (
SnapPath = os.Getenv("SNAP_PATH")
snap_collector_mock1_path = path.Join(SnapPath, "plugin", "snap-collector-mock1")
snap_collector_mock2_path = path.Join(SnapPath, "plugin", "snap-collector-mock2")
snap_processor_passthru_path = path.Join(SnapPath, "plugin", "snap-processor-passthru")
snap_publisher_file_path = path.Join(SnapPath, "plugin", "snap-publisher-file")

metricsToCollect = 3
)

type MockMetricType struct {
Expand All @@ -61,22 +66,35 @@ type mockPluginEvent struct {
EventNamespace string
}

type listenToPluginEvent struct {
plugin *mockPluginEvent
done chan struct{}
type eventListener struct {
plugin *mockPluginEvent
metricCollectCount int
MetricCollectFailureCount int
metricsCollectionDone chan bool
done chan struct{}
}

func newListenToPluginEvent() *listenToPluginEvent {
return &listenToPluginEvent{
func newEventListener() *eventListener {
return &eventListener{
done: make(chan struct{}),
}
}

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
func (l *eventListener) HandleGomitEvent(e gomit.Event) {
go func() {
switch e.Body.(type) {
case *control_event.LoadPluginEvent:
l.done <- struct{}{}
case *scheduler_event.MetricCollectedEvent:
if l.metricCollectCount > metricsToCollect {
l.done <- struct{}{}
}
l.metricCollectCount++
case *scheduler_event.MetricCollectionFailedEvent:
if l.MetricCollectFailureCount > 1 {
l.done <- struct{}{}
}
l.MetricCollectFailureCount++
default:
}
}()
Expand Down Expand Up @@ -105,8 +123,13 @@ func (m MockMetricType) Data() interface{} {
func TestCollectPublishWorkflow(t *testing.T) {
log.SetLevel(log.FatalLevel)
Convey("Given a started plugin control", t, func() {

c := control.New(control.GetDefaultConfig())
cfg := control.GetDefaultConfig()
cfg.Plugins.Collector.Plugins = control.NewPluginsConfig()
cfg.Plugins.Collector.Plugins["mock"] = control.NewPluginConfigItem()
cfg.Plugins.Collector.Plugins["mock"].Versions = map[int]*cdata.ConfigDataNode{}
cfg.Plugins.Collector.Plugins["mock"].Versions[1] = cdata.NewNode()
cfg.Plugins.Collector.Plugins["mock"].Versions[1].AddItem("test", ctypes.ConfigValueBool{Value: true})
c := control.New(cfg)
c.Start()
s := New(GetDefaultConfig())
s.SetMetricManager(c)
Expand All @@ -123,35 +146,43 @@ func TestCollectPublishWorkflow(t *testing.T) {
So(err, ShouldBeNil)
_, err = c.Load(rp3)
So(err, ShouldBeNil)
time.Sleep(100 * time.Millisecond)
rp4, err := core.NewRequestedPlugin(snap_collector_mock1_path)
So(err, ShouldBeNil)
_, err = c.Load(rp4)
So(err, ShouldBeNil)

metrics, err2 := c.MetricCatalog()
So(err2, ShouldBeNil)
So(metrics, ShouldNotBeEmpty)

// The following two metrics will result in both versions (1 and 2) of
// the mock plugin to be used. '/intel/mock/test' will be coming from
// mock version 1 due to the global config above.
w := wmap.NewWorkflowMap()
w.CollectNode.AddMetric("/intel/mock/foo", 2)
w.CollectNode.AddMetric("/intel/mock/foo", -1)
w.CollectNode.AddMetric("/intel/mock/test", -1)
w.CollectNode.AddConfigItem("/intel/mock/foo", "password", "secret")

pu := wmap.NewPublishNode("file", 3)
pu.AddConfigItem("file", "/tmp/snap-TestCollectPublishWorkflow.out")

pr := wmap.NewProcessNode("passthru", 1)
time.Sleep(100 * time.Millisecond)

pr.Add(pu)
w.CollectNode.Add(pr)

Convey("Start scheduler", func() {
err := s.Start()
So(err, ShouldBeNil)
Convey("Create task", func() {
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false)
Convey("Create and start task", func() {
el := newEventListener()
s.RegisterEventHandler("TestCollectPublishWorkflow", el)
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true)
So(err.Errors(), ShouldBeEmpty)
So(t, ShouldNotBeNil)
t.(*task).Spin()
time.Sleep(3 * time.Second)

<-el.done
So(t.LastFailureMessage(), ShouldBeEmpty)
So(t.FailedCount(), ShouldEqual, 0)
So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect)
})
})
})
Expand All @@ -167,7 +198,7 @@ func TestProcessChainingWorkflow(t *testing.T) {
s := New(GetDefaultConfig())
s.SetMetricManager(c)
Convey("create a workflow with chained processors", func() {
lpe := newListenToPluginEvent()
lpe := newEventListener()
c.RegisterEventHandler("Control.PluginLoaded", lpe)
rp, err := core.NewRequestedPlugin(snap_collector_mock2_path)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -208,12 +239,14 @@ func TestProcessChainingWorkflow(t *testing.T) {
err := s.Start()
So(err, ShouldBeNil)
Convey("Create task", func() {
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false)
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true)
s.RegisterEventHandler("TestProcessChainingWorkflow", lpe)
So(err.Errors(), ShouldBeEmpty)
So(t, ShouldNotBeNil)
t.(*task).Spin()
time.Sleep(3 * time.Second)

<-lpe.done
So(t.LastFailureMessage(), ShouldBeEmpty)
So(t.FailedCount(), ShouldEqual, 0)
So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect)
})
})
})
Expand Down