Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds support for streaming collectors
Browse files Browse the repository at this point in the history
    - Adds streaming support
    - Updates deps
    - Fixes #1650
  • Loading branch information
jcooklin committed Jun 17, 2017
1 parent 20c0fb2 commit 0050af4
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 78 deletions.
6 changes: 6 additions & 0 deletions cmd/snaptel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (t *task) setScheduleFromCliOptions(ctx *cli.Context) error {
}
duration = &d
}

// It's a streaming collector
if strings.Compare(t.Schedule.Type, "streaming") == 0 {
return nil
}

// Grab the interval for the schedule (if one was provided). Note that if an
// interval value was not passed in and there is no interval defined for the
// schedule associated with this task, it's an error
Expand Down
23 changes: 20 additions & 3 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
if !security.TLSEnabled && resp.Meta.TLSEnabled {
return nil, errors.New(ErrMsgInsecureClient + "; plugin_name: " + resp.Meta.Name)
}
if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType {
if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType && resp.Type != plugin.StreamCollectorPluginType {
return nil, strategy.ErrBadType
}
ap := &availablePlugin{
Expand Down Expand Up @@ -172,6 +172,20 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
default:
return nil, errors.New("Invalid RPCTYPE")
}
case plugin.StreamCollectorPluginType:
switch resp.Meta.RPCType {
case plugin.STREAMGRPC:
c, e := client.NewStreamCollectorGrpcClient(
resp.ListenAddress,
DefaultClientTimeout,
security)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
default:
return nil, errors.New("Invalid RPCTYPE")
}
default:
return nil, errors.New("Cannot create a client for a plugin of the type: " + resp.Type.String())
}
Expand Down Expand Up @@ -279,7 +293,10 @@ func (a *availablePlugin) Kill(r string) error {
c.Killed()
}

return a.ePlugin.Kill()
if a.ePlugin != nil {
return a.ePlugin.Kill()
}
return nil
}

// CheckHealth checks the health of a plugin and updates
Expand Down Expand Up @@ -365,7 +382,7 @@ func newAvailablePlugins() *availablePlugins {
}

