Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Updates control package to use metrics throughout
Browse files Browse the repository at this point in the history
Updates control package to use metrics throughout the workflow. This new
function prototype had to be changed in a few places.

Removes josnrc process/publish client creation since it is only able to
be used for collection.

Adds invalid rpctype error if unable to match the incoming rpctype.
  • Loading branch information
IRCody committed Aug 17, 2016
1 parent 4e6b19d commit d971fba
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 170 deletions.
39 changes: 16 additions & 23 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,11 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
//TODO(CDR): Add default cases to these switches to catch invalid rpctype?
default:
return nil, errors.New("Invalid RPCTYPE")
}
case plugin.PublisherPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewPublisherHttpJSONRPCClient(listenURL, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
Expand All @@ -141,15 +136,11 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
default:
return nil, errors.New("Invalid RPCTYPE")
}
case plugin.ProcessorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewProcessorHttpJSONRPCClient(listenURL, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
Expand All @@ -162,6 +153,8 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
default:
return nil, errors.New("Invalid RPCTYPE")
}
default:
return nil, errors.New("Cannot create a client for a plugin of the type: " + resp.Type.String())
Expand Down Expand Up @@ -446,7 +439,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
return results, nil
}

func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
var errs []error
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
Expand All @@ -472,7 +465,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p
return []error{errors.New("unable to cast client to PluginPublisherClient")}
}

errp := cli.Publish(contentType, content, config)
errp := cli.Publish(metrics, config)
if errp != nil {
return []error{errp}
}
Expand All @@ -481,38 +474,38 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p
return nil
}

func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
func (ap *availablePlugins) processMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) ([]core.Metric, []error) {
var errs []error
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
return "", nil, errs
return nil, errs
}
if pool == nil {
return "", nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
return nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
}

pool.RLock()
defer pool.RUnlock()
p, err := pool.SelectAP(taskID, config)
if err != nil {
errs = append(errs, err)
return "", nil, errs
return nil, errs
}

cli, ok := p.(*availablePlugin).client.(client.PluginProcessorClient)
if !ok {
return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")}
return nil, []error{errors.New("unable to cast client to PluginProcessorClient")}
}

ct, c, errp := cli.Process(contentType, content, config)
mts, errp := cli.Process(metrics, config)
if errp != nil {
return "", nil, []error{errp}
return nil, []error{errp}
}
p.(*availablePlugin).hitCount++
p.(*availablePlugin).lastHitTime = time.Now()
return ct, c, nil
return mts, nil
}

func (ap *availablePlugins) findLatestPool(pType, name string) (strategy.Pool, serror.SnapError) {
Expand Down
22 changes: 5 additions & 17 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]
}

// PublishMetrics
func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
func (p *pluginControl) PublishMetrics(metrics []core.Metric, config map[string]ctypes.ConfigValue, taskID, pluginName string, pluginVersion int) []error {
// If control is not started we don't want tasks to be able to
// go through a workflow.
if !p.Started {
Expand All @@ -990,15 +990,15 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi
merged[k] = v
}

return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, merged, taskID)
return p.pluginRunner.AvailablePlugins().publishMetrics(metrics, pluginName, pluginVersion, merged, taskID)
}

// ProcessMetrics
func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
func (p *pluginControl) ProcessMetrics(metrics []core.Metric, config map[string]ctypes.ConfigValue, taskID, pluginName string, pluginVersion int) ([]core.Metric, []error) {
// If control is not started we don't want tasks to be able to
// go through a workflow.
if !p.Started {
return "", nil, []error{ErrControllerNotStarted}
return nil, []error{ErrControllerNotStarted}
}
// merge global plugin config into the config for this request
// without over-writing the task specific config
Expand All @@ -1011,19 +1011,7 @@ func (p *pluginControl) ProcessMetrics(contentType string, content []byte, plugi
merged[k] = v
}

return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, merged, taskID)
}

// GetPluginContentTypes returns accepted and returned content types for the
// loaded plugin matching the provided name, type and version.
// If the version provided is 0 or less the newest plugin by version will be
// returned.
func (p *pluginControl) GetPluginContentTypes(n string, t core.PluginType, v int) ([]string, []string, error) {
lp, err := p.pluginManager.get(fmt.Sprintf("%s:%s:%d", t.String(), n, v))
if err != nil {
return nil, nil, err
}
return lp.Meta.AcceptedContentTypes, lp.Meta.ReturnedContentTypes, nil
return p.pluginRunner.AvailablePlugins().processMetrics(metrics, pluginName, pluginVersion, merged, taskID)
}

