Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Support global tags in global config
Browse files Browse the repository at this point in the history
The tags will be applied in the order global tags < task manifest tags
  • Loading branch information
kindermoumoute committed Jan 5, 2017
1 parent 3e2dd25 commit 7b01c63
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 122 deletions.
45 changes: 32 additions & 13 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ type pluginConfigItem struct {
// UnmarshalJSON method in this same file needs to be modified to
// match the field mapping that is defined here
type Config struct {
MaxRunningPlugins int `json:"max_running_plugins"yaml:"max_running_plugins"`
PluginLoadTimeout int `json:"plugin_load_timeout"yaml:"plugin_load_timeout"`
PluginTrust int `json:"plugin_trust_level"yaml:"plugin_trust_level"`
AutoDiscoverPath string `json:"auto_discover_path"yaml:"auto_discover_path"`
KeyringPaths string `json:"keyring_paths"yaml:"keyring_paths"`
CacheExpiration jsonutil.Duration `json:"cache_expiration"yaml:"cache_expiration"`
Plugins *pluginConfig `json:"plugins"yaml:"plugins"`
ListenAddr string `json:"listen_addr,omitempty"yaml:"listen_addr"`
ListenPort int `json:"listen_port,omitempty"yaml:"listen_port"`
Pprof bool `json:"pprof"yaml:"pprof"`
MaxPluginRestarts int `json:"max_plugin_restarts"yaml:"max_plugin_restarts"`
MaxRunningPlugins int `json:"max_running_plugins"yaml:"max_running_plugins"`
PluginLoadTimeout int `json:"plugin_load_timeout"yaml:"plugin_load_timeout"`
PluginTrust int `json:"plugin_trust_level"yaml:"plugin_trust_level"`
AutoDiscoverPath string `json:"auto_discover_path"yaml:"auto_discover_path"`
KeyringPaths string `json:"keyring_paths"yaml:"keyring_paths"`
CacheExpiration jsonutil.Duration `json:"cache_expiration"yaml:"cache_expiration"`
Plugins *pluginConfig `json:"plugins"yaml:"plugins"`
Tags map[string]map[string]string `json:"tags,omitempty"yaml:"tags"`
ListenAddr string `json:"listen_addr,omitempty"yaml:"listen_addr"`
ListenPort int `json:"listen_port,omitempty"yaml:"listen_port"`
Pprof bool `json:"pprof"yaml:"pprof"`
MaxPluginRestarts int `json:"max_plugin_restarts"yaml:"max_plugin_restarts"`
}

const (
Expand Down Expand Up @@ -117,6 +118,11 @@ const (
"properties" : {},
"additionalProperties": true
},
"tags": {
"type": ["object", "null"],
"properties" : {},
"additionalProperties": true
},
"listen_addr": {
"type": "string"
},
Expand Down Expand Up @@ -147,6 +153,7 @@ func GetDefaultConfig() *Config {
KeyringPaths: defaultKeyringPaths,
CacheExpiration: jsonutil.Duration{defaultCacheExpiration},
Plugins: newPluginConfig(),
Tags: newPluginTags(),
Pprof: defaultPprof,
MaxPluginRestarts: MaxPluginRestartCount,
}
Expand Down Expand Up @@ -182,6 +189,10 @@ func newPluginConfig() *pluginConfig {
}
}

func newPluginTags() map[string]map[string]string {
return make(map[string]map[string]string)
}

func (p *Config) GetPluginConfigDataNode(pluginType core.PluginType, name string, ver int) cdata.ConfigDataNode {
return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver)
}
Expand Down Expand Up @@ -295,14 +306,16 @@ func (p *pluginConfig) switchPluginConfigType(pluginType core.PluginType) *plugi
case core.PublisherPluginType:
return p.Publisher
}
// never happens
return nil
}

