From 7c3f4b0bc157b18e64afd039fa686caafab98978 Mon Sep 17 00:00:00 2001 From: Vera Reynolds Date: Mon, 15 Mar 2021 11:03:52 -0600 Subject: [PATCH 1/9] wip: switch libhoney to using the refinery configured metrics --- cmd/refinery/main.go | 18 +++++++------- collect/cache/cache.go | 2 +- collect/collect.go | 12 ++++----- metrics/honeycomb.go | 34 +++++++++++++++----------- metrics/metrics.go | 51 ++++++++++++++++++++++++++++++++++----- metrics/mock.go | 2 +- metrics/nullmetrics.go | 2 +- metrics/prometheus.go | 22 +++++++++++++---- route/proxy.go | 2 +- route/route.go | 12 ++++----- sample/dynamic.go | 4 +-- sample/dynamic_ema.go | 4 +-- sample/rules.go | 8 +++--- sample/totalthroughput.go | 4 +-- transmit/transmit.go | 8 +++--- 15 files changed, 121 insertions(+), 64 deletions(-) diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index 9738cc2e1a..c1c5e235a7 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -9,12 +9,10 @@ import ( "syscall" "time" - libhoney "github.com/honeycombio/libhoney-go" - "github.com/honeycombio/libhoney-go/transmission" - statsd "gopkg.in/alexcesaro/statsd.v2" - "github.com/facebookgo/inject" "github.com/facebookgo/startstop" + libhoney "github.com/honeycombio/libhoney-go" + "github.com/honeycombio/libhoney-go/transmission" flag "github.com/jessevdk/go-flags" "github.com/sirupsen/logrus" @@ -84,7 +82,7 @@ func main() { // get desired implementation for each dependency to inject lgr := logger.GetLoggerImplementation(c) collector := collect.GetCollectorImplementation(c) - metricsr := metrics.GetMetricsImplementation(c) + metricsr := metrics.GetMetricsImplementation(c, "") shrdr := sharder.GetSharderImplementation(c) samplerFactory := &sample.SamplerFactory{} @@ -117,8 +115,10 @@ func main() { TLSHandshakeTimeout: 1200 * time.Millisecond, } - sdUpstream, _ := statsd.New(statsd.Prefix("refinery.upstream")) - sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer")) + metricsLibUpstream := metrics.GetMetricsImplementation(c, "libhoney_upstream") + metricsLibPeer := metrics.GetMetricsImplementation(c, "libhoney_peer") + //sdUpstream, _ := statsd.New(statsd.Prefix("refinery.upstream")) + //sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer")) userAgentAddition := "refinery/" + version upstreamClient, err := libhoney.NewClient(libhoney.ClientConfig{ @@ -131,7 +131,7 @@ func main() { Transport: upstreamTransport, BlockOnSend: true, EnableMsgpackEncoding: true, - Metrics: sdUpstream, + Metrics: metricsLibUpstream, }, }) if err != nil { @@ -149,7 +149,7 @@ func main() { Transport: peerTransport, DisableCompression: !c.GetCompressPeerCommunication(), EnableMsgpackEncoding: true, - Metrics: sdPeer, + Metrics: metricsLibPeer, }, }) if err != nil { diff --git a/collect/cache/cache.go b/collect/cache/cache.go index 91a457e0ae..69bf691b12 100644 --- a/collect/cache/cache.go +++ b/collect/cache/cache.go @@ -102,7 +102,7 @@ func (d *DefaultInMemCache) Set(trace *types.Trace) *types.Trace { if !oldTrace.Sent { // if it hasn't already been sent, // record that we're overrunning the buffer - d.Metrics.IncrementCounter("collect_cache_buffer_overrun") + d.Metrics.Increment("collect_cache_buffer_overrun") // and return the trace so it can be sent. retTrace = oldTrace } diff --git a/collect/collect.go b/collect/collect.go index c505bbe711..503fbc4401 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -318,14 +318,14 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // if the trace has already been sent, just pass along the span if sentRecord, found := i.sentTraceCache.Get(sp.TraceID); found { if sr, ok := sentRecord.(*traceSentRecord); ok { - i.Metrics.IncrementCounter("trace_sent_cache_hit") + i.Metrics.Increment("trace_sent_cache_hit") i.dealWithSentTrace(sr.keep, sr.rate, sp) return } } // trace hasn't already been sent (or this span is really old); let's // create a new trace to hold it - i.Metrics.IncrementCounter("trace_accepted") + i.Metrics.Increment("trace_accepted") timeout, err := i.Config.GetTraceTimeout() if err != nil { @@ -420,9 +420,9 @@ func (i *InMemCollector) send(trace *types.Trace) { i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds())) i.Metrics.Histogram("trace_span_count", float64(len(trace.GetSpans()))) if trace.HasRootSpan { - i.Metrics.IncrementCounter("trace_send_has_root") + i.Metrics.Increment("trace_send_has_root") } else { - i.Metrics.IncrementCounter("trace_send_no_root") + i.Metrics.Increment("trace_send_no_root") } var sampler sample.Sampler @@ -448,11 +448,11 @@ func (i *InMemCollector) send(trace *types.Trace) { // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. if !shouldSend && !i.Config.GetIsDryRun() { - i.Metrics.IncrementCounter("trace_send_dropped") + i.Metrics.Increment("trace_send_dropped") i.Logger.Info().WithString("trace_id", trace.TraceID).WithString("dataset", trace.Dataset).Logf("Dropping trace because of sampling, trace to dataset") return } - i.Metrics.IncrementCounter("trace_send_kept") + i.Metrics.Increment("trace_send_kept") // ok, we're not dropping this trace; send all the spans if i.Config.GetIsDryRun() && !shouldSend { diff --git a/metrics/honeycomb.go b/metrics/honeycomb.go index 3e89d635ab..de226a0f7a 100644 --- a/metrics/honeycomb.go +++ b/metrics/honeycomb.go @@ -38,6 +38,8 @@ type HoneycombMetrics struct { //reportingFreq is the interval with which to report statistics reportingFreq int64 reportingCancelFunc func() + + prefix string } type counter struct { @@ -229,7 +231,7 @@ func (h *HoneycombMetrics) reportToHoneycommb(ctx context.Context) { h.countersLock.Lock() for _, count := range h.counters { count.lock.Lock() - ev.AddField(count.name, count.val) + ev.AddField(PrefixMetricName(h.prefix, count.name), count.val) count.val = 0 count.lock.Unlock() } @@ -238,7 +240,7 @@ func (h *HoneycombMetrics) reportToHoneycommb(ctx context.Context) { h.gaugesLock.Lock() for _, gauge := range h.gauges { gauge.lock.Lock() - ev.AddField(gauge.name, gauge.val) + ev.AddField(PrefixMetricName(h.prefix, gauge.name), gauge.val) // gauges should remain where they are until changed // gauge.val = 0 gauge.lock.Unlock() @@ -253,12 +255,12 @@ func (h *HoneycombMetrics) reportToHoneycommb(ctx context.Context) { p50Index := int(math.Floor(float64(len(histogram.vals)) * 0.5)) p95Index := int(math.Floor(float64(len(histogram.vals)) * 0.95)) p99Index := int(math.Floor(float64(len(histogram.vals)) * 0.99)) - ev.AddField(histogram.name+"_p50", histogram.vals[p50Index]) - ev.AddField(histogram.name+"_p95", histogram.vals[p95Index]) - ev.AddField(histogram.name+"_p99", histogram.vals[p99Index]) - ev.AddField(histogram.name+"_min", histogram.vals[0]) - ev.AddField(histogram.name+"_max", histogram.vals[len(histogram.vals)-1]) - ev.AddField(histogram.name+"_avg", average(histogram.vals)) + ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_p50", histogram.vals[p50Index]) + ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_p95", histogram.vals[p95Index]) + ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_p99", histogram.vals[p99Index]) + ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_min", histogram.vals[0]) + ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_max", histogram.vals[len(histogram.vals)-1]) + ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_avg", average(histogram.vals)) histogram.vals = histogram.vals[:0] } histogram.lock.Unlock() @@ -317,7 +319,7 @@ func (h *HoneycombMetrics) Register(name string, metricType string) { } } -func (h *HoneycombMetrics) IncrementCounter(name string) { +func (h *HoneycombMetrics) Count(name string, n interface{}) { count, ok := h.counters[name] if !ok { h.Register(name, "counter") @@ -325,10 +327,14 @@ func (h *HoneycombMetrics) IncrementCounter(name string) { } count.lock.Lock() defer count.lock.Unlock() - count.val++ + count.val = count.val + int(ConvertNumeric(n)) +} + +func (h *HoneycombMetrics) Increment(name string) { + h.Count(name, 1) } -func (h *HoneycombMetrics) Gauge(name string, val float64) { +func (h *HoneycombMetrics) Gauge(name string, val interface{}) { gauge, ok := h.gauges[name] if !ok { h.Register(name, "gauge") @@ -336,10 +342,10 @@ func (h *HoneycombMetrics) Gauge(name string, val float64) { } gauge.lock.Lock() defer gauge.lock.Unlock() - gauge.val = val + gauge.val = ConvertNumeric(val) } -func (h *HoneycombMetrics) Histogram(name string, obs float64) { +func (h *HoneycombMetrics) Histogram(name string, obs interface{}) { histogram, ok := h.histograms[name] if !ok { h.Register(name, "histogram") @@ -347,5 +353,5 @@ func (h *HoneycombMetrics) Histogram(name string, obs float64) { } histogram.lock.Lock() defer histogram.lock.Unlock() - histogram.vals = append(histogram.vals, obs) + histogram.vals = append(histogram.vals, ConvertNumeric(obs)) } diff --git a/metrics/metrics.go b/metrics/metrics.go index e40d02837b..4d41d63807 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -10,12 +10,13 @@ import ( type Metrics interface { // Register declares a metric; metricType should be one of counter, gauge, histogram Register(name string, metricType string) - IncrementCounter(name string) - Gauge(name string, val float64) - Histogram(name string, obs float64) + Increment(name string) + Gauge(name string, val interface{}) + Count(name string, n interface{}) + Histogram(name string, obs interface{}) } -func GetMetricsImplementation(c config.Config) Metrics { +func GetMetricsImplementation(c config.Config, prefix string) Metrics { var metricsr Metrics metricsType, err := c.GetMetricsType() if err != nil { @@ -24,12 +25,50 @@ func GetMetricsImplementation(c config.Config) Metrics { } switch metricsType { case "honeycomb": - metricsr = &HoneycombMetrics{} + metricsr = &HoneycombMetrics{prefix: prefix} case "prometheus": - metricsr = &PromMetrics{} + metricsr = &PromMetrics{prefix: prefix} default: fmt.Printf("unknown metrics type %s. Exiting.\n", metricsType) os.Exit(1) } return metricsr } + +func ConvertNumeric(val interface{}) float64 { + switch n := val.(type) { + case int: + return float64(n) + case uint: + return float64(n) + case int64: + return float64(n) + case uint64: + return float64(n) + case int32: + return float64(n) + case uint32: + return float64(n) + case int16: + return float64(n) + case uint16: + return float64(n) + case int8: + return float64(n) + case uint8: + return float64(n) + case float64: + return n + case float32: + return float64(n) + default: + return 0 + } +} + +func PrefixMetricName(prefix string, name string) string { + if prefix != "" { + return fmt.Sprintf(`%s_%s`, prefix, name) + } + return name +} diff --git a/metrics/mock.go b/metrics/mock.go index 496a319585..16a6ce389c 100644 --- a/metrics/mock.go +++ b/metrics/mock.go @@ -27,7 +27,7 @@ func (m *MockMetrics) Register(name string, metricType string) { m.Registrations[name] = metricType } -func (m *MockMetrics) IncrementCounter(name string) { +func (m *MockMetrics) Increment(name string) { m.lock.Lock() defer m.lock.Unlock() diff --git a/metrics/nullmetrics.go b/metrics/nullmetrics.go index cc0985076a..6985f1df41 100644 --- a/metrics/nullmetrics.go +++ b/metrics/nullmetrics.go @@ -7,6 +7,6 @@ type NullMetrics struct{} func (n *NullMetrics) Start() {} func (n *NullMetrics) Register(name string, metricType string) {} -func (n *NullMetrics) IncrementCounter(name string) {} +func (n *NullMetrics) Increment(name string) {} func (n *NullMetrics) Gauge(name string, val float64) {} func (n *NullMetrics) Histogram(name string, obs float64) {} diff --git a/metrics/prometheus.go b/metrics/prometheus.go index 88fff63425..24c3df836f 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -20,6 +20,8 @@ type PromMetrics struct { // them by name metrics map[string]interface{} lock sync.Mutex + + prefix string } func (p *PromMetrics) Start() error { @@ -56,16 +58,19 @@ func (p *PromMetrics) Register(name string, metricType string) { case "counter": newmet = promauto.NewCounter(prometheus.CounterOpts{ Name: name, + Namespace: p.prefix, Help: name, }) case "gauge": newmet = promauto.NewGauge(prometheus.GaugeOpts{ Name: name, + Namespace: p.prefix, Help: name, }) case "histogram": newmet = promauto.NewHistogram(prometheus.HistogramOpts{ Name: name, + Namespace: p.prefix, Help: name, }) } @@ -73,24 +78,31 @@ func (p *PromMetrics) Register(name string, metricType string) { p.metrics[name] = newmet } -func (p *PromMetrics) IncrementCounter(name string) { +func (p *PromMetrics) Increment(name string) { if counterIface, ok := p.metrics[name]; ok { if counter, ok := counterIface.(prometheus.Counter); ok { counter.Inc() } } } -func (p *PromMetrics) Gauge(name string, val float64) { +func (p *PromMetrics) Count(name string, n interface{}) { + if counterIface, ok := p.metrics[name]; ok { + if counter, ok := counterIface.(prometheus.Counter); ok { + counter.Add(ConvertNumeric(n)) + } + } +} +func (p *PromMetrics) Gauge(name string, val interface{}) { if gaugeIface, ok := p.metrics[name]; ok { if gauge, ok := gaugeIface.(prometheus.Gauge); ok { - gauge.Set(val) + gauge.Set(ConvertNumeric(val)) } } } -func (p *PromMetrics) Histogram(name string, obs float64) { +func (p *PromMetrics) Histogram(name string, obs interface{}) { if histIface, ok := p.metrics[name]; ok { if hist, ok := histIface.(prometheus.Histogram); ok { - hist.Observe(obs) + hist.Observe(ConvertNumeric(obs)) } } } diff --git a/route/proxy.go b/route/proxy.go index 7916a4f771..354acd2571 100644 --- a/route/proxy.go +++ b/route/proxy.go @@ -12,7 +12,7 @@ import ( // response, blocking until it gets one. This is used for all non-event traffic // (eg team api key verification, markers, etc.) func (r *Router) proxy(w http.ResponseWriter, req *http.Request) { - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_proxied") + r.Metrics.Increment(r.incomingOrPeer + "_router_proxied") r.Logger.Debug().Logf("proxying request for %s", req.URL.Path) upstreamTarget, err := r.Config.GetHoneycombAPI() if err != nil { diff --git a/route/route.go b/route/route.go index e1f38e1710..427618936f 100644 --- a/route/route.go +++ b/route/route.go @@ -257,7 +257,7 @@ func (r *Router) debugTrace(w http.ResponseWriter, req *http.Request) { // event is handler for /1/event/ func (r *Router) event(w http.ResponseWriter, req *http.Request) { - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_event") + r.Metrics.Increment(r.incomingOrPeer + "_router_event") defer req.Body.Close() bodyReader, err := r.getMaybeCompressedBody(req) @@ -322,7 +322,7 @@ func (r *Router) requestToEvent(req *http.Request, reqBod []byte) (*types.Event, } func (r *Router) batch(w http.ResponseWriter, req *http.Request) { - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_batch") + r.Metrics.Increment(r.incomingOrPeer + "_router_batch") defer req.Body.Close() reqID := req.Context().Value(types.RequestIDContextKey{}) @@ -514,7 +514,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { } if traceID == "" { // not part of a trace. send along upstream - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_nonspan") + r.Metrics.Increment(r.incomingOrPeer + "_router_nonspan") debugLog.WithString("api_host", ev.APIHost). WithString("dataset", ev.Dataset). Logf("sending non-trace event from batch") @@ -526,7 +526,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { // ok, we're a span. Figure out if we should handle locally or pass on to a peer targetShard := r.Sharder.WhichShard(traceID) if r.incomingOrPeer == "incoming" && !targetShard.Equals(r.Sharder.MyShard()) { - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_peer") + r.Metrics.Increment(r.incomingOrPeer + "_router_peer") debugLog.WithString("peer", targetShard.GetAddress()). Logf("Sending span from batch to my peer") ev.APIHost = targetShard.GetAddress() @@ -550,12 +550,12 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { err = r.Collector.AddSpanFromPeer(span) } if err != nil { - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_dropped") + r.Metrics.Increment(r.incomingOrPeer + "_router_dropped") debugLog.Logf("Dropping span from batch, channel full") return err } - r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_span") + r.Metrics.Increment(r.incomingOrPeer + "_router_span") debugLog.Logf("Accepting span from batch for collection into a trace") return nil } diff --git a/sample/dynamic.go b/sample/dynamic.go index 1192ae3132..eda9e3d3ec 100644 --- a/sample/dynamic.go +++ b/sample/dynamic.go @@ -64,9 +64,9 @@ func (d *DynamicSampler) GetSampleRate(trace *types.Trace) (uint, bool) { "trace_id": trace.TraceID, }).Logf("got sample rate and decision") if shouldKeep { - d.Metrics.IncrementCounter("dynsampler_num_kept") + d.Metrics.Increment("dynsampler_num_kept") } else { - d.Metrics.IncrementCounter("dynsampler_num_dropped") + d.Metrics.Increment("dynsampler_num_dropped") } d.Metrics.Histogram("dynsampler_sample_rate", float64(rate)) return uint(rate), shouldKeep diff --git a/sample/dynamic_ema.go b/sample/dynamic_ema.go index f36ac5bcd3..1fd44d8b21 100644 --- a/sample/dynamic_ema.go +++ b/sample/dynamic_ema.go @@ -76,9 +76,9 @@ func (d *EMADynamicSampler) GetSampleRate(trace *types.Trace) (uint, bool) { "trace_id": trace.TraceID, }).Logf("got sample rate and decision") if shouldKeep { - d.Metrics.IncrementCounter("dynsampler_num_kept") + d.Metrics.Increment("dynsampler_num_kept") } else { - d.Metrics.IncrementCounter("dynsampler_num_dropped") + d.Metrics.Increment("dynsampler_num_dropped") } d.Metrics.Histogram("dynsampler_sample_rate", float64(rate)) return uint(rate), shouldKeep diff --git a/sample/rules.go b/sample/rules.go index 90000f03f7..265565be4b 100644 --- a/sample/rules.go +++ b/sample/rules.go @@ -41,9 +41,9 @@ func (s *RulesBasedSampler) GetSampleRate(trace *types.Trace) (rate uint, keep b if rule.Condition == nil { s.Metrics.Histogram("rulessampler_sample_rate", float64(rule.SampleRate)) if keep { - s.Metrics.IncrementCounter("rulessampler_num_kept") + s.Metrics.Increment("rulessampler_num_kept") } else { - s.Metrics.IncrementCounter("dynsampler_num_dropped") + s.Metrics.Increment("dynsampler_num_dropped") } logger.WithFields(map[string]interface{}{ "rate": rate, @@ -122,9 +122,9 @@ func (s *RulesBasedSampler) GetSampleRate(trace *types.Trace) (rate uint, keep b if matched == len(rule.Condition) { s.Metrics.Histogram("rulessampler_sample_rate", float64(rule.SampleRate)) if keep { - s.Metrics.IncrementCounter("rulessampler_num_kept") + s.Metrics.Increment("rulessampler_num_kept") } else { - s.Metrics.IncrementCounter("dynsampler_num_dropped") + s.Metrics.Increment("dynsampler_num_dropped") } logger.WithFields(map[string]interface{}{ "rate": rate, diff --git a/sample/totalthroughput.go b/sample/totalthroughput.go index 3752f34d25..d31ca68b81 100644 --- a/sample/totalthroughput.go +++ b/sample/totalthroughput.go @@ -67,9 +67,9 @@ func (d *TotalThroughputSampler) GetSampleRate(trace *types.Trace) (uint, bool) "trace_id": trace.TraceID, }).Logf("got sample rate and decision") if shouldKeep { - d.Metrics.IncrementCounter("dynsampler_num_kept") + d.Metrics.Increment("dynsampler_num_kept") } else { - d.Metrics.IncrementCounter("dynsampler_num_dropped") + d.Metrics.Increment("dynsampler_num_dropped") } d.Metrics.Histogram("dynsampler_sample_rate", float64(rate)) return uint(rate), shouldKeep diff --git a/transmit/transmit.go b/transmit/transmit.go index 8bf87ae29f..ab811ed1e0 100644 --- a/transmit/transmit.go +++ b/transmit/transmit.go @@ -105,7 +105,7 @@ func (d *DefaultTransmission) EnqueueEvent(ev *types.Event) { err := libhEv.SendPresampled() if err != nil { - d.Metrics.IncrementCounter(d.Name + counterEnqueueErrors) + d.Metrics.Increment(d.Name + counterEnqueueErrors) d.Logger.Error(). WithString("error", err.Error()). WithField("request_id", ev.Context.Value(types.RequestIDContextKey{})). @@ -164,13 +164,13 @@ func (d *DefaultTransmission) processResponses( if honeycombAPI == apiHost { // if the API host matches the configured honeycomb API, // count it as an API error - d.Metrics.IncrementCounter(d.Name + counterResponseErrorsAPI) + d.Metrics.Increment(d.Name + counterResponseErrorsAPI) } else { // otherwise, it's probably a peer error - d.Metrics.IncrementCounter(d.Name + counterResponseErrorsPeer) + d.Metrics.Increment(d.Name + counterResponseErrorsPeer) } } else { - d.Metrics.IncrementCounter(d.Name + counterResponse20x) + d.Metrics.Increment(d.Name + counterResponse20x) } case <-ctx.Done(): return From d45db6e16a60eba3438ef08aad186f22aaeb3be4 Mon Sep 17 00:00:00 2001 From: Vera Reynolds Date: Tue, 16 Mar 2021 10:30:38 -0600 Subject: [PATCH 2/9] wip: add metrics instances to injection graph --- cmd/refinery/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index c1c5e235a7..8fe587b805 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -168,6 +168,9 @@ func main() { &inject.Object{Value: &transmit.DefaultTransmission{LibhClient: peerClient, Name: "peer_"}, Name: "peerTransmission"}, &inject.Object{Value: shrdr}, &inject.Object{Value: collector}, + &inject.Object{Value: metricsr, Name: "foo"}, + &inject.Object{Value: metricsLibPeer, Name: "bar"}, + &inject.Object{Value: metricsLibUpstream, Name: "foof"}, &inject.Object{Value: metricsr}, &inject.Object{Value: version, Name: "version"}, &inject.Object{Value: samplerFactory}, From e3e323bb2f9f8a387d0ad92ebea6ad7b08e2af6b Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:03:11 +0000 Subject: [PATCH 3/9] tidy up upstread and peer metrics setup --- cmd/refinery/main.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index 8fe587b805..effacd60e7 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -82,7 +82,7 @@ func main() { // get desired implementation for each dependency to inject lgr := logger.GetLoggerImplementation(c) collector := collect.GetCollectorImplementation(c) - metricsr := metrics.GetMetricsImplementation(c, "") + metricsConfig := metrics.GetMetricsImplementation(c, "") shrdr := sharder.GetSharderImplementation(c) samplerFactory := &sample.SamplerFactory{} @@ -115,10 +115,8 @@ func main() { TLSHandshakeTimeout: 1200 * time.Millisecond, } - metricsLibUpstream := metrics.GetMetricsImplementation(c, "libhoney_upstream") - metricsLibPeer := metrics.GetMetricsImplementation(c, "libhoney_peer") - //sdUpstream, _ := statsd.New(statsd.Prefix("refinery.upstream")) - //sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer")) + upstreamMetricsConfig := metrics.GetMetricsImplementation(c, "libhoney_upstream") + peerMetricsConfig := metrics.GetMetricsImplementation(c, "libhoney_peer") userAgentAddition := "refinery/" + version upstreamClient, err := libhoney.NewClient(libhoney.ClientConfig{ @@ -131,7 +129,7 @@ func main() { Transport: upstreamTransport, BlockOnSend: true, EnableMsgpackEncoding: true, - Metrics: metricsLibUpstream, + Metrics: upstreamMetricsConfig, }, }) if err != nil { @@ -149,7 +147,7 @@ func main() { Transport: peerTransport, DisableCompression: !c.GetCompressPeerCommunication(), EnableMsgpackEncoding: true, - Metrics: metricsLibPeer, + Metrics: peerMetricsConfig, }, }) if err != nil { @@ -168,10 +166,9 @@ func main() { &inject.Object{Value: &transmit.DefaultTransmission{LibhClient: peerClient, Name: "peer_"}, Name: "peerTransmission"}, &inject.Object{Value: shrdr}, &inject.Object{Value: collector}, - &inject.Object{Value: metricsr, Name: "foo"}, - &inject.Object{Value: metricsLibPeer, Name: "bar"}, - &inject.Object{Value: metricsLibUpstream, Name: "foof"}, - &inject.Object{Value: metricsr}, + &inject.Object{Value: metricsConfig, Name: "metrics"}, + &inject.Object{Value: upstreamMetricsConfig, Name: "upstreamMetrics"}, + &inject.Object{Value: peerMetricsConfig, Name: "peerMetrics"}, &inject.Object{Value: version, Name: "version"}, &inject.Object{Value: samplerFactory}, &inject.Object{Value: &a}, From f95e964824bb1a21299f69bbbb5f26cfd0af5e14 Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:21:13 +0000 Subject: [PATCH 4/9] add missing Count func to NullMetrics --- metrics/nullmetrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metrics/nullmetrics.go b/metrics/nullmetrics.go index 6985f1df41..507e179ac9 100644 --- a/metrics/nullmetrics.go +++ b/metrics/nullmetrics.go @@ -9,4 +9,5 @@ func (n *NullMetrics) Start() {} func (n *NullMetrics) Register(name string, metricType string) {} func (n *NullMetrics) Increment(name string) {} func (n *NullMetrics) Gauge(name string, val float64) {} +func (n *NullMetrics) Count(name string, n interface{}) {} func (n *NullMetrics) Histogram(name string, obs float64) {} From cdcee47600ec3670e6e8cf64b0c21b79532d8344 Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:23:25 +0000 Subject: [PATCH 5/9] fix param names in NullMetrics --- metrics/nullmetrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics/nullmetrics.go b/metrics/nullmetrics.go index 507e179ac9..49dab20501 100644 --- a/metrics/nullmetrics.go +++ b/metrics/nullmetrics.go @@ -9,5 +9,5 @@ func (n *NullMetrics) Start() {} func (n *NullMetrics) Register(name string, metricType string) {} func (n *NullMetrics) Increment(name string) {} func (n *NullMetrics) Gauge(name string, val float64) {} -func (n *NullMetrics) Count(name string, n interface{}) {} -func (n *NullMetrics) Histogram(name string, obs float64) {} +func (n *NullMetrics) Count(name string, val interface{}) {} +func (n *NullMetrics) Histogram(name string, val float64) {} From ad41e5e6e57d7f4ded15cc8203ec89f079ff27b0 Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:33:26 +0000 Subject: [PATCH 6/9] fix derp on param names --- metrics/nullmetrics.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metrics/nullmetrics.go b/metrics/nullmetrics.go index 49dab20501..577eabb325 100644 --- a/metrics/nullmetrics.go +++ b/metrics/nullmetrics.go @@ -8,6 +8,6 @@ func (n *NullMetrics) Start() {} func (n *NullMetrics) Register(name string, metricType string) {} func (n *NullMetrics) Increment(name string) {} -func (n *NullMetrics) Gauge(name string, val float64) {} -func (n *NullMetrics) Count(name string, val interface{}) {} -func (n *NullMetrics) Histogram(name string, val float64) {} +func (n *NullMetrics) Gauge(name string, val interface{}) {} +func (n *NullMetrics) Count(name string, n interface{}) {} +func (n *NullMetrics) Histogram(name string, obs interface{}) {} From fa2272e55cb42d5477878e892e27128693d8e49f Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:46:04 +0000 Subject: [PATCH 7/9] de-dupe NullMetrics Count param name --- metrics/nullmetrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/nullmetrics.go b/metrics/nullmetrics.go index 577eabb325..18d7eb5099 100644 --- a/metrics/nullmetrics.go +++ b/metrics/nullmetrics.go @@ -9,5 +9,5 @@ func (n *NullMetrics) Start() {} func (n *NullMetrics) Register(name string, metricType string) {} func (n *NullMetrics) Increment(name string) {} func (n *NullMetrics) Gauge(name string, val interface{}) {} -func (n *NullMetrics) Count(name string, n interface{}) {} +func (n *NullMetrics) Count(name string, val interface{}) {} func (n *NullMetrics) Histogram(name string, obs interface{}) {} From cf76670db1eb8ebd251fe83f308100b8d64fd01a Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:46:21 +0000 Subject: [PATCH 8/9] add Count func to metrics/mock --- metrics/mock.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/metrics/mock.go b/metrics/mock.go index 16a6ce389c..f53490d1d4 100644 --- a/metrics/mock.go +++ b/metrics/mock.go @@ -39,6 +39,12 @@ func (m *MockMetrics) Gauge(name string, val float64) { m.GaugeRecords[name] = val } +func (m *MockMetrics) Count(name string, n interface{}) { + m.lock.Lock() + defer m.lock.Unlock() + + m.CounterIncrements[name] += int(ConvertNumeric(n)) +} func (m *MockMetrics) Histogram(name string, obs float64) { m.lock.Lock() defer m.lock.Unlock() From b0f3c18c2eba783fdb90ab47e786d4adadd4c0e3 Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 22 Mar 2021 11:52:30 +0000 Subject: [PATCH 9/9] fix up remaining MockMetrics tests --- metrics/mock.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metrics/mock.go b/metrics/mock.go index f53490d1d4..a2d6e9dbb6 100644 --- a/metrics/mock.go +++ b/metrics/mock.go @@ -33,19 +33,19 @@ func (m *MockMetrics) Increment(name string) { m.CounterIncrements[name] += 1 } -func (m *MockMetrics) Gauge(name string, val float64) { +func (m *MockMetrics) Gauge(name string, val interface{}) { m.lock.Lock() defer m.lock.Unlock() - m.GaugeRecords[name] = val + m.GaugeRecords[name] = ConvertNumeric(val) } -func (m *MockMetrics) Count(name string, n interface{}) { +func (m *MockMetrics) Count(name string, val interface{}) { m.lock.Lock() defer m.lock.Unlock() - m.CounterIncrements[name] += int(ConvertNumeric(n)) + m.CounterIncrements[name] += int(ConvertNumeric(val)) } -func (m *MockMetrics) Histogram(name string, obs float64) { +func (m *MockMetrics) Histogram(name string, val interface{}) { m.lock.Lock() defer m.lock.Unlock() @@ -53,5 +53,5 @@ func (m *MockMetrics) Histogram(name string, obs float64) { if !ok { m.Histograms[name] = make([]float64, 0) } - m.Histograms[name] = append(m.Histograms[name], obs) + m.Histograms[name] = append(m.Histograms[name], ConvertNumeric(val)) }