func (p *pluginControl) SetAutodiscoverPaths(paths []string) {
Expand Down
44 changes: 17 additions & 27 deletions control/control_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,28 @@ type ControlGRPCServer struct {
}

// --------- Scheduler's managesMetrics implementation ----------

func (pc *ControlGRPCServer) GetPluginContentTypes(ctx context.Context, r *rpc.GetPluginContentTypesRequest) (*rpc.GetPluginContentTypesReply, error) {
accepted, returned, err := pc.control.GetPluginContentTypes(r.Name, core.PluginType(int(r.PluginType)), int(r.Version))
reply := &rpc.GetPluginContentTypesReply{
AcceptedTypes: accepted,
ReturnedTypes: returned,
}
if err != nil {
reply.Error = err.Error()
}
return reply, nil
}

func (pc *ControlGRPCServer) PublishMetrics(ctx context.Context, r *rpc.PubProcMetricsRequest) (*rpc.ErrorReply, error) {
errs := pc.control.PublishMetrics(r.ContentType, r.Content, r.PluginName, int(r.PluginVersion), common.ParseConfig(r.Config), r.TaskId)
erro := make([]string, len(errs))
for i, v := range errs {
erro[i] = v.Error()
}
return &rpc.ErrorReply{Errors: erro}, nil
metrics := common.ToCoreMetrics(r.Metrics)
errs := pc.control.PublishMetrics(
metrics,
common.ParseConfig(r.Config),
r.TaskId, r.PluginName,
int(r.PluginVersion))

return &rpc.ErrorReply{Errors: errorsToStrings(errs)}, nil
}

func (pc *ControlGRPCServer) ProcessMetrics(ctx context.Context, r *rpc.PubProcMetricsRequest) (*rpc.ProcessMetricsReply, error) {
contentType, content, errs := pc.control.ProcessMetrics(r.ContentType, r.Content, r.PluginName, int(r.PluginVersion), common.ParseConfig(r.Config), r.TaskId)
erro := make([]string, len(errs))
for i, v := range errs {
erro[i] = v.Error()
}
metrics := common.ToCoreMetrics(r.Metrics)
mts, errs := pc.control.ProcessMetrics(
metrics,
common.ParseConfig(r.Config),
r.TaskId, r.PluginName,
int(r.PluginVersion))

reply := &rpc.ProcessMetricsReply{
ContentType: contentType,
Content: content,
Errors: erro,
Metrics: common.NewMetrics(mts),
Errors: errorsToStrings(errs),
}
return reply, nil
}
Expand Down
86 changes: 30 additions & 56 deletions control/control_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ limitations under the License.
package control

import (
"bytes"
"encoding/gob"
"net"
"path"
"testing"
Expand All @@ -32,13 +30,12 @@ import (
"golang.org/x/net/context"

log "github.com/Sirupsen/logrus"

"github.com/intelsdi-x/snap/control/fixtures"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/grpc/common"
"github.com/intelsdi-x/snap/grpc/controlproxy"
"github.com/intelsdi-x/snap/grpc/controlproxy/rpc"
"github.com/intelsdi-x/snap/pkg/rpcutil"
. "github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -150,33 +147,6 @@ func TestGRPCServerScheduler(t *testing.T) {
So(client, ShouldNotBeNil)
})
})
//GetContentTypes
Convey("Getting Content Types", t, func() {
Convey("Should err if invalid plugin given", func() {
req := &rpc.GetPluginContentTypesRequest{
Name: "bogus",
PluginType: int32(0),
Version: int32(0),
}
reply, err := client.GetPluginContentTypes(context.Background(), req)
// We don't expect rpc errors
So(err, ShouldBeNil)
So(reply.Error, ShouldNotEqual, "")
So(reply.Error, ShouldResemble, "plugin not found")
})
Convey("Should return content types with valid plugin", func() {
req := &rpc.GetPluginContentTypesRequest{
Name: "mock",
PluginType: int32(0),
Version: 0,
}
reply, err := client.GetPluginContentTypes(context.Background(), req)
So(err, ShouldBeNil)
So(reply.Error, ShouldEqual, "")
So(reply.AcceptedTypes, ShouldContain, "snap.gob")
So(reply.ReturnedTypes, ShouldContain, "snap.gob")
})
})

// Verify that validate deps is properly passing through errors
Convey("Validating Deps", t, func() {
Expand Down Expand Up @@ -264,8 +234,7 @@ func TestGRPCServerScheduler(t *testing.T) {
mts = common.ToCoreMetrics(reply.Metrics)
})
})
//our content to pass to publish
var content []byte