func (p *pluginConfig) mergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) {
// clear cache
p.pluginCache = make(map[string]*cdata.ConfigDataNode)
configItem := p.switchPluginConfigType(pluginType)
if configItem == nil {
return
}

// merge new config into existing
if res, ok := configItem.Plugins[name]; ok {
Expand Down Expand Up @@ -331,6 +344,9 @@ func (p *pluginConfig) deletePluginConfigDataNodeField(pluginType core.PluginTyp
// clear cache
p.pluginCache = make(map[string]*cdata.ConfigDataNode)
configItem := p.switchPluginConfigType(pluginType)
if configItem == nil {
return
}

if res, ok := configItem.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
Expand All @@ -340,7 +356,7 @@ func (p *pluginConfig) deletePluginConfigDataNodeField(pluginType core.PluginTyp
res.DeleteItem(key)
return
}
p.Collector.All.DeleteItem(key)
configItem.All.DeleteItem(key)

}

Expand All @@ -358,6 +374,9 @@ func (p *pluginConfig) getPluginConfigDataNode(pluginType core.PluginType, name

// check for plugin config
configItem := p.switchPluginConfigType(pluginType)
if configItem == nil {
return nil
}
p.pluginCache[key].Merge(configItem.All)
if res, ok := configItem.Plugins[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
Expand Down
6 changes: 6 additions & 0 deletions control/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func TestPluginConfig(t *testing.T) {
So(cfg.Plugins.Collector.Plugins["pcm"].Versions[1].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"})
So(cfg.Plugins.Processor, ShouldNotBeNil)
So(cfg.Plugins.Processor.Plugins["movingaverage"].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"})
So(cfg.Tags["/intel/psutil"], ShouldNotBeNil)
So(cfg.Tags["/intel/psutil"]["context"], ShouldNotBeNil)
So(cfg.Tags["/intel/psutil"]["context"], ShouldEqual, "config_example")
So(cfg.Tags["/"], ShouldNotBeNil)
So(cfg.Tags["/"]["color"], ShouldNotBeNil)
So(cfg.Tags["/"]["color"], ShouldEqual, "green")

Convey("We can access the config for plugins", func() {
Convey("Getting the values of a specific version of a plugin", func() {
Expand Down
12 changes: 11 additions & 1 deletion control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ type managesPlugins interface {
SetMetricCatalog(catalogsMetrics)
GenerateArgs(logLevel int) plugin.Arg
SetPluginConfig(*pluginConfig)
SetPluginTags(map[string]map[string]string)
AddStandardAndWorkflowTags(core.Metric, map[string]map[string]string) core.Metric
SetPluginLoadTimeout(int)
}

Expand Down Expand Up @@ -174,6 +176,13 @@ func OptSetConfig(cfg *Config) PluginControlOpt {
}
}

// OptSetTags sets the plugin control tags.
func OptSetTags(tags map[string]map[string]string) PluginControlOpt {
return func(c *pluginControl) {
c.pluginManager.SetPluginTags(tags)
}
}

// MaximumPluginRestarts
func MaxPluginRestarts(cfg *Config) PluginControlOpt {
return func(*pluginControl) {
Expand All @@ -188,6 +197,7 @@ func New(cfg *Config) *pluginControl {
MaxRunningPlugins(cfg.MaxRunningPlugins),
CacheExpiration(cfg.CacheExpiration.Duration),
OptSetConfig(cfg),
OptSetTags(cfg.Tags),
MaxPluginRestarts(cfg),
}
c := &pluginControl{}
Expand Down Expand Up @@ -977,7 +987,7 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]
// plugin authors to inadvertently overwrite or not pass along the data
// passed to CollectMetrics so we will help them out here.
for i := range m {
m[i] = addStandardAndWorkflowTags(m[i], allTags)
m[i] = p.pluginManager.AddStandardAndWorkflowTags(m[i], allTags)
}
metrics = append(metrics, m...)
wg.Done()
Expand Down
18 changes: 11 additions & 7 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ func (m *MockPluginManagerBadSwap) LoadPlugin(*pluginDetails, gomit.Emitter) (*l
func (m *MockPluginManagerBadSwap) UnloadPlugin(c core.Plugin) (*loadedPlugin, serror.SnapError) {
return nil, serror.New(errors.New("fake"))
}
func (m *MockPluginManagerBadSwap) get(string) (*loadedPlugin, error) { return nil, nil }
func (m *MockPluginManagerBadSwap) teardown() {}
func (m *MockPluginManagerBadSwap) SetPluginConfig(*pluginConfig) {}
func (m *MockPluginManagerBadSwap) SetPluginLoadTimeout(int) {}
func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) {}
func (m *MockPluginManagerBadSwap) SetEmitter(gomit.Emitter) {}
func (m *MockPluginManagerBadSwap) GenerateArgs(int) plugin.Arg { return plugin.Arg{} }
func (m *MockPluginManagerBadSwap) get(string) (*loadedPlugin, error) { return nil, nil }
func (m *MockPluginManagerBadSwap) teardown() {}
func (m *MockPluginManagerBadSwap) SetPluginConfig(*pluginConfig) {}
func (m *MockPluginManagerBadSwap) SetPluginTags(map[string]map[string]string) {}
func (m *MockPluginManagerBadSwap) AddStandardAndWorkflowTags(met core.Metric, allTags map[string]map[string]string) core.Metric {
return nil
}
func (m *MockPluginManagerBadSwap) SetPluginLoadTimeout(int) {}
func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) {}
func (m *MockPluginManagerBadSwap) SetEmitter(gomit.Emitter) {}
func (m *MockPluginManagerBadSwap) GenerateArgs(int) plugin.Arg { return plugin.Arg{} }

func (m *MockPluginManagerBadSwap) all() map[string]*loadedPlugin {
return m.loadedPlugins.table
Expand Down
33 changes: 0 additions & 33 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,39 +618,6 @@ func appendIfMissing(keys []string, ns string) []string {
return append(keys, ns)
}

func addStandardAndWorkflowTags(m core.Metric, allTags map[string]map[string]string) core.Metric {
hostname := hostnameReader.Hostname()

tags := m.Tags()
if tags == nil {
tags = map[string]string{}
}
// apply standard tag
tags[core.STD_TAG_PLUGIN_RUNNING_ON] = hostname

// apply tags from workflow
for ns, nsTags := range allTags {
if strings.HasPrefix(m.Namespace().String(), ns) {
for k, v := range nsTags {
tags[k] = v
}
}
}

metric := plugin.MetricType{
Namespace_: m.Namespace(),
Version_: m.Version(),
LastAdvertisedTime_: m.LastAdvertisedTime(),
Config_: m.Config(),
Data_: m.Data(),
Tags_: tags,
Description_: m.Description(),
Unit_: m.Unit(),
Timestamp_: m.Timestamp(),
}
return metric
}

// isTuple returns true when incoming namespace's element has been recognized as a tuple, otherwise returns false
// notice, that the tuple is a string which starts with `core.TuplePrefix`, ends with `core.TupleSuffix`
// and contains at least one `core.TupleSeparator`, e.g. (host0;host1)
Expand Down
3 changes: 2 additions & 1 deletion control/metrics_small_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func TestAddTagsFromWorkflow(t *testing.T) {
hostnameReader = &mockHostnameReader{}
tcs := prepareTestCases()
Convey("Adding tags to metric type", t, func() {
p := newPluginManager()
for _, tc := range tcs {
outputTags := addStandardAndWorkflowTags(tc.Metric, tc.InputTags).Tags()
outputTags := p.AddStandardAndWorkflowTags(tc.Metric, tc.InputTags).Tags()
So(outputTags, ShouldNotBeNil)
So(outputTags, ShouldResemble, tc.ExpectedTags)
}
Expand Down
78 changes: 77 additions & 1 deletion control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ type pluginManager struct {
loadedPlugins *loadedPlugins
logPath string
pluginConfig *pluginConfig
pluginTags map[string]map[string]string
pprof bool
}

Expand All @@ -249,6 +250,7 @@ func newPluginManager(opts ...pluginManagerOpt) *pluginManager {
loadedPlugins: newLoadedPlugins(),
logPath: logPath,
pluginConfig: newPluginConfig(),
pluginTags: newPluginTags(),
}

for _, opt := range opts {
Expand All @@ -274,6 +276,13 @@ func OptSetPluginConfig(cf *pluginConfig) pluginManagerOpt {
}
}

// OptSetPluginTags sets the tags on the plugin manager
func OptSetPluginTags(tags map[string]map[string]string) pluginManagerOpt {
return func(p *pluginManager) {
p.pluginTags = tags
}
}

// SetPluginLoadTimeout sets plugin load timeout
func (p *pluginManager) SetPluginLoadTimeout(to int) {
p.pluginLoadTimeout = to
Expand All @@ -284,6 +293,11 @@ func (p *pluginManager) SetPluginConfig(cf *pluginConfig) {
p.pluginConfig = cf
}

// SetPluginTags sets plugin tags
func (p *pluginManager) SetPluginTags(tags map[string]map[string]string) {
p.pluginTags = tags
}

// SetMetricCatalog sets metric catalog
func (p *pluginManager) SetMetricCatalog(mc catalogsMetrics) {
p.metricCatalog = mc
Expand Down Expand Up @@ -481,7 +495,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}

//Add standard tags
nmt = addStandardAndWorkflowTags(nmt, nil)
nmt = p.AddStandardAndWorkflowTags(nmt, nil)

if err := p.metricCatalog.AddLoadedMetricType(lPlugin, nmt); err != nil {
pmLogger.WithFields(log.Fields{
Expand Down Expand Up @@ -624,3 +638,65 @@ func (p *pluginManager) all() map[string]*loadedPlugin {
defer p.loadedPlugins.RUnlock()
return p.loadedPlugins.table
}

func hasPrefix(ns1 []string, ns2 []string) bool {
for i := range ns2 {
if i > len(ns1)-1 || ns1[i] != ns2[i] {
return false
}
}
return true
}

func split(ns string) []string {
// the first character is the separator
if len(ns) <= 1 {
return nil
}
sep := string(ns[0])
ns = strings.TrimSuffix(ns, sep)
ns = strings.TrimPrefix(ns, sep)

return strings.Split(ns, sep)
}

func (p *pluginManager) AddStandardAndWorkflowTags(m core.Metric, allTags map[string]map[string]string) core.Metric {
hostname := hostnameReader.Hostname()

tags := m.Tags()
if tags == nil {
tags = map[string]string{}
}
// apply standard tag
tags[core.STD_TAG_PLUGIN_RUNNING_ON] = hostname

// apply tags from global tags
for ns, nsTags := range p.pluginTags {
if hasPrefix(m.Namespace().Strings(), split(ns)) {
for k, v := range nsTags {
tags[k] = v
}
}
}
// apply tags from workflow
for ns, nsTags := range allTags {
if hasPrefix(m.Namespace().Strings(), split(ns)) {
for k, v := range nsTags {
tags[k] = v
}
}
}

metric := plugin.MetricType{
Namespace_: m.Namespace(),
Version_: m.Version(),
LastAdvertisedTime_: m.LastAdvertisedTime(),
Config_: m.Config(),
Data_: m.Data(),
Tags_: tags,
Description_: m.Description(),
Unit_: m.Unit(),
Timestamp_: m.Timestamp(),
}
return metric
}
7 changes: 6 additions & 1 deletion control/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ func TestLoadPlugin(t *testing.T) {
Convey("with a plugin config a plugin loads successfully", func() {
cfg := GetDefaultConfig()
cfg.Plugins.Collector.Plugins["mock"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true}))
p := newPluginManager(OptSetPluginConfig(cfg.Plugins))
tags := newPluginTags()
tags["/intel/mock"] = make(map[string]string)
tags["/intel/mock"]["context"] = "plugin_manager_test"
p := newPluginManager(OptSetPluginConfig(cfg.Plugins), OptSetPluginTags(tags))
p.SetMetricCatalog(newMetricCatalog())
lp, serr := loadPlugin(p, fixtures.PluginPathMock2)

Expand All @@ -127,7 +130,9 @@ func TestLoadPlugin(t *testing.T) {
So(mts[0].Description(), ShouldResemble, "mock description")
So(mts[0].Unit(), ShouldResemble, "mock unit")
So(mts[0].Tags(), ShouldContainKey, "plugin_running_on")
So(mts[0].Tags(), ShouldContainKey, "context")
So(mts[0].Tags()["plugin_running_on"], ShouldNotResemble, "")
So(mts[0].Tags()["context"], ShouldResemble, "plugin_manager_test")
})

Convey("for a plugin requiring a config an incomplete config will result in a load failure", func() {
Expand Down
9 changes: 9 additions & 0 deletions docs/SNAPTELD_CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ control:
1:
user: tiffany
password: new password

# tags section contains global tags that will be applied on collected metrics
# across tasks.
tags:
/intel/psutil:
datacenter: rennes
# tags all metrics
/:
country: france
```

### snapteld scheduler configurations
Expand Down
Loading

0 comments on commit 7b01c63

Please sign in to comment.