Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1310 from marcin-krolik/fix/1128
Browse files Browse the repository at this point in the history
(SDI-1827): Fix #1128 Loading collector twice causes task failure
  • Loading branch information
marcin-krolik authored Nov 17, 2016
2 parents d81f3b9 + 213ef0e commit 46c56b2
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 33 deletions.
22 changes: 11 additions & 11 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type catalogsMetrics interface {
Keys() []string
Subscribe([]string, int) error
Unsubscribe([]string, int) error
GetPlugin(core.Namespace, int) (*loadedPlugin, error)
GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error)
}

type managesSigning interface {
Expand Down Expand Up @@ -715,26 +715,26 @@ func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric

// apply defaults to the metric that may be present in the plugins
// configpolicy
if pluginCfg := mt.Plugin.ConfigPolicy.Get(mt.Namespace().Strings()); pluginCfg != nil {
if pluginCfg := mt.Plugin.Policy().Get(mt.Namespace().Strings()); pluginCfg != nil {
mt.config.ApplyDefaults(pluginCfg.Defaults())
}

// loaded plugin which exposes the metric
lp := mt.Plugin
key := lp.Key()
// cataloged plugin which exposes the metric
cp := mt.Plugin
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())

// groups metricTypes by a plugin.Key()
pmt, _ := newMetricsGroupedByPlugin[key]

// pmt (plugin-metric-type) contains plugin and metrics types grouped to this plugin
pmt.plugin = lp
pmt.plugin = cp
pmt.metricTypes = append(pmt.metricTypes, mt)
newMetricsGroupedByPlugin[key] = pmt

plugin := subscribedPlugin{
name: lp.Name(),
typeName: lp.TypeName(),
version: lp.Version(),
name: cp.Name(),
typeName: cp.TypeName(),
version: cp.Version(),
config: cdata.NewNode(),
}

Expand Down Expand Up @@ -1046,7 +1046,7 @@ func (r *requestedPlugin) Config() *cdata.ConfigDataNode {

// just a tuple of loadedPlugin and metricType slice
type metricTypes struct {
plugin *loadedPlugin
plugin core.CatalogedPlugin
metricTypes []core.Metric
}

Expand All @@ -1058,7 +1058,7 @@ func (mts metricTypes) Metrics() []core.Metric {
return mts.metricTypes
}

func (mts metricTypes) Plugin() *loadedPlugin {
func (mts metricTypes) Plugin() core.CatalogedPlugin {
return mts.plugin
}

Expand Down
2 changes: 1 addition & 1 deletion control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) {
return nil, nil
}

func (m *mc) GetPlugin(core.Namespace, int) (*loadedPlugin, error) {
func (m *mc) GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) {
return nil, nil
}

Expand Down
108 changes: 96 additions & 12 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *metricCatalogItem) Versions() map[int]core.Metric {
}

type metricType struct {
Plugin *loadedPlugin
Plugin core.CatalogedPlugin
namespace core.Namespace
version int
lastAdvertisedTime time.Time
Expand Down Expand Up @@ -163,12 +163,24 @@ func (m *metric) Version() int {
return m.version
}

func (m *metric) Data() interface{} { return nil }
func (m *metric) Description() string { return "" }
func (m *metric) Unit() string { return "" }
func (m *metric) Tags() map[string]string { return nil }
func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) }
func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) }
func (m *metric) Data() interface{} {
return nil
}
func (m *metric) Description() string {
return ""
}
func (m *metric) Unit() string {
return ""
}
func (m *metric) Tags() map[string]string {
return nil
}
func (m *metric) LastAdvertisedTime() time.Time {
return time.Unix(0, 0)
}
func (m *metric) Timestamp() time.Time {
return time.Unix(0, 0)
}

type processesConfigData interface {
Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors)
Expand Down Expand Up @@ -250,6 +262,78 @@ func (m *metricType) Unit() string {
return m.unit
}