//process
Convey("ProcessMetrics", t, func() {
req := &rpc.SubscribeDepsRequest{
Expand All @@ -280,37 +249,32 @@ func TestGRPCServerScheduler(t *testing.T) {
_, err := client.SubscribeDeps(context.Background(), req)
So(err, ShouldBeNil)
Convey("should error with invalid inputs", func() {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
metrics := make([]plugin.MetricType, len(mts))
for i, m := range mts {
mt := plugin.NewMetricType(m.Namespace(), m.Timestamp(), m.Tags(), m.Unit(), m.Data())
metrics[i] = *mt
req := &rpc.PubProcMetricsRequest{
Metrics: common.NewMetrics([]core.Metric{fixtures.ValidMetric}),
PluginName: "passthru-invalid",
PluginVersion: 1,
TaskId: "my-snowflake-id",
Config: common.ToConfigMap(map[string]ctypes.ConfigValue{}),
}
enc.Encode(metrics)
req := controlproxy.GetPubProcReq("snap.gob", buf.Bytes(), "passthru-invalid", 1, map[string]ctypes.ConfigValue{}, "my-snowflake-id")
reply, err := client.ProcessMetrics(context.Background(), req)
// we don't expect rpc errors
So(err, ShouldBeNil)
So(len(reply.Errors), ShouldNotEqual, 0)
// content to pass to publisher
})
Convey("should not error with valid inputs", func() {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
metrics := make([]plugin.MetricType, len(mts))
for i, m := range mts {
mt := plugin.NewMetricType(m.Namespace(), m.Timestamp(), m.Tags(), m.Unit(), m.Data())
metrics[i] = *mt
req := &rpc.PubProcMetricsRequest{
Metrics: common.NewMetrics(mts),
PluginName: "passthru",
PluginVersion: 1,
TaskId: "my-snowflake-id",
Config: common.ToConfigMap(map[string]ctypes.ConfigValue{}),
}
enc.Encode(metrics)
req := controlproxy.GetPubProcReq("snap.gob", buf.Bytes(), "passthru", 1, map[string]ctypes.ConfigValue{}, "my-snowflake-id")
reply, err := client.ProcessMetrics(context.Background(), req)
// we don't expect rpc errors
So(err, ShouldBeNil)
So(len(reply.Errors), ShouldEqual, 0)
// content to pass to publisher
content = reply.Content

})
})
//publishmetrics
Expand All @@ -327,10 +291,14 @@ func TestGRPCServerScheduler(t *testing.T) {
_, err := client.SubscribeDeps(context.Background(), req)
So(err, ShouldBeNil)

Convey("should error with invalid inputs", func() {
config := make(map[string]ctypes.ConfigValue)
config["file"] = ctypes.ConfigValueStr{Value: "/tmp/grpcservertest.snap"}
req := controlproxy.GetPubProcReq("snap.gob", content, "mock-file-invalid", 3, config, "my-snowflake-id")
Convey("Should error with invalid inputs", func() {
req := &rpc.PubProcMetricsRequest{
Metrics: common.NewMetrics([]core.Metric{fixtures.ValidMetric}),
PluginName: "mock-file-invalid",
PluginVersion: 3,
TaskId: "my-snowflake-id",
Config: common.ToConfigMap(map[string]ctypes.ConfigValue{}),
}
reply, err := client.PublishMetrics(context.Background(), req)
// we don't expect rpc errors
So(err, ShouldBeNil)
Expand All @@ -341,7 +309,13 @@ func TestGRPCServerScheduler(t *testing.T) {
Convey("should not error with valid inputs", func() {
config := make(map[string]ctypes.ConfigValue)
config["file"] = ctypes.ConfigValueStr{Value: "/tmp/grpcservertest.snap"}
req := controlproxy.GetPubProcReq("snap.gob", content, "mock-file", 3, config, "my-snowflake-id")
req := &rpc.PubProcMetricsRequest{
Metrics: common.NewMetrics([]core.Metric{fixtures.ValidMetric}),
PluginName: "mock-file",
PluginVersion: 3,
TaskId: "my-snowflake-id",
Config: common.ToConfigMap(config),
}
reply, err := client.PublishMetrics(context.Background(), req)
// we don't expect rpc errors
So(err, ShouldBeNil)
Expand Down
Loading

0 comments on commit d971fba

Please sign in to comment.