Skip to content

Commit

Permalink
feat(serializers.prometheusremotewrite): Log metric conversion errors (
Browse files Browse the repository at this point in the history
  • Loading branch information
hagen1778 authored Oct 1, 2024
1 parent 135fca5 commit 7df29b0
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 7 deletions.
1 change: 1 addition & 0 deletions plugins/serializers/prometheusremotewrite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ it is not included in the final metric name.
Prometheus labels are produced for each tag.

**Note:** String fields are ignored and do not produce Prometheus metrics.
Set **log_level** to `trace` to see all serialization issues.
36 changes: 32 additions & 4 deletions plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@ import (
type MetricKey uint64

// Serializer holds the settings and the logger used by Serialize and
// SerializeBatch. SortMetrics and StringAsLabel are bound to the TOML keys
// `prometheus_sort_metrics` and `prometheus_string_as_label`; how they shape
// the output is not visible in this excerpt.
type Serializer struct {
SortMetrics bool `toml:"prometheus_sort_metrics"`
StringAsLabel bool `toml:"prometheus_string_as_label"`
SortMetrics bool `toml:"prometheus_sort_metrics"`
StringAsLabel bool `toml:"prometheus_string_as_label"`
// Log is the logger SerializeBatch writes its trace-level and error-level
// messages about dropped series to.
// NOTE(review): the `toml:"-"` tag presumably keeps it out of the TOML
// configuration — confirm.
Log telegraf.Logger `toml:"-"`
}

// Serialize encodes a single metric by wrapping it in a one-element batch and
// delegating to SerializeBatch; the bytes and the error are returned unchanged.
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
	batch := []telegraf.Metric{metric}
	return s.SerializeBatch(batch)
}

func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var buf bytes.Buffer
var lastErr error
// traceAndKeepErr logs every error passed to it at Trace level.
// Each call also updates lastErr, so that it can be logged later at a higher level.
traceAndKeepErr := func(format string, a ...any) {
lastErr = fmt.Errorf(format, a...)
s.Log.Trace(lastErr)
}

var buf bytes.Buffer
var entries = make(map[MetricKey]prompb.TimeSeries)
var labels = make([]prompb.Label, 0)
for _, metric := range metrics {
Expand All @@ -41,6 +49,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
metricName, ok := prometheus.SanitizeMetricName(metricName)
if !ok {
traceAndKeepErr("failed to parse metric name %q", metricName)
continue
}

Expand All @@ -52,6 +61,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
case telegraf.Untyped:
value, ok := prometheus.SampleValue(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}
metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
Expand All @@ -78,14 +88,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

le, ok := metric.GetTag("le")
if !ok {
traceAndKeepErr("failed to parse %q: can't find `le` label", metricName)
continue
}
bound, err := strconv.ParseFloat(le, 64)
if err != nil {
traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, le, err)
continue
}
count, ok := prometheus.SampleCount(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

Expand All @@ -97,13 +110,15 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_sum", labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

Expand All @@ -119,35 +134,41 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
traceAndKeepErr("failed to parse %q: series %q should have `_count`, `_sum` or `_bucket` suffix", metricName, field.Key)
continue
}
case telegraf.Summary:
switch {
case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_sum", labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
quantileTag, ok := metric.GetTag("quantile")
if !ok {
traceAndKeepErr("failed to parse %q: can't find `quantile` label", metricName)
continue
}
quantile, err := strconv.ParseFloat(quantileTag, 64)
if err != nil {
traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, quantileTag, err)
continue
}
value, ok := prometheus.SampleValue(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

Expand All @@ -162,18 +183,25 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
}

// A batch of metrics can contain multiple values for a single
// Prometheus sample. If this metric is older than the existing
// Prometheus sample. If this metric is older than the existing
// sample then we can skip over it.
m, ok := entries[metrickey]
if ok {
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
traceAndKeepErr("metric %q has samples with timestamp %v older than already registered before", metric.Name(), metric.Time())
continue
}
}
entries[metrickey] = promts
}
}

if lastErr != nil {
// Log only the last recorded error: a batch can contain many errors, and logging each one
// at this level would be too verbose. The line below still gives the user enough information to act on.
s.Log.Errorf("some series were dropped, %d series left to send; last recorded error: %v", len(entries), lastErr)
}