type catalogedPlugin struct {
name string
version int
signed bool
typeName plugin.PluginType
state pluginState
path string
loadedTime time.Time
configPolicy *cpolicy.ConfigPolicy
}

func (cp *catalogedPlugin) TypeName() string {
return cp.typeName.String()
}

func (cp *catalogedPlugin) Name() string {
return cp.name
}

func (cp *catalogedPlugin) Version() int {
return cp.version
}

func (cp *catalogedPlugin) IsSigned() bool {
return cp.signed
}

func (cp *catalogedPlugin) Status() string {
return string(cp.state)
}

func (cp *catalogedPlugin) PluginPath() string {
return cp.path
}

func (cp *catalogedPlugin) LoadedTimestamp() *time.Time {
return &cp.loadedTime
}

func (cp *catalogedPlugin) Policy() *cpolicy.ConfigPolicy {
return cp.configPolicy
}

func newCatalogedPlugin(lp *loadedPlugin) core.CatalogedPlugin {
cp := cpolicy.New()
for _, keyNode := range lp.Policy().GetAll() {
node := cpolicy.NewPolicyNode()
rules, err := keyNode.ConfigPolicyNode.CopyRules()
if err != nil {
log.WithFields(log.Fields{
"_module": "control",
"_file": "metrics.go,",
"_block": "newCatalogedPlugin",
"error": err.Error(),
}).Error("Unable to copy rules")
return nil
}
node.Add(rules...)
cp.Add(keyNode.Key, node)
}
return &catalogedPlugin{
name: lp.Name(),
version: lp.Version(),
signed: lp.IsSigned(),
typeName: lp.Type,
state: lp.State,
path: lp.PluginPath(),
loadedTime: lp.LoadedTime,
configPolicy: cp,
}
}

type metricCatalog struct {
tree *MTTrie
mutex *sync.Mutex
Expand Down Expand Up @@ -288,8 +372,9 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) e
}).Error("error adding loaded metric type")
return err
}

newMt := metricType{
Plugin: lp,
Plugin: newCatalogedPlugin(lp),
namespace: mt.Namespace(),
version: mt.Version(),
lastAdvertisedTime: mt.LastAdvertisedTime(),
Expand Down Expand Up @@ -326,7 +411,6 @@ func (mc *metricCatalog) Add(m *metricType) {

// adding key as a cataloged keys (mc.keys)
mc.keys = appendIfMissing(mc.keys, key)

mc.tree.Add(m)
}

Expand Down Expand Up @@ -366,7 +450,7 @@ func (mc *metricCatalog) GetMetric(requested core.Namespace, version int) (*metr
version: catalogedmt.Version(),
lastAdvertisedTime: catalogedmt.LastAdvertisedTime(),
tags: catalogedmt.Tags(),
policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()),
policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()),
config: catalogedmt.Config(),
unit: catalogedmt.Unit(),
description: catalogedmt.Description(),
Expand Down Expand Up @@ -414,7 +498,7 @@ func (mc *metricCatalog) GetMetrics(requested core.Namespace, version int) ([]*m
version: catalogedmt.Version(),
lastAdvertisedTime: catalogedmt.LastAdvertisedTime(),
tags: catalogedmt.Tags(),
policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()),
policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()),
config: catalogedmt.Config(),
unit: catalogedmt.Unit(),
description: catalogedmt.Description(),
Expand Down Expand Up @@ -510,7 +594,7 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error {
return m.Unsubscribe()
}

