Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass along upstream and peer metrics configs to libhoney #227

Merged
merged 10 commits into from
Mar 24, 2021
20 changes: 10 additions & 10 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import (
"syscall"
"time"

libhoney "github.com/honeycombio/libhoney-go"
"github.com/honeycombio/libhoney-go/transmission"
statsd "gopkg.in/alexcesaro/statsd.v2"

"github.com/facebookgo/inject"
"github.com/facebookgo/startstop"
libhoney "github.com/honeycombio/libhoney-go"
"github.com/honeycombio/libhoney-go/transmission"
flag "github.com/jessevdk/go-flags"
"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -84,7 +82,7 @@ func main() {
// get desired implementation for each dependency to inject
lgr := logger.GetLoggerImplementation(c)
collector := collect.GetCollectorImplementation(c)
metricsr := metrics.GetMetricsImplementation(c)
metricsConfig := metrics.GetMetricsImplementation(c, "")
shrdr := sharder.GetSharderImplementation(c)
samplerFactory := &sample.SamplerFactory{}

Expand Down Expand Up @@ -117,8 +115,8 @@ func main() {
TLSHandshakeTimeout: 1200 * time.Millisecond,
}

sdUpstream, _ := statsd.New(statsd.Prefix("refinery.upstream"))
sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
upstreamMetricsConfig := metrics.GetMetricsImplementation(c, "libhoney_upstream")
peerMetricsConfig := metrics.GetMetricsImplementation(c, "libhoney_peer")

userAgentAddition := "refinery/" + version
upstreamClient, err := libhoney.NewClient(libhoney.ClientConfig{
Expand All @@ -131,7 +129,7 @@ func main() {
Transport: upstreamTransport,
BlockOnSend: true,
EnableMsgpackEncoding: true,
Metrics: sdUpstream,
Metrics: upstreamMetricsConfig,
},
})
if err != nil {
Expand All @@ -149,7 +147,7 @@ func main() {
Transport: peerTransport,
DisableCompression: !c.GetCompressPeerCommunication(),
EnableMsgpackEncoding: true,
Metrics: sdPeer,
Metrics: peerMetricsConfig,
},
})
if err != nil {
Expand All @@ -168,7 +166,9 @@ func main() {
&inject.Object{Value: &transmit.DefaultTransmission{LibhClient: peerClient, Name: "peer_"}, Name: "peerTransmission"},
&inject.Object{Value: shrdr},
&inject.Object{Value: collector},
&inject.Object{Value: metricsr},
&inject.Object{Value: metricsConfig, Name: "metrics"},
&inject.Object{Value: upstreamMetricsConfig, Name: "upstreamMetrics"},
&inject.Object{Value: peerMetricsConfig, Name: "peerMetrics"},
&inject.Object{Value: version, Name: "version"},
&inject.Object{Value: samplerFactory},
&inject.Object{Value: &a},
Expand Down
2 changes: 1 addition & 1 deletion collect/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (d *DefaultInMemCache) Set(trace *types.Trace) *types.Trace {
if !oldTrace.Sent {
// if it hasn't already been sent,
// record that we're overrunning the buffer
d.Metrics.IncrementCounter("collect_cache_buffer_overrun")
d.Metrics.Increment("collect_cache_buffer_overrun")
// and return the trace so it can be sent.
retTrace = oldTrace
}
Expand Down
12 changes: 6 additions & 6 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,14 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// if the trace has already been sent, just pass along the span
if sentRecord, found := i.sentTraceCache.Get(sp.TraceID); found {
if sr, ok := sentRecord.(*traceSentRecord); ok {
i.Metrics.IncrementCounter("trace_sent_cache_hit")
i.Metrics.Increment("trace_sent_cache_hit")
i.dealWithSentTrace(sr.keep, sr.rate, sp)
return
}
}
// trace hasn't already been sent (or this span is really old); let's
// create a new trace to hold it
i.Metrics.IncrementCounter("trace_accepted")
i.Metrics.Increment("trace_accepted")

timeout, err := i.Config.GetTraceTimeout()
if err != nil {
Expand Down Expand Up @@ -420,9 +420,9 @@ func (i *InMemCollector) send(trace *types.Trace) {
i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds()))
i.Metrics.Histogram("trace_span_count", float64(len(trace.GetSpans())))
if trace.HasRootSpan {
i.Metrics.IncrementCounter("trace_send_has_root")
i.Metrics.Increment("trace_send_has_root")
} else {
i.Metrics.IncrementCounter("trace_send_no_root")
i.Metrics.Increment("trace_send_no_root")
}

var sampler sample.Sampler
Expand All @@ -448,11 +448,11 @@ func (i *InMemCollector) send(trace *types.Trace) {

// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
if !shouldSend && !i.Config.GetIsDryRun() {
i.Metrics.IncrementCounter("trace_send_dropped")
i.Metrics.Increment("trace_send_dropped")
i.Logger.Info().WithString("trace_id", trace.TraceID).WithString("dataset", trace.Dataset).Logf("Dropping trace because of sampling, trace to dataset")
return
}
i.Metrics.IncrementCounter("trace_send_kept")
i.Metrics.Increment("trace_send_kept")

// ok, we're not dropping this trace; send all the spans
if i.Config.GetIsDryRun() && !shouldSend {
Expand Down
34 changes: 20 additions & 14 deletions metrics/honeycomb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type HoneycombMetrics struct {
//reportingFreq is the interval with which to report statistics
reportingFreq int64
reportingCancelFunc func()

prefix string
}

type counter struct {
Expand Down Expand Up @@ -229,7 +231,7 @@ func (h *HoneycombMetrics) reportToHoneycommb(ctx context.Context) {
h.countersLock.Lock()
for _, count := range h.counters {
count.lock.Lock()
ev.AddField(count.name, count.val)
ev.AddField(PrefixMetricName(h.prefix, count.name), count.val)
count.val = 0
count.lock.Unlock()
}
Expand All @@ -238,7 +240,7 @@ func (h *HoneycombMetrics) reportToHoneycommb(ctx context.Context) {
h.gaugesLock.Lock()
for _, gauge := range h.gauges {
gauge.lock.Lock()
ev.AddField(gauge.name, gauge.val)
ev.AddField(PrefixMetricName(h.prefix, gauge.name), gauge.val)
// gauges should remain where they are until changed
// gauge.val = 0
gauge.lock.Unlock()
Expand All @@ -253,12 +255,12 @@ func (h *HoneycombMetrics) reportToHoneycommb(ctx context.Context) {
p50Index := int(math.Floor(float64(len(histogram.vals)) * 0.5))
p95Index := int(math.Floor(float64(len(histogram.vals)) * 0.95))
p99Index := int(math.Floor(float64(len(histogram.vals)) * 0.99))
ev.AddField(histogram.name+"_p50", histogram.vals[p50Index])
ev.AddField(histogram.name+"_p95", histogram.vals[p95Index])
ev.AddField(histogram.name+"_p99", histogram.vals[p99Index])
ev.AddField(histogram.name+"_min", histogram.vals[0])
ev.AddField(histogram.name+"_max", histogram.vals[len(histogram.vals)-1])
ev.AddField(histogram.name+"_avg", average(histogram.vals))
ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_p50", histogram.vals[p50Index])
ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_p95", histogram.vals[p95Index])
ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_p99", histogram.vals[p99Index])
ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_min", histogram.vals[0])
ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_max", histogram.vals[len(histogram.vals)-1])
ev.AddField(PrefixMetricName(h.prefix, histogram.name)+"_avg", average(histogram.vals))
histogram.vals = histogram.vals[:0]
}
histogram.lock.Unlock()
Expand Down Expand Up @@ -317,35 +319,39 @@ func (h *HoneycombMetrics) Register(name string, metricType string) {
}
}

func (h *HoneycombMetrics) IncrementCounter(name string) {
func (h *HoneycombMetrics) Count(name string, n interface{}) {
count, ok := h.counters[name]
if !ok {
h.Register(name, "counter")
count = h.counters[name]
}
count.lock.Lock()
defer count.lock.Unlock()
count.val++
count.val = count.val + int(ConvertNumeric(n))
}

func (h *HoneycombMetrics) Increment(name string) {
h.Count(name, 1)
}

func (h *HoneycombMetrics) Gauge(name string, val float64) {
func (h *HoneycombMetrics) Gauge(name string, val interface{}) {
gauge, ok := h.gauges[name]
if !ok {
h.Register(name, "gauge")
gauge = h.gauges[name]
}
gauge.lock.Lock()
defer gauge.lock.Unlock()
gauge.val = val
gauge.val = ConvertNumeric(val)
}

func (h *HoneycombMetrics) Histogram(name string, obs float64) {
func (h *HoneycombMetrics) Histogram(name string, obs interface{}) {
histogram, ok := h.histograms[name]
if !ok {
h.Register(name, "histogram")
histogram = h.histograms[name]
}
histogram.lock.Lock()
defer histogram.lock.Unlock()
histogram.vals = append(histogram.vals, obs)
histogram.vals = append(histogram.vals, ConvertNumeric(obs))
}
51 changes: 45 additions & 6 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
type Metrics interface {
// Register declares a metric; metricType should be one of counter, gauge, histogram
Register(name string, metricType string)
IncrementCounter(name string)
Gauge(name string, val float64)
Histogram(name string, obs float64)
Increment(name string)
Gauge(name string, val interface{})
Count(name string, n interface{})
Histogram(name string, obs interface{})
}

func GetMetricsImplementation(c config.Config) Metrics {
func GetMetricsImplementation(c config.Config, prefix string) Metrics {
var metricsr Metrics
metricsType, err := c.GetMetricsType()
if err != nil {
Expand All @@ -24,12 +25,50 @@ func GetMetricsImplementation(c config.Config) Metrics {
}
switch metricsType {
case "honeycomb":
metricsr = &HoneycombMetrics{}
metricsr = &HoneycombMetrics{prefix: prefix}
case "prometheus":
metricsr = &PromMetrics{}
metricsr = &PromMetrics{prefix: prefix}
default:
fmt.Printf("unknown metrics type %s. Exiting.\n", metricsType)
os.Exit(1)
}
return metricsr
}

func ConvertNumeric(val interface{}) float64 {
switch n := val.(type) {
case int:
return float64(n)
case uint:
return float64(n)
case int64:
return float64(n)
case uint64:
return float64(n)
case int32:
return float64(n)
case uint32:
return float64(n)
case int16:
return float64(n)
case uint16:
return float64(n)
case int8:
return float64(n)
case uint8:
return float64(n)
case float64:
return n
case float32:
return float64(n)
default:
return 0
}
}

func PrefixMetricName(prefix string, name string) string {
if prefix != "" {
return fmt.Sprintf(`%s_%s`, prefix, name)
}
return name
}
16 changes: 11 additions & 5 deletions metrics/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@ func (m *MockMetrics) Register(name string, metricType string) {

m.Registrations[name] = metricType
}
func (m *MockMetrics) IncrementCounter(name string) {
func (m *MockMetrics) Increment(name string) {
m.lock.Lock()
defer m.lock.Unlock()

m.CounterIncrements[name] += 1
}
func (m *MockMetrics) Gauge(name string, val float64) {
func (m *MockMetrics) Gauge(name string, val interface{}) {
m.lock.Lock()
defer m.lock.Unlock()

m.GaugeRecords[name] = val
m.GaugeRecords[name] = ConvertNumeric(val)
}
func (m *MockMetrics) Histogram(name string, obs float64) {
func (m *MockMetrics) Count(name string, val interface{}) {
m.lock.Lock()
defer m.lock.Unlock()

m.CounterIncrements[name] += int(ConvertNumeric(val))
}
func (m *MockMetrics) Histogram(name string, val interface{}) {
m.lock.Lock()
defer m.lock.Unlock()

_, ok := m.Histograms[name]
if !ok {
m.Histograms[name] = make([]float64, 0)
}
m.Histograms[name] = append(m.Histograms[name], obs)
m.Histograms[name] = append(m.Histograms[name], ConvertNumeric(val))
}
7 changes: 4 additions & 3 deletions metrics/nullmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type NullMetrics struct{}
func (n *NullMetrics) Start() {}

func (n *NullMetrics) Register(name string, metricType string) {}
func (n *NullMetrics) IncrementCounter(name string) {}
func (n *NullMetrics) Gauge(name string, val float64) {}
func (n *NullMetrics) Histogram(name string, obs float64) {}
func (n *NullMetrics) Increment(name string) {}
func (n *NullMetrics) Gauge(name string, val interface{}) {}
func (n *NullMetrics) Count(name string, val interface{}) {}
func (n *NullMetrics) Histogram(name string, obs interface{}) {}
22 changes: 17 additions & 5 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type PromMetrics struct {
// them by name
metrics map[string]interface{}
lock sync.Mutex

prefix string
}

func (p *PromMetrics) Start() error {
Expand Down Expand Up @@ -56,41 +58,51 @@ func (p *PromMetrics) Register(name string, metricType string) {
case "counter":
newmet = promauto.NewCounter(prometheus.CounterOpts{
Name: name,
Namespace: p.prefix,
Help: name,
})
case "gauge":
newmet = promauto.NewGauge(prometheus.GaugeOpts{
Name: name,
Namespace: p.prefix,
Help: name,
})
case "histogram":
newmet = promauto.NewHistogram(prometheus.HistogramOpts{
Name: name,
Namespace: p.prefix,
Help: name,
})
}

p.metrics[name] = newmet
}

func (p *PromMetrics) IncrementCounter(name string) {
func (p *PromMetrics) Increment(name string) {
if counterIface, ok := p.metrics[name]; ok {
if counter, ok := counterIface.(prometheus.Counter); ok {
counter.Inc()
}
}
}
func (p *PromMetrics) Gauge(name string, val float64) {
func (p *PromMetrics) Count(name string, n interface{}) {
if counterIface, ok := p.metrics[name]; ok {
if counter, ok := counterIface.(prometheus.Counter); ok {
counter.Add(ConvertNumeric(n))
}
}
}
func (p *PromMetrics) Gauge(name string, val interface{}) {
if gaugeIface, ok := p.metrics[name]; ok {
if gauge, ok := gaugeIface.(prometheus.Gauge); ok {
gauge.Set(val)
gauge.Set(ConvertNumeric(val))
}
}
}
func (p *PromMetrics) Histogram(name string, obs float64) {
func (p *PromMetrics) Histogram(name string, obs interface{}) {
if histIface, ok := p.metrics[name]; ok {
if hist, ok := histIface.(prometheus.Histogram); ok {
hist.Observe(obs)
hist.Observe(ConvertNumeric(obs))
}
}
}
Loading