Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Support specifying an instance of dynamic metric
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Sep 21, 2016
1 parent 79bcb1b commit e182c1c
Show file tree
Hide file tree
Showing 17 changed files with 2,226 additions and 977 deletions.
203 changes: 78 additions & 125 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,13 @@ type managesPlugins interface {
}

type catalogsMetrics interface {
Get(core.Namespace, int) (*metricType, error)
GetQueriedNamespaces(core.Namespace) ([]core.Namespace, error)
UpdateQueriedNamespaces(core.Namespace)
GetMetric(core.Namespace, int) (*metricType, error)
GetMetrics(core.Namespace, int) ([]*metricType, error)
Add(*metricType)
AddLoadedMetricType(*loadedPlugin, core.Metric) error
RmUnloadedPluginMetrics(lp *loadedPlugin)
GetVersions(core.Namespace) ([]*metricType, error)
Fetch(core.Namespace) ([]*metricType, error)
Item() (string, []*metricType)
Next() bool
Keys() []string
Subscribe([]string, int) error
Unsubscribe([]string, int) error
Expand Down Expand Up @@ -645,17 +642,6 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
return nil
}

// MatchQueryToNamespaces performs the process of matching the 'ns' with namespaces of all cataloged metrics
func (p *pluginControl) matchQueryToNamespaces(ns core.Namespace) ([]core.Namespace, serror.SnapError) {
// carry out the matching process
p.metricCatalog.UpdateQueriedNamespaces(ns)
nss, err := p.metricCatalog.GetQueriedNamespaces(ns)
if err != nil {
return nil, serror.New(err)
}
return nss, nil
}

func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree) []serror.SnapError {
return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree)
}
Expand Down Expand Up @@ -688,85 +674,80 @@ func (p *pluginControl) verifyPlugin(lp *loadedPlugin) error {
return nil
}

