Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend OTLP: Adding OTLP as a configurable backend for GoStatsD #656

Merged
merged 8 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@
build/dev/data/
/.idea
/.vscode
# Local tooling and generated/local config files (e.g. gostatsd.toml)
/.tools
*.toml

# Ensure all testdata changes are included in git adds
!/**/testdata/*
14 changes: 14 additions & 0 deletions examples/opentelemetry/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
# Example stack for the OTLP backend: an OpenTelemetry collector, a
# statsd traffic generator, and a locally-built gostatsd server that is
# configured via the mounted gostatsd.toml.
version: '3.8'

services:
  # Receives the OTLP metrics exported by gostatsd.
  otel-collector:
    image: otel/opentelemetry-collector:0.92.0
    container_name: otelcol
  # Generates statsd traffic; built from the Dockerfile in this directory.
  statsd-sender:
    build: .
    container_name: statsd-gen
  # The gostatsd server under test; expects a locally-built image.
  statsd-server:
    image: gostatsd:local
    volumes:
      - ./gostatsd.toml:/etc/gostatsd/config.toml
30 changes: 30 additions & 0 deletions internal/fixtures/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package fixtures

import (
"io"
"testing"

"github.com/sirupsen/logrus"
)

// writer adapts a testing.TB to io.Writer so that log output produced
// during a test is captured by the test framework and shown alongside
// the owning test's output.
type writer struct {
	tb testing.TB
}

var _ io.Writer = (*writer)(nil)

// Write records the payload via the test log. It always reports the
// full payload as written and never returns an error.
func (w writer) Write(p []byte) (int, error) {
	msg := string(p)
	w.tb.Log(msg)
	return len(msg), nil
}

// NewTestLogger returns a logrus logger whose output is routed to the
// test log, so library log lines appear with the owning test's output.
// Options may further customize the logger (level, formatter, output).
func NewTestLogger(tb testing.TB, opts ...func(logrus.FieldLogger)) logrus.FieldLogger {
	l := logrus.New()

	// Route output to the test BEFORE applying options so that an
	// option is able to override the output. Previously SetOutput ran
	// last and silently clobbered any output configured by an option.
	l.SetOutput(writer{tb: tb})

	for _, opt := range opts {
		opt(l)
	}

	return l
}
2 changes: 2 additions & 0 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/atlassian/gostatsd/pkg/backends/influxdb"
"github.com/atlassian/gostatsd/pkg/backends/newrelic"
"github.com/atlassian/gostatsd/pkg/backends/null"
"github.com/atlassian/gostatsd/pkg/backends/otlp"
"github.com/atlassian/gostatsd/pkg/backends/statsdaemon"
"github.com/atlassian/gostatsd/pkg/backends/stdout"
"github.com/atlassian/gostatsd/pkg/transport"
Expand All @@ -24,6 +25,7 @@ var backends = map[string]gostatsd.BackendFactory{
graphite.BackendName: graphite.NewClientFromViper,
influxdb.BackendName: influxdb.NewClientFromViper,
null.BackendName: null.NewClientFromViper,
otlp.BackendName: otlp.NewClientFromViper,
statsdaemon.BackendName: statsdaemon.NewClientFromViper,
stdout.BackendName: stdout.NewClientFromViper,
cloudwatch.BackendName: cloudwatch.NewClientFromViper,
Expand Down
244 changes: 244 additions & 0 deletions pkg/backends/otlp/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package otlp

import (
"context"
"fmt"
"math"
"net/http"
"runtime"
"runtime/debug"
"strconv"
"sync/atomic"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"go.uber.org/multierr"

"github.com/atlassian/gostatsd"
"github.com/atlassian/gostatsd/pkg/backends/otlp/internal/data"
"github.com/atlassian/gostatsd/pkg/transport"
)

const (
	// BackendName is the name under which this backend is registered
	// and selected in configuration.
	BackendName = `otlp`
)

// Backend contains additional meta data in order
// to export values as OTLP metrics.
// The zero value is not safe to use.
type Backend struct {
	// droppedMetrics counts data points that could not be delivered;
	// it is updated atomically.
	droppedMetrics uint64

	// endpoint is the OTLP HTTP endpoint metrics are posted to.
	endpoint string
	// convertTimersToGauges selects exporting timers as families of
	// gauges instead of OTLP histograms.
	convertTimersToGauges bool
	// is is the instrumentation scope attached to exported metrics.
	is data.InstrumentationScope
	// resourceKeys lists tag keys promoted to OTLP resource attributes.
	resourceKeys gostatsd.Tags
	// discarded marks timer sub-aggregates that must not be exported.
	discarded gostatsd.TimerSubtypes

	logger logrus.FieldLogger
	client *http.Client
	// sem bounds the number of concurrent outbound HTTP requests.
	sem chan struct{}
}

var _ gostatsd.Backend = (*Backend)(nil)

// NewClientFromViper builds an OTLP backend from the supplied viper
// configuration, reusing an HTTP client from the transport pool.
func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transport.TransportPool) (gostatsd.Backend, error) {
	cfg, err := NewConfig(v)
	if err != nil {
		return nil, err
	}

	tc, err := pool.Get(cfg.Transport)
	if err != nil {
		return nil, err
	}

	// Prefer the module version from build info; fall back to the Go
	// toolchain version when build info is unavailable.
	version := runtime.Version()
	if info, ok := debug.ReadBuildInfo(); ok {
		version = info.Main.Version
	}

	b := &Backend{
		endpoint:              cfg.Endpoint,
		convertTimersToGauges: cfg.Conversion == ConversionAsGauge,
		is:                    data.NewInstrumentationScope("gostatsd/aggregation", version),
		resourceKeys:          cfg.ResourceKeys,
		discarded:             cfg.TimerSubtypes,
		client:                tc.Client,
		logger:                logger,
		sem:                   make(chan struct{}, cfg.MaxRequests),
	}
	return b, nil
}

// Name returns the configuration name of this backend ("otlp").
func (*Backend) Name() string {
	return BackendName
}

// SendEvent implements gostatsd.Backend. Events are not supported by
// the OTLP backend and are silently dropped.
func (*Backend) SendEvent(ctx context.Context, e *gostatsd.Event) error {
	// Events are currently ignored and dropped
	return nil
}

// SendMetricsAsync converts the aggregated values in mm into OTLP data
// points, grouped by resource attributes derived from resourceKeys,
// and posts them to the configured endpoint. The callback cb is
// invoked exactly once per call (with nil errors on success) so that
// the caller's flush accounting always completes.
func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, cb gostatsd.SendCallback) {
	group := make(Group)

	// Counters are exported as OTLP sums.
	mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) {
		resources, attributes := splitTagsByKeys(cm.Tags, bd.resourceKeys)

		m := data.NewMetric(name).SetSum(
			data.NewSum(
				data.NewNumberDataPoint(
					uint64(cm.Timestamp),
					data.WithNumberDataPointMap(attributes),
					data.WithNumberDatapointIntValue(cm.Value),
				),
			),
		)
		group.Insert(bd.is, resources, m)
	})

	// Gauges map directly onto OTLP gauges.
	mm.Gauges.Each(func(name, _ string, gm gostatsd.Gauge) {
		resources, attributes := splitTagsByKeys(gm.Tags, bd.resourceKeys)

		m := data.NewMetric(name).SetGauge(
			data.NewGauge(
				data.NewNumberDataPoint(
					uint64(gm.Timestamp),
					data.WithNumberDataPointMap(attributes),
					data.WithNumberDataPointDoubleValue(gm.Value),
				),
			),
		)

		group.Insert(bd.is, resources, m)
	})

	// Sets are exported as a gauge of the observed cardinality.
	mm.Sets.Each(func(name, _ string, sm gostatsd.Set) {
		resources, attributes := splitTagsByKeys(sm.Tags, bd.resourceKeys)

		m := data.NewMetric(name).SetGauge(
			data.NewGauge(
				data.NewNumberDataPoint(
					uint64(sm.Timestamp),
					data.WithNumberDataPointMap(attributes),
					data.WithNumberDatapointIntValue(int64(len(sm.Values))),
				),
			),
		)

		group.Insert(bd.is, resources, m)
	})

	mm.Timers.Each(func(name, _ string, t gostatsd.Timer) {
		resources, attributes := splitTagsByKeys(t.Tags, bd.resourceKeys)

		if !bd.convertTimersToGauges {
			// Export the timer as a native OTLP histogram.
			opts := []func(data.HistogramDataPoint){
				data.WithHistogramDataPointStatistics(t.Values),
				data.WithHistogramDataPointAttributes(attributes),
			}
			if len(t.Histogram) != 0 {
				opts = append(opts, data.WithHistogramDataPointCumulativeBucketValues(t.Histogram))
			}
			group.Insert(bd.is, resources, data.NewMetric(name).SetHistogram(
				data.NewHistogram(data.NewHistogramDataPoint(uint64(t.Timestamp), opts...)),
			))
			return
		}

		// Exporting the timer as a family of gauges.
		if len(t.Histogram) != 0 {
			// Pre-aggregated buckets become one gauge per "le" boundary.
			// NOTE(review): btags is mutated between Inserts; confirm
			// data.Map is copied (not referenced) by the data point,
			// otherwise every bucket would carry the last "le" value.
			btags := data.NewMap()
			btags.Merge(attributes)
			for boundary, count := range t.Histogram {
				if math.IsInf(float64(boundary), 1) {
					btags.Insert("le", "+Inf")
				} else {
					btags.Insert("le", strconv.FormatFloat(float64(boundary), 'f', -1, 64))
				}
				group.Insert(
					bd.is,
					resources,
					data.NewMetric(fmt.Sprintf("%s.histogram", name)).SetGauge(
						data.NewGauge(data.NewNumberDataPoint(
							uint64(t.Timestamp),
							data.WithNumberDataPointMap(btags),
							data.WithNumberDatapointIntValue(int64(count)),
						)),
					),
				)
			}
			return
		}

		// One gauge per summary statistic, honoring the discard settings.
		calcs := []struct {
			discarded bool
			suffix    string
			value     func(data.NumberDataPoint)
		}{
			{discarded: bd.discarded.Lower, suffix: "lower", value: data.WithNumberDataPointDoubleValue(t.Min)},
			{discarded: bd.discarded.Upper, suffix: "upper", value: data.WithNumberDataPointDoubleValue(t.Max)},
			{discarded: bd.discarded.Count, suffix: "count", value: data.WithNumberDatapointIntValue(int64(t.Count))},
			{discarded: bd.discarded.CountPerSecond, suffix: "count_ps", value: data.WithNumberDataPointDoubleValue(t.PerSecond)},
			{discarded: bd.discarded.Mean, suffix: "mean", value: data.WithNumberDataPointDoubleValue(t.Mean)},
			{discarded: bd.discarded.Median, suffix: "median", value: data.WithNumberDataPointDoubleValue(t.Median)},
			{discarded: bd.discarded.StdDev, suffix: "std", value: data.WithNumberDataPointDoubleValue(t.StdDev)},
			{discarded: bd.discarded.Sum, suffix: "sum", value: data.WithNumberDataPointDoubleValue(t.Sum)},
			{discarded: bd.discarded.SumSquares, suffix: "sum_squares", value: data.WithNumberDataPointDoubleValue(t.SumSquares)},
		}

		for _, calc := range calcs {
			if calc.discarded {
				continue
			}
			group.Insert(
				bd.is,
				resources,
				data.NewMetric(fmt.Sprintf("%s.%s", name, calc.suffix)).SetGauge(
					data.NewGauge(data.NewNumberDataPoint(
						uint64(t.Timestamp),
						data.WithNumberDataPointMap(attributes),
						calc.value,
					)),
				),
			)
		}

		// Percentiles were computed upstream; export each one as a gauge.
		for _, pct := range t.Percentiles {
			group.Insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge(
				data.NewGauge(data.NewNumberDataPoint(
					uint64(t.Timestamp),
					data.WithNumberDataPointMap(attributes),
					data.WithNumberDataPointDoubleValue(pct.Float),
				)),
			))
		}
	})

	err := bd.postMetrics(ctx, group.Values())
	if err != nil {
		bd.logger.WithError(err).WithFields(logrus.Fields{
			"endpoint": bd.endpoint,
		}).Error("Issues trying to submit data")
	}
	// Previously cb was only invoked on failure, so a successful flush
	// never reported completion to the sender; always call it.
	cb(multierr.Errors(err))
}

func (c *Backend) postMetrics(ctx context.Context, resourceMetrics []data.ResourceMetrics) error {
req, err := data.NewMetricsRequest(ctx, c.endpoint)
if err != nil {
atomic.AddUint64(&c.droppedMetrics, uint64(len(resourceMetrics)))
return err
}

c.sem <- struct{}{}
resp, err := c.client.Do(req)
MovieStoreGuy marked this conversation as resolved.
Show resolved Hide resolved
<-c.sem
if err != nil {
atomic.AddUint64(&c.droppedMetrics, uint64(len(resourceMetrics)))
return err
}
dropped, err := data.ProcessMetricResponse(resp)
atomic.AddUint64(&c.droppedMetrics, uint64(dropped))
return err
}
Loading
Loading