func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (*loadedPlugin, error) {
func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (core.CatalogedPlugin, error) {
mt, err := mc.tree.GetMetric(mns.Strings(), ver)
if err != nil {
log.WithFields(log.Fields{
Expand Down
11 changes: 8 additions & 3 deletions control/mttrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"sort"
"strings"

"github.com/intelsdi-x/snap/core"
)

/*
Expand Down Expand Up @@ -72,7 +74,8 @@ func NewMTTrie() *MTTrie {
func (m *MTTrie) String() string {
out := ""
for _, mt := range m.gatherMetricTypes() {
out += fmt.Sprintf("%s => %s\n", mt.Key(), mt.Plugin.Key())
pluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version())
out += fmt.Sprintf("%s => %s\n", mt.Key(), pluginKey)
}
return out
}
Expand All @@ -92,9 +95,11 @@ func (m *MTTrie) gatherMetricTypes() []metricType {
}

// DeleteByPlugin removes all metrics from the catalog if they match a loadedPlugin
func (m *MTTrie) DeleteByPlugin(lp *loadedPlugin) {
func (m *MTTrie) DeleteByPlugin(cp core.CatalogedPlugin) {
for _, mt := range m.gatherMetricTypes() {
if mt.Plugin.Key() == lp.Key() {
mtPluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version())
cpKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())
if mtPluginKey == cpKey {
// remove this metric
m.RemoveMetric(mt)
}
Expand Down
48 changes: 48 additions & 0 deletions control/plugin/cpolicy/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,54 @@ func NewPolicyNode() *ConfigPolicyNode {
}
}

func (c *ConfigPolicyNode) CopyRules() ([]Rule, error) {
rules := []Rule{}
for _, rule := range c.rules {
var err error
switch rule.(type) {
case *BoolRule:
var newBoolRule *BoolRule
if rule.Default() != nil {
newBoolRule, err = NewBoolRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueBool).Value)
} else {
newBoolRule, err = NewBoolRule(rule.Key(), rule.Required())
}
rules = append(rules, newBoolRule)
case *StringRule:
var newStringRule *StringRule
if rule.Default() != nil {
newStringRule, err = NewStringRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueStr).Value)
} else {
newStringRule, err = NewStringRule(rule.Key(), rule.Required())
}
rules = append(rules, newStringRule)
case *FloatRule:
var newFloatRule *FloatRule
if rule.Default() != nil {
newFloatRule, err = NewFloatRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueFloat).Value)
} else {
newFloatRule, err = NewFloatRule(rule.Key(), rule.Required())
}
rules = append(rules, newFloatRule)
case *IntRule:
var newIntRule *IntRule
if rule.Default() != nil {
newIntRule, err = NewIntegerRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueInt).Value)
} else {
newIntRule, err = NewIntegerRule(rule.Key(), rule.Required())
}
rules = append(rules, newIntRule)
default:
return []Rule{}, errors.New(fmt.Sprint("Unknown rule type"))
}

if err != nil {
return []Rule{}, errors.New(fmt.Sprintf("Could not create rule %s type %s ", rule.Key(), rule.Type()))
}
}
return rules, nil
}

// UnmarshalJSON unmarshals JSON into a ConfigPolicyNode
func (c *ConfigPolicyNode) UnmarshalJSON(data []byte) error {
m := map[string]interface{}{}
Expand Down
21 changes: 15 additions & 6 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
return nil, serror.New(err)
}

key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
if _, exists := p.loadedPlugins.table[key]; exists {
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
"plugin-name": resp.Meta.Name,
"plugin-version": resp.Meta.Version,
"plugin-type": resp.Type.String(),
})
}

ap, err := newAvailablePlugin(resp, emitter, ePlugin)
if err != nil {
pmLogger.WithFields(log.Fields{
Expand Down Expand Up @@ -362,7 +371,13 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}).Error("error in getting config policy")
return nil, serror.New(err)
}

lPlugin.ConfigPolicy = cp
lPlugin.Meta = resp.Meta
lPlugin.Type = resp.Type
lPlugin.Token = resp.Token
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

if resp.Type == plugin.CollectorPluginType {
cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version)
Expand Down Expand Up @@ -494,12 +509,6 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
return nil, serror.New(e)
}

lPlugin.Meta = resp.Meta
lPlugin.Type = resp.Type
lPlugin.Token = resp.Token
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

aErr := p.loadedPlugins.add(lPlugin)
if aErr != nil {
pmLogger.WithFields(log.Fields{
Expand Down

0 comments on commit 46c56b2

Please sign in to comment.