func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric, configTree *cdata.ConfigDataTree) ([]core.Metric, []core.SubscribedPlugin, []serror.SnapError) {
newMetrics := []core.Metric{}
// getMetricsAndCollectors returns metrics to be collected grouped by plugin and collectors which are used to collect all of them
func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric, configTree *cdata.ConfigDataTree) (map[string]metricTypes, []core.SubscribedPlugin, []serror.SnapError) {
newMetricsGroupedByPlugin := make(map[string]metricTypes)
newPlugins := []core.SubscribedPlugin{}
var serrs []serror.SnapError
for _, r := range requested {
// get expanded namespaces from requested metrics
newNss, err := p.matchQueryToNamespaces(r.Namespace())
// get all metric types available in metricCatalog which fulfill the requested namespace and version (if ver <=0 the latest version will be taken)
newMetrics, err := p.metricCatalog.GetMetrics(r.Namespace(), r.Version())
if err != nil {
log.WithFields(log.Fields{
"_block": "control",
"action": "expanding-requested-metrics",
"query": r.Namespace(),
"err": err,
}).Error("error matching requested namespace with metric catalog")
serrs = append(serrs, err)
serrs = append(serrs, serror.New(err))
continue
}

if controlLogger.Level >= log.DebugLevel {
for _, n := range newNss {
for _, m := range newMetrics {
controlLogger.WithFields(log.Fields{
"_block": "control",
"ns": n.String(),
}).Debug("Expanded namespaces found")
"_block": "control",
"ns": m.Namespace().String(),
"version": m.Version(),
}).Debug("Expanded namespaces found in metric catalog")
}
}

if len(newNss) > 0 {
for _, ns := range newNss {
// Get metric types from metric catalog
m, err := p.metricCatalog.Get(ns, r.Version())
if err != nil {
log.WithFields(log.Fields{
"_block": "control",
"action": "expanding requested metrics",
"namespace": ns.String(),
"version": r.Version(),
}).Error("error retreiving metric given a namespace and version.")
serrs = append(serrs, serror.New(fmt.Errorf("error retreiving metric %s:%d", ns.String(), r.Version())))
continue
}
// in case config tree doesn't have any configuration for current namespace
// it's needed to initialize config, otherwise it will stay nil and panic later on
config := configTree.Get(ns.Strings())
if config == nil {
config = cdata.NewNode()
}
newMetrics = append(newMetrics, &metric{
namespace: m.Namespace(),
version: m.Version(),
config: config,
})

config = configTree.Get([]string{""})
if config == nil {
config = cdata.NewNode()
}
plugin := subscribedPlugin{
name: m.Plugin.Name(),
typeName: m.Plugin.TypeName(),
version: m.Plugin.Version(),
config: config,
}
for _, mt := range newMetrics {
// in case config tree doesn't have any configuration for current namespace
// it's needed to initialize config, otherwise it will stay nil and panic later on
cfg := configTree.Get(mt.Namespace().Strings())
if cfg == nil {
cfg = cdata.NewNode()
}
// set config to metric
mt.config = cfg

// loaded plugin which exposes the metric
lp := mt.Plugin
key := lp.Key()

// groups metricTypes by a plugin.Key()
pmt, _ := newMetricsGroupedByPlugin[key]

// pmt (plugin-metric-type) contains plugin and metrics types grouped to this plugin
pmt.plugin = lp
pmt.metricTypes = append(pmt.metricTypes, mt)
newMetricsGroupedByPlugin[key] = pmt

plugin := subscribedPlugin{
name: lp.Name(),
typeName: lp.TypeName(),
version: lp.Version(),
config: cdata.NewNode(),
}

if !containsPlugin(newPlugins, plugin) {
newPlugins = append(newPlugins, plugin)
}
if !containsPlugin(newPlugins, plugin) {
newPlugins = append(newPlugins, plugin)
}
}
}

if controlLogger.Level >= log.DebugLevel {
for _, m := range newMetrics {
log.WithFields(log.Fields{
"_block": "control",
"action": "gather",
"metric": fmt.Sprintf("%s:%d", m.Namespace().String(), m.Version()),
}).Debug("gathered metrics from workflow request")
}
for _, pmt := range newMetricsGroupedByPlugin {
for _, m := range pmt.Metrics() {
log.WithFields(log.Fields{
"_block": "control",
"action": "gather",
"metric": fmt.Sprintf("%s:%d", m.Namespace().String(), m.Version()),
}).Debug("gathered metrics from workflow request")
}

}
for _, p := range newPlugins {
log.WithFields(log.Fields{
"_block": "control",
Expand All @@ -776,7 +757,7 @@ func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric
}
}

return newMetrics, newPlugins, serrs
return newMetricsGroupedByPlugin, newPlugins, serrs
}

// SetMonitorOptions exposes monitors options
Expand Down Expand Up @@ -831,7 +812,7 @@ func (p *pluginControl) FetchMetrics(ns core.Namespace, version int) ([]core.Cat
} else if version < 0 {
// -1 (or less) is specified meaning return the latest
if _, ok := nsMap[mt.Namespace().String()]; !ok {
mt, err = p.metricCatalog.Get(mt.Namespace(), version)
mt, err = p.metricCatalog.GetMetric(mt.Namespace(), version)
if err != nil {
return nil, err
}
Expand All @@ -848,15 +829,26 @@ func (p *pluginControl) FetchMetrics(ns core.Namespace, version int) ([]core.Cat
}

func (p *pluginControl) GetMetric(ns core.Namespace, ver int) (core.CatalogedMetric, error) {
return p.metricCatalog.Get(ns, ver)
return p.metricCatalog.GetMetric(ns, ver)
}

func (p *pluginControl) GetMetrics(ns core.Namespace, ver int) ([]core.CatalogedMetric, error) {
mts, err := p.metricCatalog.GetMetrics(ns, ver)
if err != nil {
return nil, err
}
rmts := make([]core.CatalogedMetric, len(mts))
for i, m := range mts {
rmts[i] = m
}
return rmts, nil
}

func (p *pluginControl) GetMetricVersions(ns core.Namespace) ([]core.CatalogedMetric, error) {
mts, err := p.metricCatalog.GetVersions(ns)
if err != nil {
return nil, err
}

rmts := make([]core.CatalogedMetric, len(mts))
for i, m := range mts {
rmts[i] = m
Expand All @@ -865,7 +857,7 @@ func (p *pluginControl) GetMetricVersions(ns core.Namespace) ([]core.CatalogedMe
}

func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool {
_, err := p.metricCatalog.Get(mns, ver)
_, err := p.metricCatalog.GetMetric(mns, ver)
if err == nil {
return true
}
Expand All @@ -883,7 +875,7 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]
}

// Subscription groups are processed anytime a plugin is loaded/unloaded.
results, serrs, err := p.subscriptionGroups.Get(id)
pluginToMetricMap, serrs, err := p.subscriptionGroups.Get(id)
if err != nil {
controlLogger.WithFields(log.Fields{
"_block": "CollectorMetrics",
Expand Down Expand Up @@ -913,24 +905,12 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]
}
}

pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, results)
if err != nil {
errs = append(errs, err)
return
}

cMetrics := make(chan []core.Metric)
cError := make(chan error)
var wg sync.WaitGroup

// For each available plugin call available plugin using RPC client and wait for response (goroutines)
for pluginKey, pmt := range pluginToMetricMap {
// merge global plugin config into the config for the metric
for _, mt := range pmt.metricTypes {
if mt.Config() != nil {
mt.Config().ReverseMerge(p.Config.Plugins.getPluginConfigDataNode(core.CollectorPluginType, pmt.plugin.Name(), pmt.plugin.Version()))
}
}

wg.Add(1)

Expand Down Expand Up @@ -1058,43 +1038,16 @@ type metricTypes struct {
metricTypes []core.Metric
}

func (p *metricTypes) Count() int {
return len(p.metricTypes)
func (mts metricTypes) Count() int {
return len(mts.metricTypes)
}

// groupMetricTypesByPlugin groups metricTypes by a plugin.Key() and returns appropriate structure
func groupMetricTypesByPlugin(cat catalogsMetrics, mts []core.Metric) (map[string]metricTypes, serror.SnapError) {
pmts := make(map[string]metricTypes)
// For each plugin type select a matching available plugin to call
for _, incomingmt := range mts {
version := incomingmt.Version()
if version == 0 {
// If the version is not provided we will choose the latest
version = -1
}
catalogedmt, err := cat.Get(incomingmt.Namespace(), version)
if err != nil {
return nil, serror.New(err)
}
returnedmt := plugin.MetricType{
Namespace_: catalogedmt.Namespace(),
LastAdvertisedTime_: catalogedmt.LastAdvertisedTime(),
Version_: incomingmt.Version(),
Tags_: catalogedmt.Tags(),
Config_: incomingmt.Config(),
Unit_: catalogedmt.Unit(),
}
lp := catalogedmt.Plugin
if lp == nil {
return nil, serror.New(errorMetricNotFound(incomingmt.Namespace().String()))
}
key := lp.Key()
pmt, _ := pmts[key]
pmt.plugin = lp
pmt.metricTypes = append(pmt.metricTypes, returnedmt)
pmts[key] = pmt
}
return pmts, nil
func (mts metricTypes) Metrics() []core.Metric {
return mts.metricTypes
}

func (mts metricTypes) Plugin() *loadedPlugin {
return mts.plugin
}

func containsPlugin(slice []core.SubscribedPlugin, lookup subscribedPlugin) bool {
Expand Down
2 changes: 1 addition & 1 deletion control/control_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestGRPCServerScheduler(t *testing.T) {
// we don't expect rpc errors
So(err, ShouldBeNil)
So(len(reply.Errors), ShouldNotEqual, 0)
So(reply.Errors[0].ErrorString, ShouldResemble, "Metric not found: /this/is/invalid")
So(reply.Errors[0].ErrorString, ShouldResemble, "Metric not found: /this/is/invalid (version: 1000)")
})
Convey("Should not error with valid inputs", func() {
req := &rpc.SubscribeDepsRequest{
Expand Down
Loading

0 comments on commit e182c1c

Please sign in to comment.