diff --git a/.gitignore b/.gitignore
index 27ccac90..267a1aa4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,5 +30,6 @@ _testmain.go
 .DS_Store
 node_modules/
+.idea
diff --git a/datadog/README.md b/datadog/README.md
new file mode 100644
index 00000000..e4942b45
--- /dev/null
+++ b/datadog/README.md
@@ -0,0 +1 @@
+A buffered Datadog reporter
diff --git a/datadog/reporter.go b/datadog/reporter.go
new file mode 100644
index 00000000..c4c48ce3
--- /dev/null
+++ b/datadog/reporter.go
@@ -0,0 +1,296 @@
+package datadog
+
+import (
+    "bytes"
+    "encoding/json"
+    "fmt"
+    "io"
+    "io/ioutil"
+    "net/http"
+    "os"
+    "sync"
+    "time"
+
+    "github.com/uber-go/tally"
+)
+
+const (
+    // DefaultBufferSize is the number of metrics the datadog reporter
+    // will buffer before forcing a flush
+    DefaultBufferSize = 512
+)
+
+const (
+    typeGauge   = "gauge"
+    typeRate    = "rate"
+    typeCounter = "counter"
+    typeTimer   = "timer"
+)
+
+var (
+    poolMetric = &sync.Pool{
+        New: func() interface{} {
+            return &metric{}
+        },
+    }
+)
+
+type metric struct {
+    Name   string      `json:"metric"`
+    Points [][]float64 `json:"points"`
+    Type   string      `json:"type"`
+    Host   string      `json:"host,omitempty"`
+    Tags   []string    `json:"tags,omitempty"`
+}
+
+func (m *metric) Reset() {
+    m.Name = ""
+    m.Points = nil
+    m.Type = ""
+    m.Host = ""
+    m.Tags = nil
+}
+
+type series struct {
+    Metrics []*metric `json:"series"`
+}
+
+type Reporter struct {
+    metrics     []*metric
+    bufSize     int
+    offset      int
+    handlerFunc func(*http.Request) (*http.Response, error)
+    output      io.Writer
+    mux         *sync.Mutex
+    endpoint    string
+}
+
+// Reporting returns whether the reporter has the ability to actively report.
+func (r *Reporter) Reporting() bool {
+    return false
+}
+
+// Tagging returns whether the reporter has the capability for tagged metrics.
+func (r *Reporter) Tagging() bool {
+    return true
+}
+
+// Capabilities returns the capabilities description of the reporter.
+func (r *Reporter) Capabilities() tally.Capabilities {
+    return r
+}
+
+// post sends the encoded metrics to datadog
+func (r *Reporter) post(data []byte) error {
+    req, err := http.NewRequest("POST", r.endpoint, bytes.NewReader(data))
+    if err != nil {
+        return err
+    }
+    req.Header.Set("Content-Type", "application/json")
+
+    resp, err := r.handlerFunc(req)
+    if err != nil {
+        return err
+    }
+    defer resp.Body.Close()
+
+    io.Copy(r.output, resp.Body)
+    io.WriteString(r.output, "\n")
+
+    return nil
+}
+
+// submit posts the set of metrics to datadog. submit will retry the operation
+// if unsuccessful
+func (r *Reporter) submit(metrics []*metric) {
+    s := series{Metrics: metrics}
+
+    data, err := json.Marshal(s)
+    if err != nil {
+        return
+    }
+
+    for attempts := 0; attempts < 3; attempts++ {
+        if err := r.post(data); err != nil {
+            fmt.Fprintln(os.Stderr, "failed to connect to host")
+            fmt.Fprintln(r.output, "failed to connect to host")
+            time.Sleep(time.Second * 15)
+            continue
+        }
+
+        break
+    }
+
+    for _, m := range metrics {
+        m.Reset()
+        poolMetric.Put(m)
+    }
+}
+
+// flush is not thread-safe; it assumes the caller has already obtained the lock
+func (r *Reporter) flush() {
+    if r.offset == 0 {
+        return
+    }
+
+    var metrics []*metric
+    metrics = append(metrics, r.metrics[0:r.offset]...)
+    go r.submit(metrics)
+
+    for i := 0; i < r.offset; i++ {
+        r.metrics[i] = nil
+    }
+
+    r.offset = 0
+}
+
+// pushMetric pushes a metric onto the buffer; it acquires the lock and
+// flushes once the buffer is full
+func (r *Reporter) pushMetric(m *metric) {
+    r.mux.Lock()
+    defer r.mux.Unlock()
+
+    r.metrics[r.offset] = m
+    r.offset++
+
+    if r.offset == r.bufSize {
+        r.flush()
+    }
+}
+
+// push builds a metric from the given values and pushes it onto the buffer
+func (r *Reporter) push(name, metricType string, tags map[string]string, value float64) {
+    m := poolMetric.Get().(*metric)
+    m.Name = name
+    m.Type = metricType
+    m.Points = [][]float64{
+        {
+            float64(time.Now().Unix()),
+            value,
+        },
+    }
+    for k, v := range tags {
+        m.Tags = append(m.Tags, k+":"+v)
+    }
+
+    r.pushMetric(m)
+}
+
+// Flush asks the reporter to flush all reported values.
+func (r *Reporter) Flush() {
+    r.mux.Lock()
+    defer r.mux.Unlock()
+
+    r.flush()
+}
+
+// ReportCounter reports a counter value
+func (r *Reporter) ReportCounter(
+    name string,
+    tags map[string]string,
+    value int64,
+) {
+    r.push(name, typeCounter, tags, float64(value))
+}
+
+// ReportGauge reports a gauge value
+func (r *Reporter) ReportGauge(
+    name string,
+    tags map[string]string,
+    value float64,
+) {
+    r.push(name, typeGauge, tags, value)
+}
+
+// ReportTimer reports a timer value
+func (r *Reporter) ReportTimer(
+    name string,
+    tags map[string]string,
+    interval time.Duration,
+) {
+    r.push(name, typeTimer, tags, float64(interval)/float64(time.Second))
+}
+
+// ReportHistogramValueSamples reports histogram samples for a bucket
+func (r *Reporter) ReportHistogramValueSamples(
+    name string,
+    tags map[string]string,
+    buckets tally.Buckets,
+    bucketLowerBound,
+    bucketUpperBound float64,
+    samples int64,
+) {
+    r.push(name+".min", typeGauge, tags, bucketLowerBound)
+    r.push(name+".max", typeGauge, tags, bucketUpperBound)
+    r.push(name+".count", typeRate, tags, float64(samples))
+}
+
+// ReportHistogramDurationSamples reports histogram samples for a bucket
+func (r *Reporter) ReportHistogramDurationSamples(
+    name string,
+    tags map[string]string,
+    buckets tally.Buckets,
+    bucketLowerBound,
+    bucketUpperBound time.Duration,
+    samples int64,
+) {
+    r.push(name+".min", typeGauge, tags, float64(bucketLowerBound)/float64(time.Second))
+    r.push(name+".max", typeGauge, tags, float64(bucketUpperBound)/float64(time.Second))
+    r.push(name+".count", typeRate, tags, float64(samples))
+}
+
+// options holds the option values
+type options struct {
+    bufferSize  int
+    handlerFunc func(req *http.Request) (*http.Response, error)
+    writer      io.Writer
+}
+
+// Option provides functional arguments to datadog
+type Option func(*options)
+
+// BufferSize specifies the size of the internal datadog buffer. Once the
+// buffer is full, the buffered metrics will be posted to datadog
+func BufferSize(n int) Option {
+    return func(o *options) {
+        o.bufferSize = n
+    }
+}
+
+// HandlerFunc allows the http transport to be overridden; useful for testing
+func HandlerFunc(h func(req *http.Request) (*http.Response, error)) Option {
+    return func(o *options) {
+        o.handlerFunc = h
+    }
+}
+
+// Debug writes the datadog response to the provided writer
+func Debug(w io.Writer) Option {
+    return func(o *options) {
+        o.writer = w
+    }
+}
+
+// New returns a new datadog reporter
+func New(apiKey string, opts ...Option) (*Reporter, error) {
+    endpoint := "https://app.datadoghq.com/api/v1/series?api_key=" + apiKey
+
+    options := options{
+        bufferSize:  DefaultBufferSize,
+        handlerFunc: http.DefaultTransport.RoundTrip,
+        writer:      ioutil.Discard,
+    }
+    for _, opt := range opts {
+        opt(&options)
+    }
+
+    r := &Reporter{
+        metrics:     make([]*metric, options.bufferSize),
+        bufSize:     options.bufferSize,
+        mux:         &sync.Mutex{},
+        endpoint:    endpoint,
+        handlerFunc: options.handlerFunc,
+        output:      options.writer,
+    }
+
+    return r, nil
+}
diff --git a/datadog/reporter_test.go b/datadog/reporter_test.go
new file mode 100644
index 00000000..69340a6b
--- /dev/null
+++ b/datadog/reporter_test.go
@@ -0,0 +1,292 @@
+package datadog
+
+import (
+    "bytes"
+    "encoding/json"
+    "io/ioutil"
+    "net/http"
+    "os"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/uber-go/tally"
+)
+
+func newHandler(t *testing.T, ch chan []*metric) func(req *http.Request) (*http.Response, error) {
+    return func(req *http.Request) (*http.Response, error) {
+        defer req.Body.Close()
+        data, err := ioutil.ReadAll(req.Body)
+        assert.Nil(t, err)
+
+        s := series{}
+        assert.Nil(t, json.Unmarshal(data, &s))
+        ch <- s.Metrics
+
+        return &http.Response{
+            StatusCode: http.StatusOK,
+            Body:       ioutil.NopCloser(bytes.NewReader(nil)),
+        }, nil
+    }
+}
+
+func TestCapabilities(t *testing.T) {
+    reporter, err := New("blah", HandlerFunc(func(req *http.Request) (*http.Response, error) {
+        return &http.Response{
+            StatusCode: http.StatusOK,
+            Body:       ioutil.NopCloser(bytes.NewReader(nil)),
+        }, nil
+    }))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+
+    c := scope.Capabilities()
+    assert.NotNil(t, c)
+    assert.False(t, c.Reporting())
+    assert.True(t, c.Tagging())
+}
+
+func TestGauge(t *testing.T) {
+    ch := make(chan []*metric)
+    defer close(ch)
+
+    reporter, err := New("blah", HandlerFunc(newHandler(t, ch)))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+    scope = scope.Tagged(map[string]string{"hello": "world"})
+
+    // Given
+    name := "sample"
+    value := 1.0
+
+    // When
+    g := scope.Gauge(name)
+    g.Update(value)
+
+    // Then
+    assert.Nil(t, closer.Close())
+
+    metrics := <-ch // wait for content to arrive
+
+    assert.Len(t, metrics, 1)
+    assertMetric(t, metrics[0], name, typeGauge, value, "hello:world")
+}
+
+func TestCounter(t *testing.T) {
+    ch := make(chan []*metric)
+    defer close(ch)
+
+    reporter, err := New("blah", HandlerFunc(newHandler(t, ch)))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+    scope = scope.Tagged(map[string]string{"hello": "world"})
+
+    // Given
+    name := "sample"
+    value := int64(2)
+
+    // When
+    g := scope.Counter(name)
+    g.Inc(value)
+
+    // Then
+    assert.Nil(t, closer.Close())
+
+    metrics := <-ch // wait for content to arrive
+
+    assert.Len(t, metrics, 1)
+    assertMetric(t, metrics[0], name, typeCounter, float64(value), "hello:world")
+}
+
+func TestTimer(t *testing.T) {
+    ch := make(chan []*metric)
+    defer close(ch)
+
+    reporter, err := New("blah", HandlerFunc(newHandler(t, ch)))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+    scope = scope.Tagged(map[string]string{"hello": "world"})
+
+    // Given
+    name := "sample"
+    value := time.Duration(time.Second)
+
+    // When
+    g := scope.Timer(name)
+    g.Record(value)
+
+    // Then
+    assert.Nil(t, closer.Close())
+
+    metrics := <-ch // wait for content to arrive
+
+    assert.Len(t, metrics, 1)
+    assertMetric(t, metrics[0], name, typeTimer, 1, "hello:world")
+}
+
+func TestHistogramValue(t *testing.T) {
+    ch := make(chan []*metric)
+    defer close(ch)
+
+    reporter, err := New("blah", HandlerFunc(newHandler(t, ch)))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+    scope = scope.Tagged(map[string]string{"hello": "world"})
+
+    // Given
+    name := "sample"
+    value := 1.0
+
+    // When
+    g := scope.Histogram(name, tally.ValueBuckets{0, 1.0, 2.0})
+    g.RecordValue(value)
+    g.RecordValue(value)
+
+    // Then
+    assert.Nil(t, closer.Close())
+
+    metrics := <-ch // wait for content to arrive
+
+    assert.Len(t, metrics, 3)
+    assertMetric(t, metrics[0], name+".min", typeGauge, 0, "hello:world")
+    assertMetric(t, metrics[1], name+".max", typeGauge, value, "hello:world")
+    assertMetric(t, metrics[2], name+".count", typeRate, 2, "hello:world")
+}
+
+func TestHistogramDuration(t *testing.T) {
+    ch := make(chan []*metric)
+    defer close(ch)
+
+    reporter, err := New("blah", HandlerFunc(newHandler(t, ch)))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+    scope = scope.Tagged(map[string]string{"hello": "world"})
+
+    // Given
+    name := "sample"
+    value := time.Second
+
+    // When
+    g := scope.Histogram(name, tally.DurationBuckets{0, time.Second, time.Minute})
+    g.RecordDuration(value)
+    g.RecordDuration(value)
+
+    // Then
+    assert.Nil(t, closer.Close())
+
+    metrics := <-ch // wait for content to arrive
+
+    assert.Len(t, metrics, 3)
+    assertMetric(t, metrics[0], name+".min", typeGauge, 0, "hello:world")
+    assertMetric(t, metrics[1], name+".max", typeGauge, 1, "hello:world")
+    assertMetric(t, metrics[2], name+".count", typeRate, 2, "hello:world")
+}
+
+func TestBufferSizeForcesFlush(t *testing.T) {
+    ch := make(chan []*metric)
+    defer close(ch)
+
+    reporter, err := New("blah", HandlerFunc(newHandler(t, ch)), BufferSize(1))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Hour)
+    defer closer.Close()
+
+    // When
+    timer := scope.Timer("name")
+    timer.Record(time.Second)
+
+    // Then
+    metrics := <-ch
+    assert.Len(t, metrics, 1)
+    assertMetric(t, metrics[0], "name", typeTimer, 1.0)
+}
+
+func TestBufferSize(t *testing.T) {
+    bufSize := 123
+    opts := options{}
+    BufferSize(bufSize)(&opts)
+    assert.EqualValues(t, bufSize, opts.bufferSize)
+}
+
+func TestOutput(t *testing.T) {
+    opts := options{}
+    Debug(os.Stdout)(&opts)
+    assert.EqualValues(t, os.Stdout, opts.writer)
+}
+
+func assertMetric(t *testing.T, m *metric, name, metricType string, value float64, tags ...string) {
+    assert.EqualValues(t, name, m.Name)
+    assert.EqualValues(t, metricType, m.Type)
+
+    if len(tags) > 0 {
+        assert.EqualValues(t, tags, m.Tags)
+    }
+    assert.Len(t, m.Points, 1)
+    assert.Len(t, m.Points[0], 2)
+    assert.NotZero(t, m.Points[0][0])
+    assert.EqualValues(t, value, m.Points[0][1])
+}
+
+func TestLive(t *testing.T) {
+    apiKey := os.Getenv("API_KEY")
+    if apiKey == "" {
+        t.SkipNow()
+    }
+
+    reporter, err := New(apiKey)
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
+    defer closer.Close()
+
+    gauge := scope.Gauge("test.gauge")
+    gauge.Update(1)
+
+    counter := scope.Counter("test.counter")
+    counter.Inc(2)
+
+    timer := scope.Timer("test.timer")
+    timer.Record(time.Second * 3)
+
+    value := scope.Histogram("test.histogram.value", tally.ValueBuckets{0, 5, 10})
+    value.RecordValue(4)
+
+    duration := scope.Histogram("test.histogram.duration", tally.DurationBuckets{0, time.Second * 10, time.Second * 20})
+    duration.RecordDuration(5 * time.Second)
+
+    time.Sleep(time.Second * 5)
+}
+
+func BenchmarkDatadog(t *testing.B) {
+    resp := &http.Response{
+        Body: ioutil.NopCloser(strings.NewReader("")),
+    }
+
+    reporter, err := New("blah", HandlerFunc(func(req *http.Request) (*http.Response, error) {
+        return resp, nil
+    }))
+    assert.Nil(t, err)
+
+    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Millisecond*250)
+    defer closer.Close()
+
+    timer := scope.Timer("test.timer")
+
+    for i := 0; i < t.N; i++ {
+        timer.Record(time.Second)
+    }
+}
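A minimal usage sketch of the new package, assuming it is importable as github.com/uber-go/tally/datadog (the repository path plus the new datadog directory) and that DATADOG_API_KEY holds a valid API key; the buffer size, flush interval, and metric names are illustrative only and mirror the API exercised in TestLive above.

package main

import (
    "log"
    "os"
    "time"

    "github.com/uber-go/tally"
    "github.com/uber-go/tally/datadog" // assumed import path for the package added in this change
)

func main() {
    // Build a buffered reporter; BufferSize and Debug are optional.
    reporter, err := datadog.New(os.Getenv("DATADOG_API_KEY"),
        datadog.BufferSize(256),      // flush after 256 buffered metrics instead of the default 512
        datadog.Debug(os.Stdout),     // echo datadog responses for troubleshooting
    )
    if err != nil {
        log.Fatal(err)
    }

    // Wire the reporter into a tally root scope that also flushes every second.
    scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Second)
    defer closer.Close()

    scope.Counter("requests").Inc(1)
    scope.Timer("latency").Record(250 * time.Millisecond)
}

Closing the root scope's closer flushes any metrics still sitting in the reporter's buffer, which is why the tests above call closer.Close() before reading from the handler channel.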