Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Communicate taskID to streaming plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
rashmigottipati committed Jul 31, 2017
1 parent f9aa843 commit 2c0478a
Show file tree
Hide file tree
Showing 13 changed files with 685 additions and 179 deletions.
2 changes: 1 addition & 1 deletion control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (ap *availablePlugins) streamMetrics(
return nil, nil, serror.New(errors.New("Invalid streaming client"))
}

metricChan, errChan, err := cli.StreamMetrics(metricTypes)
metricChan, errChan, err := cli.StreamMetrics(taskID, metricTypes)
if err != nil {
return nil, nil, serror.New(err)
}
Expand Down
2 changes: 1 addition & 1 deletion control/control_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestGRPCServerScheduler(t *testing.T) {
}

<-lpe.done
conn, err := rpcutil.GetClientConnection(c.Config.ListenAddr, c.Config.ListenPort)
conn, err := rpcutil.GetClientConnection(context.Background(), c.Config.ListenAddr, c.Config.ListenPort)

Convey("Creating an rpc connection", t, func() {
Convey("Should not error", func() {
Expand Down
2 changes: 1 addition & 1 deletion control/control_security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestSecureStreamingCollector(t *testing.T) {
mtsin := []core.Metric{}
m := plugin.MetricType{Namespace_: core.NewNamespace(strings.Fields("a b integer")...)}
mtsin = append(mtsin, m)
mch, errch, err := cli.StreamMetrics(mtsin)
mch, errch, err := cli.StreamMetrics("test-taskID", mtsin)
So(err, ShouldBeNil)
Convey("streaming should deliver metrics rather than error", func() {
select {
Expand Down
2 changes: 1 addition & 1 deletion control/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type PluginCollectorClient interface {

type PluginStreamCollectorClient interface {
PluginClient
StreamMetrics([]core.Metric) (chan []core.Metric, chan error, error)
StreamMetrics(string, []core.Metric) (chan []core.Metric, chan error, error)
GetMetricTypes(plugin.ConfigType) ([]core.Metric, error)
UpdateCollectedMetrics([]core.Metric) error
UpdatePluginConfig([]byte) error
Expand Down
32 changes: 22 additions & 10 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/pkg/rpcutil"
"google.golang.org/grpc/metadata"
)

// SecureSide identifies security mode to apply in securing gRPC
Expand All @@ -69,6 +70,7 @@ type grpcClient struct {
processor rpc.ProcessorClient
publisher rpc.PublisherClient
plugin pluginClient
context context.Context

// Channel used to signal death of stream collector to scheduler
killChan chan struct{}
Expand Down Expand Up @@ -123,7 +125,8 @@ func SecurityTLSOff() GRPCSecurity {

// NewCollectorGrpcClient returns a collector gRPC Client.
func NewCollectorGrpcClient(address string, timeout time.Duration, security GRPCSecurity) (PluginCollectorClient, error) {
p, err := newPluginGrpcClient(address, timeout, security, plugin.CollectorPluginType)
ctx := context.Background()
p, err := newPluginGrpcClient(ctx, address, timeout, security, plugin.CollectorPluginType)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +135,8 @@ func NewCollectorGrpcClient(address string, timeout time.Duration, security GRPC

// NewStreamCollectorGrpcClient returns a stream collector gRPC client
func NewStreamCollectorGrpcClient(address string, timeout time.Duration, security GRPCSecurity) (PluginStreamCollectorClient, error) {
p, err := newPluginGrpcClient(address, timeout, security, plugin.StreamCollectorPluginType)
ctx := context.Background()
p, err := newPluginGrpcClient(ctx, address, timeout, security, plugin.StreamCollectorPluginType)
if err != nil {
return nil, err
}
Expand All @@ -141,7 +145,8 @@ func NewStreamCollectorGrpcClient(address string, timeout time.Duration, securit

// NewProcessorGrpcClient returns a processor gRPC Client.
func NewProcessorGrpcClient(address string, timeout time.Duration, security GRPCSecurity) (PluginProcessorClient, error) {
p, err := newPluginGrpcClient(address, timeout, security, plugin.ProcessorPluginType)
ctx := context.Background()
p, err := newPluginGrpcClient(ctx, address, timeout, security, plugin.ProcessorPluginType)
if err != nil {
return nil, err
}
Expand All @@ -150,7 +155,8 @@ func NewProcessorGrpcClient(address string, timeout time.Duration, security GRPC

// NewPublisherGrpcClient returns a publisher gRPC Client.
func NewPublisherGrpcClient(address string, timeout time.Duration, security GRPCSecurity) (PluginPublisherClient, error) {
p, err := newPluginGrpcClient(address, timeout, security, plugin.PublisherPluginType)
ctx := context.Background()
p, err := newPluginGrpcClient(ctx, address, timeout, security, plugin.PublisherPluginType)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -249,7 +255,7 @@ func buildCredentials(security GRPCSecurity) (creds credentials.TransportCredent
}

// newPluginGrpcClient returns a configured gRPC Client.
func newPluginGrpcClient(address string, timeout time.Duration, security GRPCSecurity, typ plugin.PluginType) (interface{}, error) {
func newPluginGrpcClient(ctx context.Context, address string, timeout time.Duration, security GRPCSecurity, typ plugin.PluginType) (interface{}, error) {
address, port, err := parseAddress(address)
if err != nil {
return nil, err
Expand All @@ -259,7 +265,7 @@ func newPluginGrpcClient(address string, timeout time.Duration, security GRPCSec
if creds, err = buildCredentials(security); err != nil {
return nil, err
}
p, err = newGrpcClient(address, int(port), timeout, typ, creds)
p, err = newGrpcClient(ctx, address, int(port), timeout, typ, creds)
if err != nil {
return nil, err
}
Expand All @@ -279,15 +285,16 @@ func parseAddress(address string) (string, int64, error) {
return address, port, nil
}

func newGrpcClient(addr string, port int, timeout time.Duration, typ plugin.PluginType, creds credentials.TransportCredentials) (*grpcClient, error) {
func newGrpcClient(ctx context.Context, addr string, port int, timeout time.Duration, typ plugin.PluginType, creds credentials.TransportCredentials) (*grpcClient, error) {
var conn *grpc.ClientConn
var err error
if conn, err = rpcutil.GetClientConnectionWithCreds(addr, port, creds); err != nil {
if conn, err = rpcutil.GetClientConnectionWithCreds(ctx, addr, port, creds); err != nil {
return nil, err
}
p := &grpcClient{
timeout: timeout,
conn: conn,
context: ctx,
}

switch typ {
Expand Down Expand Up @@ -455,7 +462,7 @@ func (g *grpcClient) UpdateMetricsBuffer(maxMetricsBuffer int64) error {
return nil
}

func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan error, error) {
func (g *grpcClient) StreamMetrics(taskID string, mts []core.Metric) (chan []core.Metric, chan error, error) {
arg := &rpc.CollectArg{
Metrics_Arg: &rpc.MetricsArg{Metrics: NewMetrics(mts)},
}
Expand Down Expand Up @@ -514,7 +521,12 @@ func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan
}
}

s, err := g.streamCollector.StreamMetrics(context.Background())
header := metadata.New(map[string]string{
"task-id": taskID,
})
ctx := metadata.NewContext(g.context, header)

s, err := g.streamCollector.StreamMetrics(ctx)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 2c0478a

Please sign in to comment.