Skip to content

Commit

Permalink
collect: Add aggregation support to collect
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Brandt committed Jul 8, 2015
1 parent 42266d2 commit a44ad58
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package collect // import "bosun.org/collect"

import (
"fmt"
"math"
"net/http"
"net/url"
"os"
"runtime"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -57,6 +59,7 @@ var (
counters = make(map[string]*addMetric)
sets = make(map[string]*setMetric)
puts = make(map[string]*putMetric)
aggs = make(map[string]*agMetric)
client = &http.Client{
Transport: &timeoutTransport{Transport: new(http.Transport)},
Timeout: time.Minute,
Expand Down Expand Up @@ -179,6 +182,104 @@ func setHostName() error {
return nil
}

type agMetric struct {
metric string
ts opentsdb.TagSet
values []float64
}

func AggregateMeta(metric string, desc string, rateType metadata.RateType, unit metadata.Unit) {
agStrings := []string{"avg", "count", "min", "median", "max", "95", "99"}
for _, ag := range agStrings {
metadata.AddMetricMeta(metric+"_"+ag, rateType, unit, desc)
}
}

func (am *agMetric) Process(now int64) {
var avg float64
for _, v := range am.values {
avg += v
}
avg /= float64(len(am.values))
extRoot := metricRoot + am.metric
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_avg",
Timestamp: now,
Value: avg,
Tags: am.ts,
}
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_count",
Timestamp: now,
Value: len(am.values),
Tags: am.ts,
}
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_min",
Timestamp: now,
Value: percentile(am.values, 0),
Tags: am.ts,
}
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_median",
Timestamp: now,
Value: percentile(am.values, .5),
Tags: am.ts,
}
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_max",
Timestamp: now,
Value: percentile(am.values, 1),
Tags: am.ts,
}
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_95",
Timestamp: now,
Value: percentile(am.values, .95),
Tags: am.ts,
}
tchan <- &opentsdb.DataPoint{
Metric: extRoot + "_99",
Timestamp: now,
Value: percentile(am.values, .99),
Tags: am.ts,
}
}

func percentile(dps []float64, p float64) (a float64) {
var x []float64
for _, v := range dps {
x = append(x, v)
}
sort.Float64s(x)
if p <= 0 {
return x[0]
}
if p >= 1 {
return x[len(x)-1]
}
i := p * float64(len(x)-1)
i = math.Ceil(i)
return x[int(i)]
}

func Sample(metric string, ts opentsdb.TagSet, v float64) error {
if err := check(metric, &ts); err != nil {
return err
}
tss := metric + ts.String()
mlock.Lock()
if aggs[tss] == nil {
aggs[tss] = &agMetric{
metric: metric,
ts: ts.Copy(),
}
}
aggs[tss].values = append(aggs[tss].values, v)
mlock.Unlock()
return nil
}

type setMetric struct {
metric string
ts opentsdb.TagSet
Expand Down Expand Up @@ -311,7 +412,11 @@ func collect() {
}
tchan <- dp
}
for _, am := range aggs {
am.Process(now)
}
puts = make(map[string]*putMetric)
aggs = make(map[string]*agMetric)
mlock.Unlock()
time.Sleep(Freq)
}
Expand Down

0 comments on commit a44ad58

Please sign in to comment.