
Commit

Change queue metrics to use opencensus metrics instead of stats, close to otel-go (open-telemetry#4220)

* Change queue metrics to use opencensus metrics instead of stats, close to otel

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Apply suggestions from code review

Co-authored-by: alrex <alrex.boten@gmail.com>

* Fix lint errors

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

Co-authored-by: alrex <alrex.boten@gmail.com>
bogdandrutu and codeboten authored Oct 20, 2021
1 parent 4480a74 commit 964c857
Showing 15 changed files with 115 additions and 121 deletions.
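
For context on the switch this commit makes: the removed code in exporter/exporterhelper/obsreport.go recorded each enqueue failure through opencensus stats.RecordWithTags with per-exporter tag mutators, while the new code registers cumulative instruments on a metric.Registry and increments a pre-bound entry. Below is a minimal, self-contained sketch of the two styles; the measure name, label key, and exampleInstruments type are illustrative placeholders, not the collector's real identifiers.

package example

import (
	"context"

	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
)

// Old style (stats): a measure recorded with tag mutators on every call.
var (
	exporterKey        = tag.MustNewKey("exporter")
	enqueueFailedSpans = stats.Int64("exporter/enqueue_failed_spans",
		"Number of spans failed to be added to the sending queue.", stats.UnitDimensionless)
)

func recordWithStats(ctx context.Context, exporterName string, numSpans int64) {
	mutators := []tag.Mutator{tag.Upsert(exporterKey, exporterName)}
	_ = stats.RecordWithTags(ctx, mutators, enqueueFailedSpans.M(numSpans))
}

// New style (metric registry): register the instrument once, bind an entry per
// exporter label value, then increment it directly.
type exampleInstruments struct {
	failedSpans *metric.Int64Cumulative
}

func newExampleInstruments(registry *metric.Registry) *exampleInstruments {
	c, _ := registry.AddInt64Cumulative(
		"exporter/enqueue_failed_spans",
		metric.WithDescription("Number of spans failed to be added to the sending queue."),
		metric.WithLabelKeys("exporter"),
		metric.WithUnit(metricdata.UnitDimensionless))
	return &exampleInstruments{failedSpans: c}
}

func (i *exampleInstruments) recordWithRegistry(exporterName string, numSpans int64) {
	entry, _ := i.failedSpans.GetEntry(metricdata.NewLabelValue(exporterName))
	entry.Inc(numSpans)
}

The registry form maps more directly onto otel-go instrument APIs, which matches the commit's stated goal of getting "close to otel-go".
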
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common.go
@@ -183,7 +183,7 @@ func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings,
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
ExporterCreateSettings: set,
})
}, globalInstruments)
be.qrSender = newQueuedRetrySender(cfg.ID(), signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender

2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs.go
@@ -112,7 +112,7 @@ func NewLogsExporter(
req := newLogsRequest(ctx, ld, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.context(), req.count())
be.obsrep.recordLogsEnqueueFailure(req.context(), int64(req.count()))
}
return err
}, bs.consumerOptions...)
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs_test.go
@@ -145,7 +145,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow
checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15))
checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterName, int64(15))
}

func TestLogsExporter_WithSpan(t *testing.T) {
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics.go
@@ -113,7 +113,7 @@ func NewMetricsExporter(
req := newMetricsRequest(ctx, md, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count())
be.obsrep.recordMetricsEnqueueFailure(req.context(), int64(req.count()))
}
return err
}, bs.consumerOptions...)
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics_test.go
@@ -144,7 +144,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 10 metric points rejected due to queue overflow
checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(10))
checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterName, int64(10))
}

func TestMetricsExporter_WithSpan(t *testing.T) {
81 changes: 69 additions & 12 deletions exporter/exporterhelper/obsreport.go
@@ -17,8 +17,9 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"

"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/obsreport"
@@ -28,31 +29,87 @@ import (
// into existing `obsreport` package once its functionally is not exposed
// as public API. For now this part is kept private.

var (
globalInstruments = newInstruments(metric.NewRegistry())
)

func init() {
metricproducer.GlobalManager().AddProducer(globalInstruments.registry)
}

type instruments struct {
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
failedToEnqueueTraceSpans *metric.Int64Cumulative
failedToEnqueueMetricPoints *metric.Int64Cumulative
failedToEnqueueLogRecords *metric.Int64Cumulative
}

func newInstruments(registry *metric.Registry) *instruments {
insts := &instruments{
registry: registry,
}
insts.queueSize, _ = registry.AddInt64DerivedGauge(
obsmetrics.ExporterKey+"/queue_size",
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueTraceSpans, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_spans",
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueMetricPoints, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_metric_points",
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueLogRecords, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_log_records",
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

return insts
}

// obsExporter is a helper to add observability to a component.Exporter.
type obsExporter struct {
*obsreport.Exporter
mutators []tag.Mutator
failedToEnqueueTraceSpansEntry *metric.Int64CumulativeEntry
failedToEnqueueMetricPointsEntry *metric.Int64CumulativeEntry
failedToEnqueueLogRecordsEntry *metric.Int64CumulativeEntry
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter {
func newObsExporter(cfg obsreport.ExporterSettings, insts *instruments) *obsExporter {
labelValue := metricdata.NewLabelValue(cfg.ExporterID.String())
failedToEnqueueTraceSpansEntry, _ := insts.failedToEnqueueTraceSpans.GetEntry(labelValue)
failedToEnqueueMetricPointsEntry, _ := insts.failedToEnqueueMetricPoints.GetEntry(labelValue)
failedToEnqueueLogRecordsEntry, _ := insts.failedToEnqueueLogRecords.GetEntry(labelValue)

return &obsExporter{
obsreport.NewExporter(cfg),
[]tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
Exporter: obsreport.NewExporter(cfg),
failedToEnqueueTraceSpansEntry: failedToEnqueueTraceSpansEntry,
failedToEnqueueMetricPointsEntry: failedToEnqueueMetricPointsEntry,
failedToEnqueueLogRecordsEntry: failedToEnqueueLogRecordsEntry,
}
}

// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
func (eor *obsExporter) recordTracesEnqueueFailure(_ context.Context, numSpans int64) {
eor.failedToEnqueueTraceSpansEntry.Inc(numSpans)
}

// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
func (eor *obsExporter) recordMetricsEnqueueFailure(_ context.Context, numMetricPoints int64) {
eor.failedToEnqueueMetricPointsEntry.Inc(numMetricPoints)
}

// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
func (eor *obsExporter) recordLogsEnqueueFailure(_ context.Context, numLogRecords int64) {
eor.failedToEnqueueLogRecordsEntry.Inc(numLogRecords)
}
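
A small illustration, written as if it sat next to the tests in this package (imports for go.opencensus.io/metric and go.opencensus.io/metric/metricdata assumed), of why newObsExporter binds its entries up front: GetEntry keys on the label value, so exporters with different IDs increment independent time series of the same cumulative instrument. The exporter IDs below are made up.

func ExampleEntriesPerExporter() {
	insts := newInstruments(metric.NewRegistry())

	// Distinct label values yield distinct entries, hence distinct time series.
	a, _ := insts.failedToEnqueueTraceSpans.GetEntry(metricdata.NewLabelValue("exporter-a"))
	b, _ := insts.failedToEnqueueTraceSpans.GetEntry(metricdata.NewLabelValue("exporter-b"))
	a.Inc(3)
	b.Inc(5)
	// Reading insts.registry now returns two series for "exporter/enqueue_failed_spans":
	// {exporter="exporter-a"} = 3 and {exporter="exporter-b"} = 5.
}
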
62 changes: 15 additions & 47 deletions exporter/exporterhelper/obsreport_test.go
@@ -16,12 +16,10 @@ package exporterhelper

import (
"context"
"reflect"
"sort"
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.opencensus.io/metric"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/config"
@@ -37,66 +35,42 @@ func TestExportEnqueueFailure(t *testing.T) {

exporter := config.NewComponentID("fakeExporter")

insts := newInstruments(metric.NewRegistry())
obsrep := newObsExporter(obsreport.ExporterSettings{
Level: configtelemetry.LevelNormal,
ExporterID: exporter,
ExporterCreateSettings: set.ToExporterCreateSettings(),
})
}, insts)

logRecords := 7
logRecords := int64(7)
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords))
checkExporterEnqueueFailedLogsStats(t, insts, exporter, logRecords)

spans := 12
spans := int64(12)
obsrep.recordTracesEnqueueFailure(context.Background(), spans)
checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans))
checkExporterEnqueueFailedTracesStats(t, insts, exporter, spans)

metricPoints := 21
metricPoints := int64(21)
obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints)
checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints))
checkExporterEnqueueFailedMetricsStats(t, insts, exporter, metricPoints)
}

// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans")
func checkExporterEnqueueFailedTracesStats(t *testing.T, insts *instruments, exporter config.ComponentID, spans int64) {
checkValueForProducer(t, insts.registry, tagsForExporterView(exporter), spans, "exporter/enqueue_failed_spans")
}

// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points")
func checkExporterEnqueueFailedMetricsStats(t *testing.T, insts *instruments, exporter config.ComponentID, metricPoints int64) {
checkValueForProducer(t, insts.registry, tagsForExporterView(exporter), metricPoints, "exporter/enqueue_failed_metric_points")
}

// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records")
}

// checkValueForView checks that for the current exported value in the view with the given name
// for {LegacyTagKeyReceiver: receiverName} is equal to "value".
func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
// Make sure the tags slice is sorted by tag keys.
sortTags(wantTags)

rows, err := view.RetrieveData(vName)
require.NoError(t, err)

for _, row := range rows {
// Make sure the tags slice is sorted by tag keys.
sortTags(row.Tags)
if reflect.DeepEqual(wantTags, row.Tags) {
sum := row.Data.(*view.SumData)
require.Equal(t, float64(value), sum.Value)
return
}
}

require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows)
func checkExporterEnqueueFailedLogsStats(t *testing.T, insts *instruments, exporter config.ComponentID, logRecords int64) {
checkValueForProducer(t, insts.registry, tagsForExporterView(exporter), logRecords, "exporter/enqueue_failed_log_records")
}

// tagsForExporterView returns the tags that are needed for the exporter views.
@@ -105,9 +79,3 @@ func tagsForExporterView(exporter config.ComponentID) []tag.Tag {
{Key: exporterTag, Value: exporter.String()},
}
}

func sortTags(tags []tag.Tag) {
sort.SliceStable(tags, func(i, j int) bool {
return tags[i].Key.Name() < tags[j].Key.Name()
})
}
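
One detail worth noting about the test change: checkValueForProducer (defined in queued_retry_test.go below) can accept insts.registry directly because *metric.Registry satisfies the metricproducer.Producer interface through its Read method. A hypothetical compile-time check of that assumption:

package example

import (
	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricproducer"
)

// Compile-time assertion (illustrative): *metric.Registry satisfies metricproducer.Producer,
// which is also what lets globalInstruments.registry be added to the GlobalManager in init().
var _ metricproducer.Producer = (*metric.Registry)(nil)
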
16 changes: 0 additions & 16 deletions exporter/exporterhelper/queued_retry.go
@@ -21,34 +21,18 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

var (
r = metric.NewRegistry()

queueSizeGauge, _ = r.AddInt64DerivedGauge(
obsmetrics.ExporterKey+"/queue_size",
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

errSendingQueueIsFull = errors.New("sending_queue is full")
)

func init() {
metricproducer.GlobalManager().AddProducer(r)
}

// RetrySettings defines configuration for retrying batches in case of export failure.
// The current supported strategy is exponential backoff.
type RetrySettings struct {
4 changes: 2 additions & 2 deletions exporter/exporterhelper/queued_retry_experimental.go
@@ -200,7 +200,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er

// Start reporting queue length metric
if qrs.cfg.Enabled {
err := queueSizeGauge.UpsertEntry(func() int64 {
err := globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName()))
if err != nil {
@@ -215,7 +215,7 @@ func (qrs *queuedRetrySender) shutdown() {
func (qrs *queuedRetrySender) shutdown() {
// Cleanup queue metrics reporting
if qrs.cfg.Enabled {
_ = queueSizeGauge.UpsertEntry(func() int64 {
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qrs.fullName()))
}
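
The queue-size metric keeps the derived-gauge pattern it already had, now hung off globalInstruments: UpsertEntry registers a callback that the registry invokes whenever the metric is read, so the queue length is sampled lazily at read time rather than recorded on every enqueue. A standalone sketch of that pattern with placeholder names (the exporter label value and queueLen variable stand in for the real sender state):

package example

import (
	"sync/atomic"

	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
)

func queueSizeGaugeSketch() {
	registry := metric.NewRegistry()
	gauge, _ := registry.AddInt64DerivedGauge(
		"exporter/queue_size",
		metric.WithDescription("Current size of the retry queue (in batches)"),
		metric.WithLabelKeys("exporter"),
		metric.WithUnit(metricdata.UnitDimensionless))

	var queueLen int64 // stand-in for qrs.queue.Size()

	// start(): register the sampling callback for this exporter's label value.
	_ = gauge.UpsertEntry(func() int64 { return atomic.LoadInt64(&queueLen) },
		metricdata.NewLabelValue("fake_exporter"))

	// shutdown(): upsert again so the series reports 0 once the queue is gone.
	_ = gauge.UpsertEntry(func() int64 { return 0 },
		metricdata.NewLabelValue("fake_exporter"))
}
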
6 changes: 3 additions & 3 deletions exporter/exporterhelper/queued_retry_inmemory.go
@@ -99,7 +99,7 @@ func onTemporaryFailure(logger *zap.Logger, req request, err error) error {
}

// start is invoked during service startup.
func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) error {
func (qrs *queuedRetrySender) start(context.Context, component.Host) error {
qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
req := item.(request)
_ = qrs.consumerSender.send(req)
@@ -108,7 +108,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er

// Start reporting queue length metric
if qrs.cfg.Enabled {
err := queueSizeGauge.UpsertEntry(func() int64 {
err := globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
@@ -123,7 +123,7 @@ func (qrs *queuedRetrySender) shutdown() {
func (qrs *queuedRetrySender) shutdown() {
// Cleanup queue metrics reporting
if qrs.cfg.Enabled {
_ = queueSizeGauge.UpsertEntry(func() int64 {
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qrs.fullName))
}
32 changes: 19 additions & 13 deletions exporter/exporterhelper/queued_retry_test.go
@@ -346,10 +346,10 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
for i := 0; i < 7; i++ {
require.NoError(t, be.sender.send(newErrorRequest(context.Background())))
}
checkValueForProducer(t, defaultExporterTags, int64(7), "exporter/queue_size")
checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size")

assert.NoError(t, be.Shutdown(context.Background()))
checkValueForProducer(t, defaultExporterTags, int64(0), "exporter/queue_size")
checkValueForGlobalManager(t, defaultExporterTags, int64(0), "exporter/queue_size")
}

func TestNoCancellationContext(t *testing.T) {
@@ -486,24 +486,30 @@ func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, wan
assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount))
}

// checkValueForProducer checks that the given metrics with wantTags is reported by one of the
// checkValueForGlobalManager checks that the given metrics with wantTags is reported by one of the
// metric producers
func checkValueForProducer(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
func checkValueForGlobalManager(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
producers := metricproducer.GlobalManager().GetAll()
for _, producer := range producers {
for _, metric := range producer.Read() {
if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 {
lastValue := metric.TimeSeries[len(metric.TimeSeries)-1]
if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) {
require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64))
return
}
if checkValueForProducer(t, producer, wantTags, value, vName) {
return
}
}
require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags))
}

// checkValueForProducer checks that the given metrics with wantTags is reported by the metric producer
func checkValueForProducer(t *testing.T, producer metricproducer.Producer, wantTags []tag.Tag, value int64, vName string) bool {
for _, metric := range producer.Read() {
if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 {
lastValue := metric.TimeSeries[len(metric.TimeSeries)-1]
if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) {
require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64))
return true
}
}
}

require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags))
return false
}

// tagsMatchLabelKeys returns true if provided tags match keys and values
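
For readers unfamiliar with the opencensus metricdata model the helpers above walk: Read() returns metrics whose descriptor carries the label keys, while each time series carries the matching label values and an ordered slice of points, the last of which is the current value. A hedged standalone sketch (the metric name is a placeholder; any registry added via GlobalManager().AddProducer shows up here):

package example

import (
	"fmt"

	"go.opencensus.io/metric/metricproducer"
)

// dumpMetric prints every series of the named metric across all registered producers.
func dumpMetric(name string) {
	for _, producer := range metricproducer.GlobalManager().GetAll() {
		for _, m := range producer.Read() {
			if m.Descriptor.Name != name || len(m.TimeSeries) == 0 {
				continue
			}
			for _, ts := range m.TimeSeries {
				last := ts.Points[len(ts.Points)-1]
				fmt.Printf("%s %v=%v -> %v\n", m.Descriptor.Name, m.Descriptor.LabelKeys, ts.LabelValues, last.Value)
			}
		}
	}
}
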
