Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics/influx: fix the package implementation #369

Merged
merged 5 commits into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions metrics/influx/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package influx

import (
"fmt"
"regexp"

influxdb "github.com/influxdata/influxdb/client/v2"

"github.com/go-kit/kit/log"
)

func ExampleCounter() {
in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
counter := in.NewCounter("influx_counter")
counter.Add(10)
counter.With("error", "true").Add(1)
counter.With("error", "false").Add(2)
counter.Add(50)

client := &bufWriter{}
in.WriteTo(client)

expectedLines := []string{
`(influx_counter,a=b count=60) [0-9]{19}`,
`(influx_counter,a=b,error=true count=1) [0-9]{19}`,
`(influx_counter,a=b,error=false count=2) [0-9]{19}`,
}

if err := extractAndPrintMessage(expectedLines, client.buf.String()); err != nil {
fmt.Println(err.Error())
}

// Output:
// influx_counter,a=b count=60
// influx_counter,a=b,error=true count=1
// influx_counter,a=b,error=false count=2
}

func ExampleGauge() {
in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
gauge := in.NewGauge("influx_gauge")
gauge.Set(10)
gauge.With("error", "true").Set(2)
gauge.With("error", "true").Set(1)
gauge.With("error", "false").Set(2)
gauge.Set(50)

client := &bufWriter{}
in.WriteTo(client)

expectedLines := []string{
`(influx_gauge,a=b value=50) [0-9]{19}`,
`(influx_gauge,a=b,error=true value=1) [0-9]{19}`,
`(influx_gauge,a=b,error=false value=2) [0-9]{19}`,
}

if err := extractAndPrintMessage(expectedLines, client.buf.String()); err != nil {
fmt.Println(err.Error())
}

// Output:
// influx_gauge,a=b value=50
// influx_gauge,a=b,error=true value=1
// influx_gauge,a=b,error=false value=2
}

func ExampleHistogram() {
in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
histogram := in.NewHistogram("influx_histogram")
histogram.Observe(float64(10))
histogram.With("error", "true").Observe(float64(1))
histogram.With("error", "false").Observe(float64(2))
histogram.Observe(float64(50))

client := &bufWriter{}
in.WriteTo(client)

expectedLines := []string{
`(influx_histogram,foo=alpha p50=10,p90=50,p95=50,p99=50) [0-9]{19}`,
`(influx_histogram,error=true,foo=alpha p50=1,p90=1,p95=1,p99=1) [0-9]{19}`,
`(influx_histogram,error=false,foo=alpha p50=2,p90=2,p95=2,p99=2) [0-9]{19}`,
}

if err := extractAndPrintMessage(expectedLines, client.buf.String()); err != nil {
fmt.Println(err.Error())
}

// Output:
// influx_histogram,foo=alpha p50=10,p90=50,p95=50,p99=50
// influx_histogram,error=true,foo=alpha p50=1,p90=1,p95=1,p99=1
// influx_histogram,error=false,foo=alpha p50=2,p90=2,p95=2,p99=2
}

func extractAndPrintMessage(expected []string, msg string) error {
for _, pattern := range expected {
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(msg)
if len(match) != 2 {
return fmt.Errorf("Pattern not found! {%s} [%s]: %v\n", pattern, msg, match)
}
fmt.Println(match[1])
}
return nil
}
60 changes: 33 additions & 27 deletions metrics/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
)

