Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #1118 (#1122)
Browse files Browse the repository at this point in the history
Sets the current time as the advertised time in the returned metric when using grpc.
  • Loading branch information
jcooklin authored and pittma committed Aug 12, 2016
1 parent 906d19b commit 30aa88d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
4 changes: 4 additions & 0 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (g *grpcClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, er
return nil, errors.New(reply.Error)
}

for _, metric := range reply.Metrics {
metric.LastAdvertisedTime = common.ToTime(time.Now())
}

results := common.ToCoreMetrics(reply.Metrics)
return results, nil
}
Expand Down
14 changes: 11 additions & 3 deletions grpc/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func ToMetric(co core.Metric) *Metric {
Nsec: int64(co.Timestamp().Nanosecond()),
},
LastAdvertisedTime: &Time{
Sec: co.LastAdvertisedTime().Unix(),
Nsec: int64(co.Timestamp().Nanosecond()),
Sec: time.Now().Unix(),
Nsec: int64(time.Now().Nanosecond()),
},
}
if co.Config() != nil {
Expand Down Expand Up @@ -127,12 +127,20 @@ func (m *metric) Unit() string { return m.unit }

// Convert common.Metric to core.Metric
func ToCoreMetric(mt *Metric) core.Metric {
var lastAdvertisedTime time.Time
// if the lastAdvertisedTime is not set we handle. -62135596800 represents the
// number of seconds from 0001-1970 and is the default value for time.Unix.
if mt.LastAdvertisedTime.Sec == int64(-62135596800) {
lastAdvertisedTime = time.Unix(time.Now().Unix(), int64(time.Now().Nanosecond()))
} else {
lastAdvertisedTime = time.Unix(mt.LastAdvertisedTime.Sec, mt.LastAdvertisedTime.Nsec)
}
ret := &metric{
namespace: ToCoreNamespace(mt.Namespace),
version: int(mt.Version),
tags: mt.Tags,
timeStamp: time.Unix(mt.Timestamp.Sec, mt.Timestamp.Nsec),
lastAdvertisedTime: time.Unix(mt.LastAdvertisedTime.Sec, mt.LastAdvertisedTime.Nsec),
lastAdvertisedTime: lastAdvertisedTime,
config: ConfigMapToConfig(mt.Config),
description: mt.Description,
unit: mt.Unit,
Expand Down
24 changes: 22 additions & 2 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,17 @@ func (p *processJob) Run() {
p.AddErrors(fmt.Errorf("unsupported metric type. {%v}", m))
}
}
enc.Encode(metrics)
err := enc.Encode(metrics)
if err != nil {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"plugin-name": p.name,
"plugin-version": p.version,
"error": err,
}).Error("encoding error")
}
_, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
Expand Down Expand Up @@ -467,7 +477,17 @@ func (p *publisherJob) Run() {
panic(fmt.Sprintf("unsupported type %T", mt))
}
}
enc.Encode(metrics)
err := enc.Encode(metrics)
if err != nil {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"plugin-name": p.name,
"plugin-version": p.version,
"error": err,
}).Error("encoding error")
}
errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
Expand Down

0 comments on commit 30aa88d

Please sign in to comment.