Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fix publishers and processors subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
mkleina committed Jul 13, 2017
1 parent a492827 commit b05093c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
4 changes: 2 additions & 2 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,15 +625,15 @@ func (ap *availablePlugins) findLatestPool(pType, name string) (strategy.Pool, s
var latest strategy.Pool
for key, pool := range ap.table {
tnv := strings.Split(key, core.Separator)
if tnv[0] == pType && tnv[1] == name {
if tnv[0] == pType && tnv[1] == name && pool.Count() > 0 {
latest = pool
break
}
}
if latest != nil {
for key, pool := range ap.table {
tnv := strings.Split(key, core.Separator)
if tnv[0] == pType && tnv[1] == name && pool.Version() > latest.Version() {
if tnv[0] == pType && tnv[1] == name && pool.Version() > latest.Version() && pool.Count() > 0 {
latest = pool
}
}
Expand Down
17 changes: 15 additions & 2 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,6 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {

// notice that requested plugins contains only processors and publishers
for _, plugin := range s.requestedPlugins {
// add processors and publishers to collectors just gathered
plugins = append(plugins, plugin)
// add defaults to plugins (exposed in a plugins ConfigPolicy)
if lp, err := s.pluginManager.get(
fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d",
Expand All @@ -445,6 +443,21 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {
// set defaults to plugin config
plugin.Config().ApplyDefaults(policy.Defaults())
}

// update version info for subscribed processor or publisher
version := plugin.Version()
if version < 1 {
version = lp.Version()
}
s := subscribedPlugin{
name: plugin.Name(),
typeName: plugin.TypeName(),
version: version,
config: plugin.Config(),
}

// add processors and publishers to collectors just gathered
plugins = append(plugins, s)
}
}
// calculates those plugins that need to be subscribed and unsubscribed to
Expand Down
8 changes: 7 additions & 1 deletion scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestCollectPublishWorkflow(t *testing.T) {
So(err, ShouldBeNil)
rp2, err := core.NewRequestedPlugin(snap_publisher_file_path, c.GetTempDir(), nil)
So(err, ShouldBeNil)
_, err = c.Load(rp2)
plugPublisher, err := c.Load(rp2)
So(err, ShouldBeNil)
rp3, err := core.NewRequestedPlugin(snap_processor_passthru_path, c.GetTempDir(), nil)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -189,6 +189,12 @@ func TestCollectPublishWorkflow(t *testing.T) {
So(t.LastFailureMessage(), ShouldBeEmpty)
So(t.FailedCount(), ShouldEqual, 0)
So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect)

// check if task fails after unloading publisher
c.Unload(plugPublisher)
<-el.done
So(t.LastFailureMessage(), ShouldNotBeEmpty)
So(t.FailedCount(), ShouldBeGreaterThan, 0)
})
})
})
Expand Down

0 comments on commit b05093c

Please sign in to comment.