var promTS = make([]prompb.TimeSeries, len(entries))
var i int
for _, promts := range entries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func BenchmarkRemoteWrite(b *testing.B) {
time.Unix(0, 0),
)
}
s := &Serializer{}
s := &Serializer{Log: &testutil.CaptureLogger{}}
for n := 0; n < b.N; n++ {
//nolint:errcheck // Benchmarking so skip the error check to avoid the unnecessary operations
s.SerializeBatch(batch)
Expand Down Expand Up @@ -188,6 +188,7 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Serializer{
Log: &testutil.CaptureLogger{},
SortMetrics: true,
}
data, err := s.Serialize(tt.metric)
Expand All @@ -201,6 +202,83 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
}
}

// TestRemoteWriteSerializeNegative feeds the serializer metrics that cannot be
// converted and checks, for each one, that Serialize returns no error while
// the capture logger's last error contains the expected fragment.
func TestRemoteWriteSerializeNegative(t *testing.T) {
	logger := &testutil.CaptureLogger{}
	s := &Serializer{Log: logger}

	epoch := time.Unix(0, 0)

	cases := []struct {
		wantLog string
		metric  telegraf.Metric
	}{
		{
			// Neither the metric name nor the field key can be sanitized.
			wantLog: "failed to parse metric name",
			metric:  testutil.MustMetric("@@!!", nil, map[string]any{"!!": "@@"}, epoch),
		},
		{
			// Untyped metric carrying a string value.
			wantLog: "bad sample",
			metric: testutil.MustMetric(
				"prometheus",
				nil,
				map[string]any{"http_requests_total": "asd"},
				epoch,
			),
		},
		{
			// Histogram bucket with a valid `le` tag but a string count.
			wantLog: "bad sample",
			metric: testutil.MustMetric(
				"prometheus",
				map[string]string{"le": "0.5"},
				map[string]any{"http_request_duration_seconds_bucket": "asd"},
				epoch,
				telegraf.Histogram,
			),
		},
		{
			// Gauge with one numeric field and one string field.
			wantLog: "bad sample",
			metric: testutil.MustMetric(
				"prometheus",
				map[string]string{
					"code":   "400",
					"method": "post",
				},
				map[string]any{
					"http_requests_total":        3.0,
					"http_requests_errors_total": "3.0",
				},
				epoch,
				telegraf.Gauge,
			),
		},
		{
			// Summary whose `quantile` tag is not a valid float.
			wantLog: "failed to parse",
			metric: testutil.MustMetric(
				"prometheus",
				map[string]string{"quantile": "0.01a"},
				map[string]any{"rpc_duration_seconds": 3102.0},
				epoch,
				telegraf.Summary,
			),
		},
	}

	for _, tc := range cases {
		_, err := s.Serialize(tc.metric)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		got := logger.LastError()
		if got == "" {
			t.Fatal("expected non-empty last message")
		}
		if !strings.Contains(got, tc.wantLog) {
			t.Fatalf("expected to have log message %q; got %q instead", tc.wantLog, got)
		}

		// Reset the logger so the next case starts from a clean state.
		logger.Clear()
	}
}

func TestRemoteWriteSerializeBatch(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -679,6 +757,7 @@ rpc_duration_seconds_sum 17560473
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Serializer{
Log: &testutil.CaptureLogger{},
SortMetrics: true,
StringAsLabel: tt.stringAsLabel,
}
Expand Down Expand Up @@ -733,7 +812,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
}

func BenchmarkSerialize(b *testing.B) {
s := &Serializer{}
s := &Serializer{Log: &testutil.CaptureLogger{}}
metrics := serializers.BenchmarkMetrics(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -743,7 +822,7 @@ func BenchmarkSerialize(b *testing.B) {
}

func BenchmarkSerializeBatch(b *testing.B) {
s := &Serializer{}
s := &Serializer{Log: &testutil.CaptureLogger{}}
m := serializers.BenchmarkMetrics(b)
metrics := m[:]
b.ResetTimer()
Expand Down

0 comments on commit 7df29b0

Please sign in to comment.