refactor, pull series metrics gathering into a function
fitzoh committed Feb 28, 2020
1 parent ed4457c commit e25199a
Showing 1 changed file with 36 additions and 31 deletions.
cmd/mt-parrot/monitor.go: 36 additions & 31 deletions
@@ -14,10 +14,12 @@ import (

var (
httpError = stats.NewCounter32("parrot.monitoring.error;error=http")
decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode")
invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid")
)

var metricsBySeries []partitionMetrics

type seriesStats struct {
lastTs uint32
//the partition currently being checked
@@ -42,7 +44,7 @@ type partitionMetrics struct {
}

func monitor() {
metricsBySeries := initMetricsBySeries()
initMetricsBySeries()
for tick := range clock.AlignedTickLossless(queryInterval) {

query := graphite.ExecuteRenderQuery(buildRequest(tick))
@@ -54,41 +56,45 @@ func monitor() {
}

for _, s := range query.Decoded {
log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts)
partition, err := strconv.Atoi(s.Target)
if err != nil {
log.Debug("unable to parse partition", err)
invalidError.Inc()
continue
}
serStats := seriesStats{}
serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts
processPartitionSeries(s)
}
}
}

for _, dp := range s.Datapoints {

if math.IsNaN(dp.Val) {
serStats.nans += 1
continue
}
serStats.lastSeen = dp.Ts
if diff := dp.Val - float64(dp.Ts); diff != 0 {
log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff)
serStats.deltaSum += diff
serStats.numNonMatching += 1
}
}
func processPartitionSeries(s graphite.Series) {
log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts)
partition, err := strconv.Atoi(s.Target)
if err != nil {
log.Debug("unable to parse partition", err)
invalidError.Inc()
return
}
serStats := seriesStats{}
serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts

for _, dp := range s.Datapoints {

metrics := metricsBySeries[partition]
metrics.nanCount.Set(int(serStats.nans))
metrics.lag.Set(int(serStats.lastTs - serStats.lastSeen))
metrics.deltaSum.Set(int(serStats.deltaSum))
metrics.nonMatching.Set(int(serStats.numNonMatching))
if math.IsNaN(dp.Val) {
serStats.nans += 1
continue
}
serStats.lastSeen = dp.Ts
if diff := dp.Val - float64(dp.Ts); diff != 0 {
log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff)
serStats.deltaSum += diff
serStats.numNonMatching += 1
}
}

metrics := metricsBySeries[partition]
metrics.nanCount.Set(int(serStats.nans))
metrics.lag.Set(int(serStats.lastTs - serStats.lastSeen))
metrics.deltaSum.Set(int(serStats.deltaSum))
metrics.nonMatching.Set(int(serStats.numNonMatching))
}

func initMetricsBySeries() []partitionMetrics {
var metricsBySeries []partitionMetrics
func initMetricsBySeries() {
for p := 0; p < int(partitionCount); p++ {
metrics := partitionMetrics{
nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)),
@@ -98,7 +104,6 @@ func initMetricsBySeries() []partitionMetrics {
}
metricsBySeries = append(metricsBySeries, metrics)
}
return metricsBySeries
}

func buildRequest(now time.Time) *http.Request {

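Taken on its own, the per-series check that this commit pulls into processPartitionSeries is straightforward: every datapoint's value is expected to equal its own timestamp, so the loop counts NaNs, sums the drift of non-matching values, and tracks how far the latest non-NaN point lags behind the newest datapoint. Below is a small self-contained sketch of that logic. The datapoint and seriesCheck types and the checkSeries name are stand-ins for illustration only, not the mt-parrot API; the real code operates on graphite.Series and writes its results into the stats gauges created by initMetricsBySeries.

// Simplified, self-contained sketch of the per-series check this commit
// extracts into processPartitionSeries. Types and names here are local
// stand-ins, not the ones used in cmd/mt-parrot/monitor.go.
package main

import (
	"fmt"
	"math"
)

type datapoint struct {
	Val float64
	Ts  uint32
}

type seriesCheck struct {
	nans           uint32
	lastSeen       uint32
	lastTs         uint32
	deltaSum       float64
	numNonMatching uint32
}

// checkSeries walks one partition's datapoints: each value is expected to
// equal its own timestamp, so it tallies NaNs, the summed drift of
// non-matching values, and the timestamp of the newest non-NaN point.
func checkSeries(dps []datapoint) seriesCheck {
	c := seriesCheck{lastTs: dps[len(dps)-1].Ts}
	for _, dp := range dps {
		if math.IsNaN(dp.Val) {
			c.nans++
			continue
		}
		c.lastSeen = dp.Ts
		if diff := dp.Val - float64(dp.Ts); diff != 0 {
			c.deltaSum += diff
			c.numNonMatching++
		}
	}
	return c
}

func main() {
	// One good point, one NaN, one point whose value drifted by +1.
	dps := []datapoint{{10, 10}, {math.NaN(), 20}, {31, 30}}
	c := checkSeries(dps)
	fmt.Printf("nans=%d lag=%d deltaSum=%.1f nonMatching=%d\n",
		c.nans, c.lastTs-c.lastSeen, c.deltaSum, c.numNonMatching)
	// prints: nans=1 lag=0 deltaSum=1.0 nonMatching=1
}

Isolating this loop in its own function keeps monitor() focused on the query/tick handling and puts the per-partition bookkeeping in one place, which mirrors what the diff above does to monitor().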