diff --git a/metrics/cloudwatch2/cloudwatch2.go b/metrics/cloudwatch2/cloudwatch2.go new file mode 100644 index 000000000..4d006675e --- /dev/null +++ b/metrics/cloudwatch2/cloudwatch2.go @@ -0,0 +1,241 @@ +// Package cloudwatch2 emits all data as a StatisticsSet (rather than +// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK. +package cloudwatch2 + +import ( + "math" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" + "golang.org/x/sync/errgroup" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/internal/convert" + "github.com/go-kit/kit/metrics/internal/lv" +) + +const ( + maxConcurrentRequests = 20 +) + +// CloudWatch receives metrics observations and forwards them to CloudWatch. +// Create a CloudWatch object, use it to create metrics, and pass those metrics as +// dependencies to the components that will use them. +// +// To regularly report metrics to CloudWatch, use the WriteLoop helper method. +type CloudWatch struct { + mtx sync.RWMutex + sem chan struct{} + namespace string + svc cloudwatchiface.CloudWatchAPI + counters *lv.Space + logger log.Logger + numConcurrentRequests int +} + +// Option is a function adapter to change config of the CloudWatch struct +type Option func(*CloudWatch) + +// WithLogger sets the Logger that will recieve error messages generated +// during the WriteLoop. By default, no logger is used. +func WithLogger(logger log.Logger) Option { + return func(cw *CloudWatch) { + cw.logger = logger + } +} + +// WithConcurrentRequests sets the upper limit on how many +// cloudwatch.PutMetricDataRequest may be under way at any +// given time. If n is greater than 20, 20 is used. By default, +// the max is set at 10 concurrent requests. +func WithConcurrentRequests(n int) Option { + return func(cw *CloudWatch) { + if n > maxConcurrentRequests { + n = maxConcurrentRequests + } + cw.numConcurrentRequests = n + } +} + +// New returns a CloudWatch object that may be used to create metrics. +// Namespace is applied to all created metrics and maps to the CloudWatch namespace. +// Callers must ensure that regular calls to Send are performed, either +// manually or with one of the helper methods. +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...Option) *CloudWatch { + cw := &CloudWatch{ + namespace: namespace, + svc: svc, + counters: lv.NewSpace(), + numConcurrentRequests: 10, + logger: log.NewNopLogger(), + } + + for _, optFunc := range options { + optFunc(cw) + } + + cw.sem = make(chan struct{}, cw.numConcurrentRequests) + + return cw +} + +// NewCounter returns a counter. Observations are aggregated and emitted once +// per write invocation. +func (cw *CloudWatch) NewCounter(name string) metrics.Counter { + return &Counter{ + name: name, + obs: cw.counters.Observe, + } +} + +// NewGauge returns an gauge. Under the covers, there is no distinctions +// in CloudWatch for how Counters/Histograms/Gauges are reported, so this +// just wraps a cloudwatch2.Counter. +func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { + return convert.NewCounterAsGauge(cw.NewCounter(name)) +} + +// NewHistogram returns a histogram. Under the covers, there is no distinctions +// in CloudWatch for how Counters/Histograms/Gauges are reported, so this +// just wraps a cloudwatch2.Counter. +func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram { + return convert.NewCounterAsHistogram(cw.NewCounter(name)) +} + +// WriteLoop is a helper method that invokes Send every time the passed +// channel fires. This method blocks until the channel is closed, so clients +// probably want to run it in its own goroutine. For typical usage, create a +// time.Ticker and pass its C channel to this method. +func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { + for range c { + if err := cw.Send(); err != nil { + cw.logger.Log("during", "Send", "err", err) + } + } +} + +// Send will fire an API request to CloudWatch with the latest stats for +// all metrics. It is preferred that the WriteLoop method is used. +func (cw *CloudWatch) Send() error { + cw.mtx.RLock() + defer cw.mtx.RUnlock() + now := time.Now() + + var datums []cloudwatch.MetricDatum + + cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + datums = append(datums, cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(lvs...), + StatisticValues: stats(values), + Timestamp: aws.Time(now), + }) + return true + }) + + var batches [][]cloudwatch.MetricDatum + for len(datums) > 0 { + var batch []cloudwatch.MetricDatum + lim := len(datums) + if lim > maxConcurrentRequests { + lim = maxConcurrentRequests + } + batch, datums = datums[:lim], datums[lim:] + batches = append(batches, batch) + } + + var g errgroup.Group + for _, batch := range batches { + batch := batch + g.Go(func() error { + cw.sem <- struct{}{} + defer func() { + <-cw.sem + }() + req := cw.svc.PutMetricDataRequest(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: batch, + }) + _, err := req.Send() + return err + }) + } + return g.Wait() +} + +var zero = float64(0.0) + +// Just build this once to reduce construction costs whenever +// someone does a Send with no aggregated values. +var zeros = cloudwatch.StatisticSet{ + Maximum: &zero, + Minimum: &zero, + Sum: &zero, + SampleCount: &zero, +} + +func stats(a []float64) *cloudwatch.StatisticSet { + count := float64(len(a)) + if count == 0 { + return &zeros + } + + var sum float64 + var min = math.MaxFloat64 + var max = math.MaxFloat64 * -1 + for _, f := range a { + sum += f + if f < min { + min = f + } + if f > max { + max = f + } + } + + return &cloudwatch.StatisticSet{ + Maximum: &max, + Minimum: &min, + Sum: &sum, + SampleCount: &count, + } +} + +func makeDimensions(labelValues ...string) []cloudwatch.Dimension { + dimensions := make([]cloudwatch.Dimension, len(labelValues)/2) + for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { + dimensions[j] = cloudwatch.Dimension{ + Name: aws.String(labelValues[i]), + Value: aws.String(labelValues[i+1]), + } + } + return dimensions +} + +type observeFunc func(name string, lvs lv.LabelValues, value float64) + +// Counter is a counter. Observations are forwarded to a node +// object, and aggregated per timeseries. +type Counter struct { + name string + lvs lv.LabelValues + obs observeFunc +} + +// With implements metrics.Counter. +func (c *Counter) With(labelValues ...string) metrics.Counter { + return &Counter{ + name: c.name, + lvs: c.lvs.With(labelValues...), + obs: c.obs, + } +} + +// Add implements metrics.Counter. +func (c *Counter) Add(delta float64) { + c.obs(c.name, c.lvs, delta) +} diff --git a/metrics/cloudwatch2/cloudwatch2_test.go b/metrics/cloudwatch2/cloudwatch2_test.go new file mode 100644 index 000000000..1561299cc --- /dev/null +++ b/metrics/cloudwatch2/cloudwatch2_test.go @@ -0,0 +1,147 @@ +package cloudwatch2 + +import ( + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" +) + +func TestStats(t *testing.T) { + testCases := []struct { + name string + vals []float64 + xMin float64 + xMax float64 + xSum float64 + xCt float64 + }{ + { + "empty", + []float64{}, + 0.0, + 0.0, + 0.0, + 0.0, + }, + { + "single", + []float64{3.1416}, + 3.1416, + 3.1416, + 3.1416, + 1.0, + }, + { + "double", + []float64{1.0, 9.0}, + 1.0, + 9.0, + 10.0, + 2.0, + }, + { + "multiple", + []float64{5.0, 1.0, 9.0, 5.0}, + 1.0, + 9.0, + 20.0, + 4.0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + s := stats(tc.vals) + if tc.xMin != *s.Minimum { + t.Errorf("expected [%f]: %f\n", tc.xMin, *s.Minimum) + } + if tc.xMax != *s.Maximum { + t.Errorf("expected [%f]: %f\n", tc.xMax, *s.Maximum) + } + if tc.xSum != *s.Sum { + t.Errorf("expected [%f]: %f\n", tc.xSum, *s.Sum) + } + if tc.xCt != *s.SampleCount { + t.Errorf("expected [%f]: %f\n", tc.xCt, *s.SampleCount) + } + }) + } +} + +type mockCloudWatch struct { + cloudwatchiface.CloudWatchAPI + latestName string + latestData []cloudwatch.MetricDatum +} + +func (mcw *mockCloudWatch) PutMetricDataRequest(in *cloudwatch.PutMetricDataInput) cloudwatch.PutMetricDataRequest { + mcw.latestName = *in.Namespace + mcw.latestData = in.MetricData + return cloudwatch.PutMetricDataRequest{ + // To mock the V2 API, most of the functions spit + // out structs that you need to call Send() on. + // The non-intuitive thing is that to get the Send() to avoid actually + // going across the wire, you just create a dumb aws.Request with either + // aws.Request.Data defined (for succes) or with aws.Request.Error + // to simulate an Error. + Request: &aws.Request{Data: &cloudwatch.PutMetricDataOutput{}}, + Input: in, + } +} + +func TestSend(t *testing.T) { + ns := "example-namespace" + svc := &mockCloudWatch{} + cw := New(ns, svc) + + c := cw.NewCounter("c").With("charlie", "cat") + h := cw.NewHistogram("h").With("hotel", "horse") + g := cw.NewGauge("g").With("golf", "giraffe") + + c.Add(4.0) + c.Add(5.0) + c.Add(6.0) + h.Observe(3.0) + h.Observe(5.0) + h.Observe(7.0) + g.Set(2.0) + g.Set(5.0) + g.Set(8.0) + + err := cw.Send() + if err != nil { + t.Fatalf("unexpected: %v\n", err) + } + + if ns != svc.latestName { + t.Errorf("expected namespace %q; not %q\n", ns, svc.latestName) + } + + if len(svc.latestData) != 3 { + t.Errorf("expected 3 datums: %v\n", svc.latestData) + } + for _, datum := range svc.latestData { + initial := *datum.MetricName + if len(datum.Dimensions) != 1 { + t.Errorf("expected 1 dimension: %v\n", datum) + } + if !strings.HasPrefix(*datum.Dimensions[0].Name, initial) { + t.Errorf("expected %q in Name of %v\n", initial, datum.Dimensions) + } + if !strings.HasPrefix(*datum.Dimensions[0].Value, initial) { + t.Errorf("expected %q in Value of %v\n", initial, datum.Dimensions) + } + if datum.StatisticValues == nil { + t.Errorf("expected StatisticValues in %v\n", datum) + } + if *datum.StatisticValues.Sum != 15.0 { + t.Errorf("expected 15.0 for Sum in %v\n", datum) + } + if *datum.StatisticValues.SampleCount != 3.0 { + t.Errorf("expected 3.0 for SampleCount in %v\n", datum) + } + } +} diff --git a/metrics/internal/convert/convert.go b/metrics/internal/convert/convert.go new file mode 100644 index 000000000..9c83a8bef --- /dev/null +++ b/metrics/internal/convert/convert.go @@ -0,0 +1,135 @@ +// Package convert provides a way to use Counters, Histograms, or Gauges +// as one of the other types +package convert + +import "github.com/go-kit/kit/metrics" + +type counterHistogram struct { + c metrics.Counter +} + +// NewCounterAsHistogram returns a Histogram that actually writes the +// value on an underlying Counter +func NewCounterAsHistogram(c metrics.Counter) metrics.Histogram { + return counterHistogram{c} +} + +// With implements Histogram. +func (ch counterHistogram) With(labelValues ...string) metrics.Histogram { + return counterHistogram{ch.c.With(labelValues...)} +} + +// Observe implements histogram. +func (ch counterHistogram) Observe(value float64) { + ch.c.Add(value) +} + +type histogramCounter struct { + h metrics.Histogram +} + +// NewHistogramAsCounter returns a Counter that actually writes the +// value on an underlying Histogram +func NewHistogramAsCounter(h metrics.Histogram) metrics.Counter { + return histogramCounter{h} +} + +// With implements Counter. +func (hc histogramCounter) With(labelValues ...string) metrics.Counter { + return histogramCounter{hc.h.With(labelValues...)} +} + +// Add implements Counter. +func (hc histogramCounter) Add(delta float64) { + hc.h.Observe(delta) +} + +type counterGauge struct { + c metrics.Counter +} + +// NewCounterAsGauge returns a Gauge that actually writes the +// value on an underlying Counter +func NewCounterAsGauge(c metrics.Counter) metrics.Gauge { + return counterGauge{c} +} + +// With implements Gauge. +func (cg counterGauge) With(labelValues ...string) metrics.Gauge { + return counterGauge{cg.c.With(labelValues...)} +} + +// Set implements Gauge. +func (cg counterGauge) Set(value float64) { + cg.c.Add(value) +} + +// Add implements metrics.Gauge. +func (cg counterGauge) Add(delta float64) { + cg.c.Add(delta) +} + +type gaugeCounter struct { + g metrics.Gauge +} + +// NewGaugeAsCounter returns a Counter that actually writes the +// value on an underlying Gauge +func NewGaugeAsCounter(g metrics.Gauge) metrics.Counter { + return gaugeCounter{g} +} + +// With implements Counter. +func (gc gaugeCounter) With(labelValues ...string) metrics.Counter { + return gaugeCounter{gc.g.With(labelValues...)} +} + +// Add implements Counter. +func (gc gaugeCounter) Add(delta float64) { + gc.g.Set(delta) +} + +type histogramGauge struct { + h metrics.Histogram +} + +// NewHistogramAsGauge returns a Gauge that actually writes the +// value on an underlying Histogram +func NewHistogramAsGauge(h metrics.Histogram) metrics.Gauge { + return histogramGauge{h} +} + +// With implements Gauge. +func (hg histogramGauge) With(labelValues ...string) metrics.Gauge { + return histogramGauge{hg.h.With(labelValues...)} +} + +// Set implements Gauge. +func (hg histogramGauge) Set(value float64) { + hg.h.Observe(value) +} + +// Add implements metrics.Gauge. +func (hg histogramGauge) Add(delta float64) { + hg.h.Observe(delta) +} + +type gaugeHistogram struct { + g metrics.Gauge +} + +// NewGaugeAsHistogram returns a Histogram that actually writes the +// value on an underlying Gauge +func NewGaugeAsHistogram(g metrics.Gauge) metrics.Histogram { + return gaugeHistogram{g} +} + +// With implements Histogram. +func (gh gaugeHistogram) With(labelValues ...string) metrics.Histogram { + return gaugeHistogram{gh.g.With(labelValues...)} +} + +// Observe implements histogram. +func (gh gaugeHistogram) Observe(value float64) { + gh.g.Set(value) +} diff --git a/metrics/internal/convert/convert_test.go b/metrics/internal/convert/convert_test.go new file mode 100644 index 000000000..df3ecf023 --- /dev/null +++ b/metrics/internal/convert/convert_test.go @@ -0,0 +1,58 @@ +package convert + +import ( + "testing" + + "github.com/go-kit/kit/metrics/generic" + "github.com/go-kit/kit/metrics/teststat" +) + +func TestCounterHistogramConversion(t *testing.T) { + name := "my_counter" + c := generic.NewCounter(name) + h := NewCounterAsHistogram(c) + top := NewHistogramAsCounter(h).With("label", "counter").(histogramCounter) + mid := top.h.(counterHistogram) + low := mid.c.(*generic.Counter) + if want, have := name, low.Name; want != have { + t.Errorf("Name: want %q, have %q", want, have) + } + value := func() float64 { return low.Value() } + if err := teststat.TestCounter(top, value); err != nil { + t.Fatal(err) + } +} + +func TestCounterGaugeConversion(t *testing.T) { + name := "my_counter" + c := generic.NewCounter(name) + g := NewCounterAsGauge(c) + top := NewGaugeAsCounter(g).With("label", "counter").(gaugeCounter) + mid := top.g.(counterGauge) + low := mid.c.(*generic.Counter) + if want, have := name, low.Name; want != have { + t.Errorf("Name: want %q, have %q", want, have) + } + value := func() float64 { return low.Value() } + if err := teststat.TestCounter(top, value); err != nil { + t.Fatal(err) + } +} + +func TestHistogramGaugeConversion(t *testing.T) { + name := "my_histogram" + h := generic.NewHistogram(name, 50) + g := NewHistogramAsGauge(h) + top := NewGaugeAsHistogram(g).With("label", "histogram").(gaugeHistogram) + mid := top.g.(histogramGauge) + low := mid.h.(*generic.Histogram) + if want, have := name, low.Name; want != have { + t.Errorf("Name: want %q, have %q", want, have) + } + quantiles := func() (float64, float64, float64, float64) { + return low.Quantile(0.50), low.Quantile(0.90), low.Quantile(0.95), low.Quantile(0.99) + } + if err := teststat.TestHistogram(top, quantiles, 0.01); err != nil { + t.Fatal(err) + } +}