This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Updates scheduler managesMetrics interface
Updates the scheduler managesMetrics interface to use core.Metric all the
way through a workflow instead of switching to bytes for process and
publish. This removes the unimplemented contentType feature and its
associated tests and partial implementations.
IRCody committed Aug 17, 2016
1 parent a1e7df2 commit 168d842
Showing 12 changed files with 208 additions and 791 deletions.
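
For orientation before the file diffs: the processor and publisher call shapes implied by the new scheduler/job.go call sites further down look roughly like the sketch below. This is inferred from the diff, not copied from snap's declarations; in particular the []error return type and the parameter names are assumptions.

```go
package scheduler

import (
	"github.com/intelsdi-x/snap/core"
	"github.com/intelsdi-x/snap/core/ctypes"
)

// Sketch of the processor/publisher hand-off after this commit: metrics are
// passed as []core.Metric instead of a content type plus gob-encoded bytes.
type processesMetrics interface {
	ProcessMetrics(mts []core.Metric, config map[string]ctypes.ConfigValue,
		taskID string, pluginName string, pluginVersion int) ([]core.Metric, []error)
}

type publishesMetrics interface {
	PublishMetrics(mts []core.Metric, config map[string]ctypes.ConfigValue,
		taskID string, pluginName string, pluginVersion int) []error
}
```

Previously both calls took a content type and a []byte payload (for example `ProcessMetrics(contentType, buf.Bytes(), name, version, config, taskID)`), which is the path this commit deletes.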
7 changes: 3 additions & 4 deletions control/control_test.go
@@ -1729,7 +1729,9 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
Convey("metrics are collected from mock1", func() {
for _, m := range mts1 {
if strings.Contains(m.Namespace().String(), "host") {
val, ok := m.Data().(int64)
// Because mock1 uses jsonrpc, all number types are interpreted
// as float64
val, ok := m.Data().(float64)
So(ok, ShouldEqual, true)
So(val, ShouldBeLessThan, 100)
} else {
@@ -1738,9 +1740,6 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
}
}
})
// ensure the data coming back is from v1. V1's data is type string
_, ok := mts1[0].Data().(string)
So(ok, ShouldEqual, true)
Convey("Loading higher plugin version with less metrics", func() {
// Load version snap-plugin-collector-mock2
_, err := load(c, path.Join(fixtures.SnapPath, "plugin", "snap-plugin-collector-mock2"))
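
A note on the float64 assertion above: the jsonrpc path mentioned in the test comment relies on Go's JSON decoding, and encoding/json represents every JSON number as float64 when the target is an interface{}. A minimal, standalone illustration (not part of the snap code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// With an interface{} target, encoding/json decodes every JSON number
	// as float64, so an int64 metric value arrives as float64 on the client.
	var v interface{}
	if err := json.Unmarshal([]byte(`42`), &v); err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", v) // float64
}
```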
4 changes: 2 additions & 2 deletions control/plugin/client/grpc.go
@@ -466,8 +466,8 @@ func ParseConfig(config *rpc.ConfigMap) map[string]ctypes.ConfigValue {
return c
}

func ToTime(t time.Time) *Time {
return &Time{
func ToTime(t time.Time) *rpc.Time {
return &rpc.Time{
Nsec: t.Unix(),
Sec: int64(t.Second()),
}
149 changes: 78 additions & 71 deletions grpc/common/common.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion grpc/controlproxy/controlproxy_medium_test.go
@@ -49,7 +49,6 @@ type mockClient struct {
ValidateReply *rpc.ValidateDepsReply
SubscribeReply *rpc.SubscribeDepsReply
UnsubscribeReply *rpc.UnsubscribeDepsReply
MatchReply *rpc.ExpandWildcardsReply
AutoDiscoReply *rpc.GetAutodiscoverPathsReply
}

354 changes: 67 additions & 287 deletions grpc/controlproxy/rpc/control.pb.go

Large diffs are not rendered by default.

253 changes: 42 additions & 211 deletions scheduler/job.go
@@ -20,15 +20,11 @@ limitations under the License.
package scheduler

import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/ctypes"
@@ -91,6 +87,7 @@ type job interface {
TypeString() string
TaskID() string
Run()
Metrics() []core.Metric
}

type jobType int
@@ -218,6 +215,10 @@ func (m *metric) Tags() map[string]string { return nil }
func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) }
func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) }

func (c *collectorJob) Metrics() []core.Metric {
return c.metrics
}

func (c *collectorJob) Run() {
log.WithFields(log.Fields{
"_module": "scheduler-job",
@@ -264,22 +265,23 @@

type processJob struct {
*coreJob
processor processesMetrics
parentJob job
metrics []core.Metric
config map[string]ctypes.ConfigValue
contentType string
content []byte
processor processesMetrics
parentJob job
metrics []core.Metric
config map[string]ctypes.ConfigValue
}

func (pr *processJob) Metrics() []core.Metric {
return pr.metrics
}

func newProcessJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, processor processesMetrics, taskID string) job {
return &processJob{
parentJob: parentJob,
metrics: []core.Metric{},
coreJob: newCoreJob(processJobType, parentJob.Deadline(), taskID, pluginName, pluginVersion),
config: config,
processor: processor,
contentType: contentType,
parentJob: parentJob,
metrics: []core.Metric{},
coreJob: newCoreJob(processJobType, parentJob.Deadline(), taskID, pluginName, pluginVersion),
config: config,
processor: processor,
}
}

@@ -288,142 +290,46 @@ func (p *processJob) Run() {
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Debug("starting processor job")

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)

switch pt := p.parentJob.(type) {
case *collectorJob:
switch p.contentType {
case plugin.SnapGOBContentType:
metrics := make([]plugin.MetricType, len(pt.metrics))
for i, m := range pt.metrics {
if mt, ok := m.(plugin.MetricType); ok {
metrics[i] = mt
} else {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": m,
}).Error("unsupported metric type")
p.AddErrors(fmt.Errorf("unsupported metric type. {%v}", m))
}
}
err := enc.Encode(metrics)
if err != nil {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"plugin-name": p.name,
"plugin-version": p.version,
"error": err,
}).Error("encoding error")
}
_, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with processor job")
}
p.AddErrors(errs...)
}
p.content = content
default:
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Error("unsupported content type")
p.AddErrors(fmt.Errorf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}

case *processJob:
// TODO: Remove switch statement and rely on processor to catch errors in type
// (separation of concerns; remove content-type definition from the framework?)
switch p.contentType {
case plugin.SnapGOBContentType:
_, content, errs := p.processor.ProcessMetrics(p.contentType, pt.content, p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with processor job")
}
p.AddErrors(errs...)
}
p.content = content
default:
mts, errs := p.processor.ProcessMetrics(p.parentJob.Metrics(), p.config, p.taskID, p.name, p.version)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Error("unsupported content type")
p.AddErrors(fmt.Errorf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
"error": e.Error(),
}).Error("error with processor job")
}
default:
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"parent-job-type": p.parentJob.Type(),
}).Error("unsupported parent job type")
p.AddErrors(fmt.Errorf("unsupported parent job type {%v}", p.parentJob.Type()))
p.AddErrors(errs...)
}
p.metrics = mts
}

type publisherJob struct {
*coreJob
parentJob job
publisher publishesMetrics
config map[string]ctypes.ConfigValue
contentType string
parentJob job
publisher publishesMetrics
config map[string]ctypes.ConfigValue
}

func (pu *publisherJob) Metrics() []core.Metric {
return []core.Metric{}
}

func newPublishJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, publisher publishesMetrics, taskID string) job {
return &publisherJob{
parentJob: parentJob,
publisher: publisher,
coreJob: newCoreJob(publishJobType, parentJob.Deadline(), taskID, pluginName, pluginVersion),
config: config,
contentType: contentType,
parentJob: parentJob,
publisher: publisher,
coreJob: newCoreJob(publishJobType, parentJob.Deadline(), taskID, pluginName, pluginVersion),
config: config,
}
}

@@ -432,99 +338,24 @@ func (p *publisherJob) Run() {
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Debug("starting publisher job")
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)

switch p.parentJob.Type() {
case collectJobType:
switch p.contentType {
case plugin.SnapGOBContentType:
metrics := make([]plugin.MetricType, len(p.parentJob.(*collectorJob).metrics))
for i, m := range p.parentJob.(*collectorJob).metrics {
switch mt := m.(type) {
case plugin.MetricType:
metrics[i] = mt
default:
panic(fmt.Sprintf("unsupported type %T", mt))
}
}
err := enc.Encode(metrics)
if err != nil {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"plugin-name": p.name,
"plugin-version": p.version,
"error": err,
}).Error("encoding error")
}
errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with publisher job")
}
p.AddErrors(errs...)
}
default:
errs := p.publisher.PublishMetrics(p.parentJob.Metrics(), p.config, p.taskID, p.name, p.version)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Fatal("unsupported content type")
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
case processJobType:
// TODO: Remove switch statement and rely on publisher to catch errors in type
// (separation of concerns; remove content-type definition from the framework?)
switch p.contentType {
case plugin.SnapGOBContentType:
errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with publisher job")
}
p.AddErrors(errs...)
}
"error": e.Error(),
}).Error("error with publisher job")
}
default:
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"parent-job-type": p.parentJob.Type(),
}).Fatal("unsupported parent job type")
panic("unsupported job type")
p.AddErrors(errs...)
}
}
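
To make the new hand-off concrete, here is a minimal sketch of components matching the call shapes used in processJob.Run and publisherJob.Run above. The type names, the ignored parameters, and the []error return are illustrative assumptions, not snap source.

```go
package scheduler

import (
	"fmt"

	"github.com/intelsdi-x/snap/core"
	"github.com/intelsdi-x/snap/core/ctypes"
)

// passthroughProcessor forwards metrics unchanged; a real processor plugin
// would transform them. No gob encoding or content-type switching is needed.
type passthroughProcessor struct{}

func (passthroughProcessor) ProcessMetrics(mts []core.Metric, _ map[string]ctypes.ConfigValue, _ string, _ string, _ int) ([]core.Metric, []error) {
	return mts, nil
}

// printPublisher writes each metric's namespace and data to stdout, standing
// in for a real publisher plugin.
type printPublisher struct{}

func (printPublisher) PublishMetrics(mts []core.Metric, _ map[string]ctypes.ConfigValue, _ string, _ string, _ int) []error {
	for _, m := range mts {
		fmt.Println(m.Namespace().String(), m.Data())
	}
	return nil
}
```

The point of the change is visible here: a single []core.Metric value travels collector to processor to publisher, so the scheduler no longer needs the encoding and content-type switches removed in this diff.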