Skip to content

Commit

Permalink
Refactor cloudwatch: Reset().Walk() on every Send(), like influx impl…
Browse files Browse the repository at this point in the history
… does.

Note there is a breaking API change, as the cloudwatch object now has optional parameters.
  • Loading branch information
feliksik committed Jul 31, 2017
1 parent 9689504 commit ee4f890
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 114 deletions.
289 changes: 189 additions & 100 deletions metrics/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cloudwatch

import (
"fmt"
"os"
"sync"
"time"

Expand All @@ -12,81 +13,131 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
)

const (
maxConcurrentRequests = 20
)

type Percentiles []struct {
s string
f float64
}

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
sem chan struct{}
namespace string
numConcurrentRequests int
svc cloudwatchiface.CloudWatchAPI
counters map[string]*counter
gauges map[string]*gauge
histograms map[string]*histogram
mtx sync.RWMutex
sem chan struct{}
namespace string
svc cloudwatchiface.CloudWatchAPI
counters *lv.Space
gauges *lv.Space
histograms *lv.Space
*cwoptions
}

type cwoptions struct {
percentiles Percentiles
logger log.Logger
numConcurrentRequests int
}

type option func(*cwoptions)

func (s *cwoptions) apply(opt option) {
if opt != nil {
opt(s)
}
}

func WithLogger(logger log.Logger) option {
return func(o *cwoptions) {
o.logger = logger
}
}

func WithPercentiles(p Percentiles) option {
return func(o *cwoptions) {
validated := Percentiles{}
for _, entry := range p {
if entry.f < 0 || entry.f > 1 {
continue // illegal entry
}
validated = append(validated, entry)
}
o.percentiles = validated
}
}

func WithConcurrentRequests(n int) option {
return func(o *cwoptions) {
if n > maxConcurrentRequests {
n = maxConcurrentRequests
}
o.numConcurrentRequests = n
}
}

// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// NumConcurrent sets the number of simultaneous requests to Amazon.
// A good default value is 10 and the maximum is 20.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
if numConcurrent > maxConcurrentRequests {
numConcurrent = maxConcurrentRequests
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
useOptions := &cwoptions{
numConcurrentRequests: 10,
logger: log.NewLogfmtLogger(os.Stderr),
percentiles: Percentiles{
{"50", 0.50},
{"90", 0.90},
{"95", 0.95},
{"99", 0.99},
},
}

for _, opt := range options {
useOptions.apply(opt)
}

return &CloudWatch{
sem: make(chan struct{}, numConcurrent),
namespace: namespace,
numConcurrentRequests: numConcurrent,
sem: make(chan struct{}, useOptions.numConcurrentRequests),
namespace: namespace,
svc: svc,
counters: map[string]*counter{},
gauges: map[string]*gauge{},
histograms: map[string]*histogram{},
logger: logger,
counters: lv.NewSpace(),
gauges: lv.NewSpace(),
histograms: lv.NewSpace(),
cwoptions: useOptions,
}
}

// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
cw.mtx.Lock()
defer cw.mtx.Unlock()
c := &counter{c: generic.NewCounter(name)}
cw.counters[name] = c
return c
}

// NewGauge returns a gauge. Observations are aggregated and emitted once per
// write invocation.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
cw.mtx.Lock()
defer cw.mtx.Unlock()
g := &gauge{g: generic.NewGauge(name)}
cw.gauges[name] = g
return g
}

// NewHistogram returns a histogram. Observations are aggregated and emitted as
// per-quantile gauges, once per write invocation. 50 is a good default value
// for buckets.
func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram {
cw.mtx.Lock()
defer cw.mtx.Unlock()
h := &histogram{h: generic.NewHistogram(name, buckets)}
cw.histograms[name] = h
return h
return &Counter{
name: name,
obs: cw.counters.Observe,
}
}