func (ap *availablePlugins) insert(pl *availablePlugin) error {
if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType {
if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType && pl.pluginType != plugin.StreamCollectorPluginType {
return strategy.ErrBadType
}

Expand Down
8 changes: 4 additions & 4 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,12 @@ func (p *pluginConfig) deletePluginConfigDataNodeFieldAll(key string) {
}

func (p *pluginConfig) switchPluginConfigType(pluginType core.PluginType) *pluginTypeConfigItem {
switch pluginType {
case core.CollectorPluginType:
switch {
case pluginType == core.CollectorPluginType || pluginType == core.StreamingCollectorPluginType:
return p.Collector
case core.ProcessorPluginType:
case pluginType == core.ProcessorPluginType:
return p.Processor
case core.PublisherPluginType:
case pluginType == core.PublisherPluginType:
return p.Publisher
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,8 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
return nil
}

func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree) []serror.SnapError {
return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree)
func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) []serror.SnapError {
return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree, asserts...)
}

// SubscribeDeps will subscribe to collectors, processors and publishers. The collectors are subscribed by mapping the provided
Expand Down
35 changes: 33 additions & 2 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -468,14 +469,44 @@ func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan
if values != nil {
maxCollectDuration, ok := values["MaxCollectDuration"]
if ok {
t, ok := maxCollectDuration.(*ctypes.ConfigValueInt)
t, ok := maxCollectDuration.(ctypes.ConfigValueInt)
if ok {
// MaxCollectDuration was passed as an int therefore
// it is representing nanoseconds
arg.MaxCollectDuration = int64(t.Value)
} else {
t, ok := maxCollectDuration.(ctypes.ConfigValueStr)
if ok {
// MaxCollectDuration was passed as a string therefore
// it should be a string rep of a duration
dur, err := time.ParseDuration(t.Value)
if err != nil {
log.WithFields(
log.Fields{
"_block": "StreamMetrics",
"config-key": "MaxCollectDuration",
"hint": "value should be a parsable duration (e.g. 5s)",
"error": err.Error(),
},
).Warn("invalid config value")
}
arg.MaxCollectDuration = dur.Nanoseconds()
} else {
log.WithFields(
log.Fields{
"_block": "StreamMetrics",
"config-key": "MaxCollectDuration",
"type-provided": reflect.TypeOf(maxCollectDuration).String(),
"type-wanted": ctypes.ConfigValueStr{}.Type(),
"hint": "value should be a parsable duration (e.g. 5s)",
},
).Warn("wrong config value type")
}
}
}
maxMetricsBuffer, ok := values["MaxMetricsBuffer"]
if ok {
t, ok := maxMetricsBuffer.(*ctypes.ConfigValueInt)
t, ok := maxMetricsBuffer.(ctypes.ConfigValueInt)
if ok {
arg.MaxMetricsBuffer = int64(t.Value)
}
Expand Down
2 changes: 1 addition & 1 deletion control/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
"collector",
"processor",
"publisher",
"streamCollector",
"streaming-collector",
}

routingStrategyTypes = [...]string{
Expand Down
12 changes: 7 additions & 5 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

if resp.Type == plugin.CollectorPluginType {
if resp.Type == plugin.CollectorPluginType || resp.Type == plugin.StreamCollectorPluginType {
cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version)

if lPlugin.ConfigPolicy != nil {
Expand Down Expand Up @@ -543,9 +543,11 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
metricTypes, err := colClient.GetMetricTypes(cfg)
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"plugin-type": "collector",
"error": err.Error(),
"_block": "load-plugin",
"plugin-type": resp.Type.String(),
"error": err.Error(),
"plugin-name": ap.Name(),
"plugin-version": ap.Version(),
}).Error("error in getting metric types")
resultChan <- result{nil, serror.New(err)}
return
Expand Down Expand Up @@ -714,7 +716,7 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap
p.loadedPlugins.remove(plugin.Key())

// Remove any metrics from the catalog if this was a collector
if plugin.TypeName() == "collector" {
if plugin.TypeName() == core.CollectorPluginType.String() || plugin.TypeName() == core.StreamingCollectorPluginType.String() {
p.metricCatalog.RmUnloadedPluginMetrics(plugin)
}

Expand Down
30 changes: 11 additions & 19 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,27 +422,19 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
return errors.New("pool not found")
}
if pool.SubscriptionCount() < pool.Count() {
lp, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
if lp != nil && lp.Details.Uri != nil {
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"pool-count": pool.Count(),
"pool-subscription-count": pool.SubscriptionCount(),
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
"error": err.Error(),
}).Error("unable to get loaded plugin")
}
_, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"plugin-uri": lp.Details.Uri,
}).Debug(fmt.Sprintf("unsubscribe called on standalone plugin"))
pool.SelectAndStop(taskID, "remote unsubscription event")
} else {
pool.SelectAndKill(taskID, "unsubscription event")
"_block": "handle-unsubscription",
"pool-count": pool.Count(),
"pool-subscription-count": pool.SubscriptionCount(),
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
"error": err.Error(),
}).Error("unable to get loaded plugin")
}
pool.SelectAndKill(taskID, "unsubscription event")
}
return nil
}
Expand Down
24 changes: 1 addition & 23 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type Pool interface {
Plugins() MapAvailablePlugin
RLock()
RUnlock()
SelectAndStop(taskID, reason string)
SelectAndKill(taskID, reason string)
SelectAP(taskID string, configID map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError)
Strategy() RoutingAndCaching
Expand Down Expand Up @@ -181,7 +180,7 @@ func (p *pool) IncRestartCount() {

// Insert inserts an AvailablePlugin into the pool
func (p *pool) Insert(a AvailablePlugin) error {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType && a.Type() != plugin.StreamCollectorPluginType {
return ErrBadType
}
// If an empty pool is created, it does not have
Expand Down Expand Up @@ -306,27 +305,6 @@ func (p *pool) KillAll(reason string) {
}
}

// SelectAndStop selects, stops and removes the available plugin from the pool
func (p *pool) SelectAndStop(id, reason string) {
rp, err := p.Remove(p.plugins.Values(), id)
if err != nil {
log.WithFields(log.Fields{
"_block": "SelectAndStop",
"taskID": id,
"reason": reason,
}).Error(err)
return
}
if err := rp.Stop(reason); err != nil {
log.WithFields(log.Fields{
"_block": "SelectAndStop",
"taskID": id,
"reason": reason,
}).Error(err)
}
p.remove(rp.ID())
}

// SelectAndKill selects, kills and removes the available plugin from the pool
func (p *pool) SelectAndKill(id, reason string) {
rp, err := p.Remove(p.plugins.Values(), id)
Expand Down
14 changes: 12 additions & 2 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ManagesSubscriptionGroups interface {
Remove(id string) []serror.SnapError
ValidateDeps(requested []core.RequestedMetric,
plugins []core.SubscribedPlugin,
configTree *cdata.ConfigDataTree) (serrs []serror.SnapError)
configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError)
validateMetric(metric core.Metric) (serrs []serror.SnapError)
}

Expand Down Expand Up @@ -203,14 +203,24 @@ func (s *subscriptionGroups) Process() (errs []serror.SnapError) {

func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric,
plugins []core.SubscribedPlugin,
configTree *cdata.ConfigDataTree) (serrs []serror.SnapError) {
configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError) {

// resolve requested metrics and map to collectors
pluginToMetricMap, collectors, errs := s.getMetricsAndCollectors(requested, configTree)
if errs != nil {
serrs = append(serrs, errs...)
}

// Validate if schedule type is streaming and we have a non-streaming plugin or vice versa
for _, assert := range asserts {
if serr := assert(collectors); serr != nil {
serrs = append(serrs, serr)
}
}
if len(serrs) > 0 {
return serrs
}

// validateMetricsTypes
for _, pmt := range pluginToMetricMap {
for _, mt := range pmt.Metrics() {
Expand Down
13 changes: 10 additions & 3 deletions core/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/asaskevich/govalidator"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/pkg/fileutils"
)

Expand All @@ -48,9 +49,10 @@ type PluginType int

func ToPluginType(name string) (PluginType, error) {
pts := map[string]PluginType{
"collector": 0,
"processor": 1,
"publisher": 2,
"collector": 0,
"processor": 1,
"publisher": 2,
"streaming-collector": 3,
}
t, ok := pts[name]
if !ok {
Expand All @@ -64,6 +66,7 @@ func CheckPluginType(id PluginType) bool {
0: "collector",
1: "processor",
2: "publisher",
3: "streaming-collector",
}

_, ok := pts[id]
Expand All @@ -86,6 +89,7 @@ func (pt PluginType) String() string {
"collector",
"processor",
"publisher",
"streaming-collector",
}[pt]
}

Expand All @@ -94,6 +98,7 @@ const (
CollectorPluginType PluginType = iota
ProcessorPluginType
PublisherPluginType
StreamingCollectorPluginType
)

type AvailablePlugin interface {
Expand Down Expand Up @@ -125,6 +130,8 @@ type SubscribedPlugin interface {
Config() *cdata.ConfigDataNode
}

type SubscribedPluginAssert func(plugins []SubscribedPlugin) serror.SnapError

type RequestedPlugin struct {
path string
checkSum [sha256.Size]byte
Expand Down
Loading

0 comments on commit 0050af4

Please sign in to comment.