Expand All @@ -20,14 +21,13 @@ import (
// one data point per flush, with a "count" field that reflects all adds since
// the last flush. Gauges are modeled as a timeseries with one data point per
// flush, with a "value" field that reflects the current state of the gauge.
// Histograms are modeled as a timeseries with one data point per observation,
// with a "value" field that reflects each observation; use e.g. the HISTOGRAM
// aggregate function to compute histograms.
// Histograms are modeled as a timeseries with one data point per combination of tags,
// with a set of quantile fields that reflects the p50, p90, p95 & p99.
//
// Influx tags are immutable, attached to the Influx object, and given to each
// metric at construction. Influx fields are mapped to Go kit label values, and
// may be mutated via With functions. Actual metric values are provided as
// fields with specific names depending on the metric.
// Influx tags are attached to the Influx object, can be given to each
// metric at construction and can be updated anytime via With function. Influx fields
// are mapped to Go kit label values directly by this collector. Actual metric
// values are provided as fields with specific names depending on the metric.
//
// All observations are collected in memory locally, and flushed on demand.
type Influx struct {
Expand Down Expand Up @@ -108,10 +108,10 @@ func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
now := time.Now()

in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
fields := fieldsFrom(lvs)
fields["count"] = sum(values)
tags := mergeTags(in.tags, lvs)
var p *influxdb.Point
p, err = influxdb.NewPoint(name, in.tags, fields, now)
fields := map[string]interface{}{"count": sum(values)}
p, err = influxdb.NewPoint(name, tags, fields, now)
if err != nil {
return false
}
Expand All @@ -123,10 +123,10 @@ func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
}

in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
fields := fieldsFrom(lvs)
fields["value"] = last(values)
tags := mergeTags(in.tags, lvs)
var p *influxdb.Point
p, err = influxdb.NewPoint(name, in.tags, fields, now)
fields := map[string]interface{}{"value": last(values)}
p, err = influxdb.NewPoint(name, tags, fields, now)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please update the comment on the Influx struct to reflect the new role of fields and tags?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Let's see what do you think about it...

BTW, very nice project!!!

if err != nil {
return false
}
Expand All @@ -138,16 +138,23 @@ func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
}

in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
fields := fieldsFrom(lvs)
ps := make([]*influxdb.Point, len(values))
for i, v := range values {
fields["value"] = v // overwrite each time
ps[i], err = influxdb.NewPoint(name, in.tags, fields, now)
if err != nil {
return false
}
histogram := generic.NewHistogram(name, 50)
tags := mergeTags(in.tags, lvs)
var p *influxdb.Point
for _, v := range values {
histogram.Observe(v)
}
fields := map[string]interface{}{
"p50": histogram.Quantile(0.50),
"p90": histogram.Quantile(0.90),
"p95": histogram.Quantile(0.95),
"p99": histogram.Quantile(0.99),
}
p, err = influxdb.NewPoint(name, tags, fields, now)
if err != nil {
return false
}
bp.AddPoints(ps)
bp.AddPoint(p)
return true
})
if err != nil {
Expand All @@ -157,15 +164,14 @@ func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
return w.Write(bp)
}

func fieldsFrom(labelValues []string) map[string]interface{} {
func mergeTags(tags map[string]string, labelValues []string) map[string]string {
if len(labelValues)%2 != 0 {
panic("fieldsFrom received a labelValues with an odd number of strings")
panic("mergeTags received a labelValues with an odd number of strings")
}
fields := make(map[string]interface{}, len(labelValues)/2)
for i := 0; i < len(labelValues); i += 2 {
fields[labelValues[i]] = labelValues[i+1]
tags[labelValues[i]] = labelValues[i+1]
}
return fields
return tags
}

func sum(a []float64) float64 {
Expand Down
17 changes: 9 additions & 8 deletions metrics/influx/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
influxdb "github.com/influxdata/influxdb/client/v2"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/teststat"
)

Expand Down Expand Up @@ -49,18 +48,20 @@ func TestGauge(t *testing.T) {

func TestHistogram(t *testing.T) {
in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
re := regexp.MustCompile(`influx_histogram,foo=alpha bar="beta",value=([0-9\.]+) [0-9]+`)
re := regexp.MustCompile(`influx_histogram,bar=beta,foo=alpha p50=([0-9\.]+),p90=([0-9\.]+),p95=([0-9\.]+),p99=([0-9\.]+) [0-9]+`)
histogram := in.NewHistogram("influx_histogram").With("bar", "beta")
quantiles := func() (float64, float64, float64, float64) {
w := &bufWriter{}
in.WriteTo(w)
h := generic.NewHistogram("h", 50)
matches := re.FindAllStringSubmatch(w.buf.String(), -1)
for _, match := range matches {
f, _ := strconv.ParseFloat(match[1], 64)
h.Observe(f)
match := re.FindStringSubmatch(w.buf.String())
if len(match) != 5 {
t.Errorf("These are not the quantiles you're looking for: %v\n", match)
}
return h.Quantile(0.50), h.Quantile(0.90), h.Quantile(0.95), h.Quantile(0.99)
var result [4]float64
for i, q := range match[1:] {
result[i], _ = strconv.ParseFloat(q, 64)
}
return result[0], result[1], result[2], result[3]
}
if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
t.Fatal(err)
Expand Down