// NewGauge returns an gauge.
func (cw *CloudWatch) NewGauge(name string) *Gauge {
return &Gauge{
name: name,
obs: cw.gauges.Observe,
add: cw.gauges.Add,
}
}

// NewHistogram returns a histogram.
func (cw *CloudWatch) NewHistogram(name string) *Histogram {
return &Histogram{
name: name,
obs: cw.histograms.Observe,
}
}

// WriteLoop is a helper method that invokes Send every time the passed
Expand All @@ -110,42 +161,46 @@ func (cw *CloudWatch) Send() error {

var datums []*cloudwatch.MetricDatum

for name, c := range cw.counters {
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := sum(values)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(c.c.LabelValues()...),
Value: aws.Float64(c.c.Value()),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
return true
})

for name, g := range cw.gauges {
cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := last(values)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(g.g.LabelValues()...),
Value: aws.Float64(g.g.Value()),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
return true
})

for name, h := range cw.histograms {
for _, p := range []struct {
s string
f float64
}{
{"50", 0.50},
{"90", 0.90},
{"95", 0.95},
{"99", 0.99},
} {
cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
histogram := generic.NewHistogram(name, 50)

for _, v := range values {
histogram.Observe(v)
}

for _, p := range cw.percentiles {
value := histogram.Quantile(p.f)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)),
Dimensions: makeDimensions(h.h.LabelValues()...),
Value: aws.Float64(h.h.Quantile(p.f)),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
}
return true
})

var batches [][]*cloudwatch.MetricDatum
for len(datums) > 0 {
Expand Down Expand Up @@ -179,64 +234,98 @@ func (cw *CloudWatch) Send() error {
return firstErr
}

func sum(a []float64) float64 {
var v float64
for _, f := range a {
v += f
}
return v
}

func last(a []float64) float64 {
return a[len(a)-1]
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

// counter is a CloudWatch counter metric.
type counter struct {
c *generic.Counter
type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a counter. Observations are forwarded to a node
// object, and aggregated (summed) per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements counter
func (c *counter) With(labelValues ...string) metrics.Counter {
c.c = c.c.With(labelValues...).(*generic.Counter)
return c
// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}

// Add implements counter.
func (c *counter) Add(delta float64) {
c.c.Add(delta)
// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}

// gauge is a CloudWatch gauge metric.
type gauge struct {
g *generic.Gauge
// Gauge is a gauge. Observations are forwarded to a node
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
name string
lvs lv.LabelValues
obs observeFunc
add observeFunc
}

// With implements gauge
func (g *gauge) With(labelValues ...string) metrics.Gauge {
g.g = g.g.With(labelValues...).(*generic.Gauge)
return g
// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{
name: g.name,
lvs: g.lvs.With(labelValues...),
obs: g.obs,
add: g.add,
}
}

// Set implements gauge
func (g *gauge) Set(value float64) {
g.g.Set(value)
// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
g.obs(g.name, g.lvs, value)
}

// Add implements gauge
func (g *gauge) Add(delta float64) {
g.g.Add(delta)
// Add implements metrics.Gauge.
func (g *Gauge) Add(delta float64) {
g.add(g.name, g.lvs, delta)
}

// histogram is a CloudWatch histogram metric
type histogram struct {
h *generic.Histogram
// Histogram is an Influx histrogram. Observations are aggregated into a
// generic.Histogram and emitted as per-quantile gauges to the Influx server.
type Histogram struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements histogram
func (h *histogram) With(labelValues ...string) metrics.Histogram {
h.h = h.h.With(labelValues...).(*generic.Histogram)
return h
// With implements metrics.Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{
name: h.name,
lvs: h.lvs.With(labelValues...),
obs: h.obs,
}
}

// Observe implements histogram
func (h *histogram) Observe(value float64) {
h.h.Observe(value)
// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
h.obs(h.name, h.lvs, value)
}

func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
Expand Down
Loading

0 comments on commit ee4f890

Please sign in to comment.