diff --git a/control/config.go b/control/config.go index 57d141890..6b36a3fe4 100644 --- a/control/config.go +++ b/control/config.go @@ -84,6 +84,19 @@ func GetDefaultConfig() *Config { } } +// NewPluginsConfig returns a map of *pluginConfigItems where the key is the plugin name. +func NewPluginsConfig() map[string]*pluginConfigItem { + return map[string]*pluginConfigItem{} +} + +// NewPluginConfigItem returns a *pluginConfigItem. +func NewPluginConfigItem() *pluginConfigItem { + return &pluginConfigItem{ + cdata.NewNode(), + map[int]*cdata.ConfigDataNode{}, + } +} + func newPluginTypeConfigItem() *pluginTypeConfigItem { return &pluginTypeConfigItem{ make(map[string]*pluginConfigItem), diff --git a/control/control.go b/control/control.go index 4ed6566a1..f60b58c15 100644 --- a/control/control.go +++ b/control/control.go @@ -577,6 +577,8 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se // types. colPlugins := make(map[string]*loadedPlugin) for _, mt := range mts { + // If the version provided is <1 we will get the latest + // plugin for the given metric. m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version()) if err != nil { serrs = append(serrs, serror.New(err, map[string]interface{}{ @@ -585,16 +587,8 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se })) continue } - // if the metric subscription is to version -1, we need to carry - // that forward in the subscription. - if mt.Version() < 1 { - // make a copy of the loadedPlugin and overwrite the version. - npl := *m.Plugin - npl.Meta.Version = -1 - colPlugins[npl.Key()] = &npl - } else { - colPlugins[m.Plugin.Key()] = m.Plugin - } + + colPlugins[m.Plugin.Key()] = m.Plugin } if len(serrs) > 0 { return plugins, serrs diff --git a/scheduler/workflow_test.go b/scheduler/workflow_test.go index 8e59f7141..0d222a3f1 100644 --- a/scheduler/workflow_test.go +++ b/scheduler/workflow_test.go @@ -33,6 +33,8 @@ import ( "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" "github.com/intelsdi-x/snap/core/control_event" + "github.com/intelsdi-x/snap/core/ctypes" + "github.com/intelsdi-x/snap/core/scheduler_event" "github.com/intelsdi-x/snap/pkg/promise" "github.com/intelsdi-x/snap/pkg/schedule" "github.com/intelsdi-x/snap/scheduler/wmap" @@ -43,9 +45,12 @@ import ( var ( SnapPath = os.Getenv("SNAP_PATH") + snap_collector_mock1_path = path.Join(SnapPath, "plugin", "snap-collector-mock1") snap_collector_mock2_path = path.Join(SnapPath, "plugin", "snap-collector-mock2") snap_processor_passthru_path = path.Join(SnapPath, "plugin", "snap-processor-passthru") snap_publisher_file_path = path.Join(SnapPath, "plugin", "snap-publisher-file") + + metricsToCollect = 3 ) type MockMetricType struct { @@ -61,22 +66,35 @@ type mockPluginEvent struct { EventNamespace string } -type listenToPluginEvent struct { - plugin *mockPluginEvent - done chan struct{} +type eventListener struct { + plugin *mockPluginEvent + metricCollectCount int + MetricCollectFailureCount int + metricsCollectionDone chan bool + done chan struct{} } -func newListenToPluginEvent() *listenToPluginEvent { - return &listenToPluginEvent{ +func newEventListener() *eventListener { + return &eventListener{ done: make(chan struct{}), } } -func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) { +func (l *eventListener) HandleGomitEvent(e gomit.Event) { go func() { switch e.Body.(type) { case *control_event.LoadPluginEvent: l.done <- struct{}{} + case *scheduler_event.MetricCollectedEvent: + if l.metricCollectCount > metricsToCollect { + l.done <- struct{}{} + } + l.metricCollectCount++ + case *scheduler_event.MetricCollectionFailedEvent: + if l.MetricCollectFailureCount > 1 { + l.done <- struct{}{} + } + l.MetricCollectFailureCount++ default: } }() @@ -105,8 +123,13 @@ func (m MockMetricType) Data() interface{} { func TestCollectPublishWorkflow(t *testing.T) { log.SetLevel(log.FatalLevel) Convey("Given a started plugin control", t, func() { - - c := control.New(control.GetDefaultConfig()) + cfg := control.GetDefaultConfig() + cfg.Plugins.Collector.Plugins = control.NewPluginsConfig() + cfg.Plugins.Collector.Plugins["mock"] = control.NewPluginConfigItem() + cfg.Plugins.Collector.Plugins["mock"].Versions = map[int]*cdata.ConfigDataNode{} + cfg.Plugins.Collector.Plugins["mock"].Versions[1] = cdata.NewNode() + cfg.Plugins.Collector.Plugins["mock"].Versions[1].AddItem("test", ctypes.ConfigValueBool{Value: true}) + c := control.New(cfg) c.Start() s := New(GetDefaultConfig()) s.SetMetricManager(c) @@ -123,21 +146,26 @@ func TestCollectPublishWorkflow(t *testing.T) { So(err, ShouldBeNil) _, err = c.Load(rp3) So(err, ShouldBeNil) - time.Sleep(100 * time.Millisecond) + rp4, err := core.NewRequestedPlugin(snap_collector_mock1_path) + So(err, ShouldBeNil) + _, err = c.Load(rp4) + So(err, ShouldBeNil) metrics, err2 := c.MetricCatalog() So(err2, ShouldBeNil) So(metrics, ShouldNotBeEmpty) + // The following two metrics will result in both versions (1 and 2) of + // the mock plugin to be used. '/intel/mock/test' will be coming from + // mock version 1 due to the global config above. w := wmap.NewWorkflowMap() - w.CollectNode.AddMetric("/intel/mock/foo", 2) + w.CollectNode.AddMetric("/intel/mock/foo", -1) + w.CollectNode.AddMetric("/intel/mock/test", -1) w.CollectNode.AddConfigItem("/intel/mock/foo", "password", "secret") pu := wmap.NewPublishNode("file", 3) pu.AddConfigItem("file", "/tmp/snap-TestCollectPublishWorkflow.out") - pr := wmap.NewProcessNode("passthru", 1) - time.Sleep(100 * time.Millisecond) pr.Add(pu) w.CollectNode.Add(pr) @@ -145,13 +173,16 @@ func TestCollectPublishWorkflow(t *testing.T) { Convey("Start scheduler", func() { err := s.Start() So(err, ShouldBeNil) - Convey("Create task", func() { - t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false) + Convey("Create and start task", func() { + el := newEventListener() + s.RegisterEventHandler("TestCollectPublishWorkflow", el) + t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true) So(err.Errors(), ShouldBeEmpty) So(t, ShouldNotBeNil) - t.(*task).Spin() - time.Sleep(3 * time.Second) - + <-el.done + So(t.LastFailureMessage(), ShouldBeEmpty) + So(t.FailedCount(), ShouldEqual, 0) + So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect) }) }) }) @@ -167,7 +198,7 @@ func TestProcessChainingWorkflow(t *testing.T) { s := New(GetDefaultConfig()) s.SetMetricManager(c) Convey("create a workflow with chained processors", func() { - lpe := newListenToPluginEvent() + lpe := newEventListener() c.RegisterEventHandler("Control.PluginLoaded", lpe) rp, err := core.NewRequestedPlugin(snap_collector_mock2_path) So(err, ShouldBeNil) @@ -208,12 +239,14 @@ func TestProcessChainingWorkflow(t *testing.T) { err := s.Start() So(err, ShouldBeNil) Convey("Create task", func() { - t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false) + t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true) + s.RegisterEventHandler("TestProcessChainingWorkflow", lpe) So(err.Errors(), ShouldBeEmpty) So(t, ShouldNotBeNil) - t.(*task).Spin() - time.Sleep(3 * time.Second) - + <-lpe.done + So(t.LastFailureMessage(), ShouldBeEmpty) + So(t.FailedCount(), ShouldEqual, 0) + So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect) }) }) })