Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds streaming capability to Snap
Browse files Browse the repository at this point in the history
Adds Streaming capability to Snap by utilizing grpc stream. Creates a
new rpc type for the grpc stream, and allows for plugins of this type
(streamcollectors) to send metrics to snap on a plugin initiated basis
instead of on Snap's collection interval.

Adds StreamMetrics test and rand test collector

Update glide lock/plugin dependency
  • Loading branch information
croseborough committed Mar 3, 2017
1 parent b5ec78f commit a7d4bc1
Show file tree
Hide file tree
Showing 24 changed files with 873 additions and 47 deletions.
69 changes: 69 additions & 0 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.STREAMGRPC:
c, e := client.NewStreamCollectorGrpcClient(
resp.ListenAddress,
DefaultClientTimeout,
resp.PublicKey,
!resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
default:
return nil, errors.New("Invalid RPCTYPE")
}
Expand Down Expand Up @@ -242,6 +252,12 @@ func (a *availablePlugin) Kill(r string) error {
}).Debug("deleting available plugin package")
os.RemoveAll(filepath.Dir(a.execPath))
}
// If it's a stremaing plugin, we need to signal the scheduler that
// this plugin is being killed.
if c, ok := a.client.(client.PluginStreamCollectorClient); ok {
c.Killed()
}

return a.ePlugin.Kill()
}

Expand Down Expand Up @@ -440,6 +456,59 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
return results, nil
}

func (ap *availablePlugins) streamMetrics(
pluginKey string,
metricTypes []core.Metric,
taskID string,
maxCollectDuration time.Duration,
maxMetricsBuffer int64) (chan []core.Metric, chan error, error) {

pool, serr := ap.getPool(pluginKey)
if serr != nil {
return nil, nil, serr
}
if pool == nil {
return nil, nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}

if pool.Strategy() == nil {
return nil, nil, errors.New("Plugin strategy not set")
}

config := metricTypes[0].Config()
cfg := map[string]ctypes.ConfigValue{}
if config != nil {
cfg = config.Table()
}

pool.RLock()
defer pool.RUnlock()
p, serr := pool.SelectAP(taskID, cfg)
if serr != nil {
return nil, nil, serr
}

cli, ok := p.(*availablePlugin).client.(client.PluginStreamCollectorClient)
if !ok {
return nil, nil, serror.New(errors.New("Invalid streaming client"))
}

metricChan, errChan, err := cli.StreamMetrics(metricTypes)
if err != nil {
return nil, nil, serror.New(err)
}
err = cli.UpdateCollectDuration(maxCollectDuration)
if err != nil {
return nil, nil, serror.New(err)
}
err = cli.UpdateMetricsBuffer(maxMetricsBuffer)
if err != nil {
return nil, nil, serror.New(err)
}

return metricChan, errChan, nil
}

func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator)
pool, serr := ap.getPool(key)
Expand Down
48 changes: 48 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,54 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]
return
}

func (p *pluginControl) StreamMetrics(
id string,
allTags map[string]map[string]string,
maxCollectDuration time.Duration,
maxMetricsBuffer int64) (chan []core.Metric, chan error, []error) {
if !p.Started {
return nil, nil, []error{ErrControllerNotStarted}
}
errs := make([]error, 0)
pluginToMetricMap, serrs, err := p.subscriptionGroups.Get(id)
if err != nil {
controlLogger.WithFields(log.Fields{
"_block": "StreamMetrics",
"subscription-group-id": id,
}).Error(err)
errs = append(errs, err)
return nil, nil, errs
}

if serrs != nil {
for _, e := range serrs {
errs = append(errs, e)
}
}
if len(pluginToMetricMap) > 1 {
return nil, nil, append(errs, errors.New("Only 1 streaming collecting plugin per task"))
}
var metricChan chan []core.Metric
var errChan chan error
for pluginKey, pmt := range pluginToMetricMap {
for _, mt := range pmt.metricTypes {
if mt.Config() != nil {
mt.Config().ReverseMergeInPlace(
p.Config.Plugins.getPluginConfigDataNode(
core.CollectorPluginType,
pmt.plugin.Name(),
pmt.plugin.Version()))
}
}
metricChan, errChan, err = p.pluginRunner.AvailablePlugins().streamMetrics(pluginKey, pmt.metricTypes, id, maxCollectDuration, maxMetricsBuffer)
if err != nil {
errs = append(errs, err)
return nil, nil, errs
}
}
return metricChan, errChan, nil
}

// PublishMetrics
func (p *pluginControl) PublishMetrics(metrics []core.Metric, config map[string]ctypes.ConfigValue, taskID, pluginName string, pluginVersion int) []error {
// If control is not started we don't want tasks to be able to
Expand Down
85 changes: 85 additions & 0 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,91 @@ func TestFailedPlugin(t *testing.T) {
})
}

func TestStreamMetrics(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// adjust HB timeouts for test
plugin.PingTimeoutLimit = 1
plugin.PingTimeoutDurationDefault = time.Second * 1

// Create controller
config := getTestConfig()
c := New(config)
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)

// Load plugin
_, e := load(c, fixtures.PluginPathStreamRand1)
So(e, ShouldBeNil)
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 3)

cd := cdata.NewNode()
cd.AddItem("testint", ctypes.ConfigValueInt{Value: 3})
cd.AddItem("testfloat", ctypes.ConfigValueFloat{Value: 0.14})
cd.AddItem("teststring", ctypes.ConfigValueStr{Value: "pi"})
m1 := fixtures.MockMetricType{
Namespace_: core.NewNamespace("random", "integer"),
Cfg: cd,
}
m2 := fixtures.MockMetricType{
Namespace_: core.NewNamespace("random", "float"),
Cfg: cd,
}
m3 := fixtures.MockMetricType{
Namespace_: core.NewNamespace("random", "string"),
Cfg: cd,
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

r := []core.RequestedMetric{}
for _, m := range []fixtures.MockMetricType{m1, m2, m3} {
r = append(r, m)
}

cdt := cdata.NewTree()
cdt.Add([]string{"random"}, cd)
taskHit := "hitting"

Convey("create a pool, add subscriptions and start plugins", func() {
serrs := c.SubscribeDeps(taskHit, r, []core.SubscribedPlugin{subscribedPlugin{typeName: "collector", name: "test-rand-streamer", version: 1}}, cdt)
So(serrs, ShouldBeNil)

pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)

Convey("stream metrics", func() {

metrics, errors, err := c.StreamMetrics(taskHit, nil, time.Second, 0)
So(err, ShouldBeNil)
select {
case mts := <-metrics:
So(mts, ShouldNotBeNil)
So(len(mts), ShouldEqual, 3)
case errs := <-errors:
t.Fatal(errs)
case <-time.After(time.Second * 10):
t.Fatal("Failed to get a response from stream metrics")
}

ap := c.AvailablePlugins()
So(ap, ShouldNotBeEmpty)
So(pool.Strategy(), ShouldNotBeNil)
So(pool.Strategy().String(), ShouldEqual, plugin.DefaultRouting.String())
c.Stop()
})
})
})
}

func TestCollectMetrics(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// adjust HB timeouts for test
Expand Down
3 changes: 3 additions & 0 deletions control/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (

PluginNameMock2 = "snap-plugin-collector-mock2"
PluginPathMock2 = helper.PluginFilePath(PluginNameMock2)

PluginNameStreamRand1 = "snap-plugin-stream-collector-rand1"
PluginPathStreamRand1 = helper.PluginFilePath(PluginNameStreamRand1)
)

// mocks a metric type
Expand Down
4 changes: 2 additions & 2 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (g *grpcClient) Kill(reason string) error {

_, err := g.plugin.Kill(getContext(g.timeout), &rpc.KillArg{Reason: reason})
g.conn.Close()
g.Killed()
if err != nil {
return err
}
Expand Down Expand Up @@ -380,8 +381,7 @@ func (g *grpcClient) handleInStream(
metricChan chan []core.Metric,
errChan chan error) {
go func() {
done := false
for !done {
for {
in, err := g.stream.Recv()
if err != nil {
errChan <- err
Expand Down
7 changes: 5 additions & 2 deletions control/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
CollectorPluginType PluginType = iota
ProcessorPluginType
PublisherPluginType
StreamCollectorPluginType
)

type RoutingStrategyType int
Expand Down Expand Up @@ -73,8 +74,9 @@ type RPCType int

const (
// IMPORTANT: keep consistency across snap-plugin-lib, GRPC must be equal 2
NativeRPC RPCType = 0
GRPC RPCType = 2
NativeRPC RPCType = 0
GRPC RPCType = 2
STREAMGRPC RPCType = 3
)

var (
Expand All @@ -88,6 +90,7 @@ var (
"collector",
"processor",
"publisher",
"streamCollector",
}

routingStrategyTypes = [...]string{
Expand Down
11 changes: 8 additions & 3 deletions control/plugin/rpc/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ service StreamCollector {
message CollectArg{
// Request these metrics to be collected on the plugins schedule
MetricsArg Metrics_Arg = 1;
// Set minimum collection duration --duration in ns
// Set Maximum collection duration in ns. The events will be buffered
// until the max duration is reached and/or the maxMetric buffer amount is
// reached. 0 means the events will be sent immediately.
int64 MaxCollectDuration = 2;
// Set max number of metrics to buffer before forcing send
// 0 means no forced send
// Set max number of metrics to buffer before forcing sending. Events
// are sent as soon as MaxMetricsBuffer is reached or MaxCollectDuration
// is exceeded, whichever happens first. If MaxMetricsBuffer is 0 metrics
// will be sent immediately. If MaxCollectDuration is set to 0 then
// maxMetricsBuffer will not be used as metrics will be sent immediately.
int64 MaxMetricsBuffer = 3;
// Blob of domain specific info
bytes Other = 4;
Expand Down
2 changes: 2 additions & 0 deletions core/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) {
return nil, err
}
return sch, nil
case "streaming":
return schedule.NewStreamingSchedule(), nil
default:
return nil, errors.New("unknown schedule type " + s.Type)
}
Expand Down
Loading

0 comments on commit a7d4bc1

Please sign in to comment.