diff --git a/obsreport/obsreport_receiver.go b/obsreport/obsreport_receiver.go index 173c5e751ad0..c2fdbb76749e 100644 --- a/obsreport/obsreport_receiver.go +++ b/obsreport/obsreport_receiver.go @@ -78,6 +78,10 @@ type ReceiverSettings struct { // NewReceiver creates a new Receiver. func NewReceiver(cfg ReceiverSettings) *Receiver { + return newReceiver(cfg, featuregate.GetRegistry()) +} + +func newReceiver(cfg ReceiverSettings, registry *featuregate.Registry) *Receiver { rec := &Receiver{ level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel, spanNamePrefix: obsmetrics.ReceiverPrefix + cfg.ReceiverID.String(), @@ -91,7 +95,7 @@ func NewReceiver(cfg ReceiverSettings) *Receiver { meter: cfg.ReceiverCreateSettings.MeterProvider.Meter(receiverScope), logger: cfg.ReceiverCreateSettings.Logger, - useOtelForMetrics: featuregate.GetRegistry().IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID), + useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID), otelAttrs: []attribute.KeyValue{ attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()), attribute.String(obsmetrics.TransportKey, cfg.Transport), diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index 6f99386a625c..69dbbc250b62 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -25,6 +25,8 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/receiver/scrapererror" @@ -50,149 +52,165 @@ type testParams struct { err error } -func TestReceiveTraceDataOp(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) +func testTelemetry(t *testing.T, testFunc func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry)) { + t.Run("WithOC", func(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() + testFunc(tt, featuregate.NewRegistry()) + }) - params := []testParams{ - {items: 13, err: errFake}, - {items: 42, err: nil}, - } - for i, param := range params { - rec := NewReceiver(ReceiverSettings{ - ReceiverID: receiver, - Transport: transport, - ReceiverCreateSettings: tt.ToReceiverCreateSettings(), - }) - ctx := rec.StartTracesOp(parentCtx) - assert.NotNil(t, ctx) - rec.EndTracesOp(ctx, format, params[i].items, param.err) - } + t.Run("WithOTel", func(t *testing.T) { + registry := featuregate.NewRegistry() + obsreportconfig.RegisterInternalMetricFeatureGate(registry) + require.NoError(t, registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true})) - spans := tt.SpanRecorder.Ended() - require.Equal(t, len(params), len(spans)) + tt, err := obsreporttest.SetupTelemetry() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - var acceptedSpans, refusedSpans int - for i, span := range spans { - assert.Equal(t, "receiver/"+receiver.String()+"/TraceDataReceived", span.Name()) - switch { - case params[i].err == nil: - acceptedSpans += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedSpansKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - refusedSpans += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedSpansKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected param: %v", params[i]) - } - } - require.NoError(t, obsreporttest.CheckReceiverTraces(tt, receiver, transport, int64(acceptedSpans), int64(refusedSpans))) + testFunc(tt, registry) + }) } -func TestReceiveLogsOp(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) +func TestReceiveTraceDataOp(t *testing.T) { + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() - parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() + params := []testParams{ + {items: 13, err: errFake}, + {items: 42, err: nil}, + } + for i, param := range params { + rec := newReceiver(ReceiverSettings{ + ReceiverID: receiver, + Transport: transport, + ReceiverCreateSettings: tt.ToReceiverCreateSettings(), + }, registry) + ctx := rec.StartTracesOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndTracesOp(ctx, format, params[i].items, param.err) + } - params := []testParams{ - {items: 13, err: errFake}, - {items: 42, err: nil}, - } - for i, param := range params { - rec := NewReceiver(ReceiverSettings{ - ReceiverID: receiver, - Transport: transport, - ReceiverCreateSettings: tt.ToReceiverCreateSettings(), - }) - ctx := rec.StartLogsOp(parentCtx) - assert.NotNil(t, ctx) - rec.EndLogsOp(ctx, format, params[i].items, param.err) - } + spans := tt.SpanRecorder.Ended() + require.Equal(t, len(params), len(spans)) + + var acceptedSpans, refusedSpans int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiver.String()+"/TraceDataReceived", span.Name()) + switch { + case params[i].err == nil: + acceptedSpans += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedSpansKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + refusedSpans += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedSpansKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected param: %v", params[i]) + } + } + require.NoError(t, obsreporttest.CheckReceiverTraces(tt, receiver, transport, int64(acceptedSpans), int64(refusedSpans))) + }) +} - spans := tt.SpanRecorder.Ended() - require.Equal(t, len(params), len(spans)) +func TestReceiveLogsOp(t *testing.T) { + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() - var acceptedLogRecords, refusedLogRecords int - for i, span := range spans { - assert.Equal(t, "receiver/"+receiver.String()+"/LogsReceived", span.Name()) - switch { - case params[i].err == nil: - acceptedLogRecords += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedLogRecordsKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - refusedLogRecords += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedLogRecordsKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected param: %v", params[i]) + params := []testParams{ + {items: 13, err: errFake}, + {items: 42, err: nil}, } - } - require.NoError(t, obsreporttest.CheckReceiverLogs(tt, receiver, transport, int64(acceptedLogRecords), int64(refusedLogRecords))) + for i, param := range params { + rec := newReceiver(ReceiverSettings{ + ReceiverID: receiver, + Transport: transport, + ReceiverCreateSettings: tt.ToReceiverCreateSettings(), + }, registry) + ctx := rec.StartLogsOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndLogsOp(ctx, format, params[i].items, param.err) + } + + spans := tt.SpanRecorder.Ended() + require.Equal(t, len(params), len(spans)) + + var acceptedLogRecords, refusedLogRecords int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiver.String()+"/LogsReceived", span.Name()) + switch { + case params[i].err == nil: + acceptedLogRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedLogRecordsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + refusedLogRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedLogRecordsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected param: %v", params[i]) + } + } + require.NoError(t, obsreporttest.CheckReceiverLogs(tt, receiver, transport, int64(acceptedLogRecords), int64(refusedLogRecords))) + }) } func TestReceiveMetricsOp(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() - params := []testParams{ - {items: 23, err: errFake}, - {items: 29, err: nil}, - } - for i, param := range params { - rec := NewReceiver(ReceiverSettings{ - ReceiverID: receiver, - Transport: transport, - ReceiverCreateSettings: tt.ToReceiverCreateSettings(), - }) - ctx := rec.StartMetricsOp(parentCtx) - assert.NotNil(t, ctx) - rec.EndMetricsOp(ctx, format, params[i].items, param.err) - } - - spans := tt.SpanRecorder.Ended() - require.Equal(t, len(params), len(spans)) + params := []testParams{ + {items: 23, err: errFake}, + {items: 29, err: nil}, + } + for i, param := range params { + rec := newReceiver(ReceiverSettings{ + ReceiverID: receiver, + Transport: transport, + ReceiverCreateSettings: tt.ToReceiverCreateSettings(), + }, registry) + ctx := rec.StartMetricsOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndMetricsOp(ctx, format, params[i].items, param.err) + } - var acceptedMetricPoints, refusedMetricPoints int - for i, span := range spans { - assert.Equal(t, "receiver/"+receiver.String()+"/MetricsReceived", span.Name()) - switch { - case params[i].err == nil: - acceptedMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedMetricPointsKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - refusedMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedMetricPointsKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected param: %v", params[i]) + spans := tt.SpanRecorder.Ended() + require.Equal(t, len(params), len(spans)) + + var acceptedMetricPoints, refusedMetricPoints int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiver.String()+"/MetricsReceived", span.Name()) + switch { + case params[i].err == nil: + acceptedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedMetricPointsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + refusedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.AcceptedMetricPointsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.RefusedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected param: %v", params[i]) + } } - } - require.NoError(t, obsreporttest.CheckReceiverMetrics(tt, receiver, transport, int64(acceptedMetricPoints), int64(refusedMetricPoints))) + require.NoError(t, obsreporttest.CheckReceiverMetrics(tt, receiver, transport, int64(acceptedMetricPoints), int64(refusedMetricPoints))) + }) } func TestScrapeMetricsDataOp(t *testing.T) { diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index b480f5d82ef2..c506535ec188 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -20,8 +20,13 @@ import ( "reflect" "sort" + ocprom "contrib.go.opencensus.io/exporter/prometheus" + "github.com/prometheus/client_golang/prometheus" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/multierr" @@ -51,6 +56,10 @@ type TestTelemetry struct { component.TelemetrySettings SpanRecorder *tracetest.SpanRecorder views []*view.View + + otelPrometheusChecker *prometheusChecker + meterProvider *sdkmetric.MeterProvider + ocExporter *ocprom.Exporter } // ToExporterCreateSettings returns ExporterCreateSettings with configured TelemetrySettings @@ -77,7 +86,13 @@ func (tts *TestTelemetry) ToReceiverCreateSettings() component.ReceiverCreateSet // Shutdown unregisters any views and shuts down the SpanRecorder func (tts *TestTelemetry) Shutdown(ctx context.Context) error { view.Unregister(tts.views...) - return tts.SpanRecorder.Shutdown(ctx) + view.UnregisterExporter(tts.ocExporter) + var errs error + errs = multierr.Append(errs, tts.SpanRecorder.Shutdown(ctx)) + if tts.meterProvider != nil { + errs = multierr.Append(errs, tts.meterProvider.Shutdown(ctx)) + } + return errs } // SetupTelemetry does setup the testing environment to check the metrics recorded by receivers, producers or exporters. @@ -99,7 +114,28 @@ func SetupTelemetry() (TestTelemetry, error) { return settings, err } - return settings, err + promReg := prometheus.NewRegistry() + + settings.ocExporter, err = ocprom.NewExporter(ocprom.Options{Registry: promReg}) + if err != nil { + return settings, err + } + view.RegisterExporter(settings.ocExporter) + + exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits()) + if err != nil { + return settings, err + } + + settings.meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(resource.Empty()), + sdkmetric.WithReader(exporter), + ) + settings.TelemetrySettings.MeterProvider = settings.meterProvider + + settings.otelPrometheusChecker = &prometheusChecker{promHandler: settings.ocExporter} + + return settings, nil } // CheckExporterTraces checks that for the current exported values for trace exporter metrics match given values. @@ -170,29 +206,20 @@ func CheckProcessorLogs(_ TestTelemetry, processor config.ComponentID, acceptedL // CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckReceiverTraces(_ TestTelemetry, receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error { - receiverTags := tagsForReceiverView(receiver, protocol) - return multierr.Combine( - checkValueForView(receiverTags, acceptedSpans, "receiver/accepted_spans"), - checkValueForView(receiverTags, droppedSpans, "receiver/refused_spans")) +func CheckReceiverTraces(tts TestTelemetry, receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error { + return tts.otelPrometheusChecker.checkReceiverTraces(receiver, protocol, acceptedSpans, droppedSpans) } // CheckReceiverLogs checks that for the current exported values for logs receiver metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckReceiverLogs(_ TestTelemetry, receiver config.ComponentID, protocol string, acceptedLogRecords, droppedLogRecords int64) error { - receiverTags := tagsForReceiverView(receiver, protocol) - return multierr.Combine( - checkValueForView(receiverTags, acceptedLogRecords, "receiver/accepted_log_records"), - checkValueForView(receiverTags, droppedLogRecords, "receiver/refused_log_records")) +func CheckReceiverLogs(tts TestTelemetry, receiver config.ComponentID, protocol string, acceptedLogRecords, droppedLogRecords int64) error { + return tts.otelPrometheusChecker.checkReceiverLogs(receiver, protocol, acceptedLogRecords, droppedLogRecords) } // CheckReceiverMetrics checks that for the current exported values for metrics receiver metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckReceiverMetrics(_ TestTelemetry, receiver config.ComponentID, protocol string, acceptedMetricPoints, droppedMetricPoints int64) error { - receiverTags := tagsForReceiverView(receiver, protocol) - return multierr.Combine( - checkValueForView(receiverTags, acceptedMetricPoints, "receiver/accepted_metric_points"), - checkValueForView(receiverTags, droppedMetricPoints, "receiver/refused_metric_points")) +func CheckReceiverMetrics(tts TestTelemetry, receiver config.ComponentID, protocol string, acceptedMetricPoints, droppedMetricPoints int64) error { + return tts.otelPrometheusChecker.checkReceiverMetrics(receiver, protocol, acceptedMetricPoints, droppedMetricPoints) } // CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values. @@ -229,18 +256,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error { return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows) } -// tagsForReceiverView returns the tags that are needed for the receiver views. -func tagsForReceiverView(receiver config.ComponentID, transport string) []tag.Tag { - tags := make([]tag.Tag, 0, 2) - - tags = append(tags, tag.Tag{Key: receiverTag, Value: receiver.String()}) - if transport != "" { - tags = append(tags, tag.Tag{Key: transportTag, Value: transport}) - } - - return tags -} - // tagsForScraperView returns the tags that are needed for the scraper views. func tagsForScraperView(receiver config.ComponentID, scraper config.ComponentID) []tag.Tag { return []tag.Tag{ diff --git a/obsreport/obsreporttest/otelprometheuschecker.go b/obsreport/obsreporttest/otelprometheuschecker.go new file mode 100644 index 000000000000..9c6586de998b --- /dev/null +++ b/obsreport/obsreporttest/otelprometheuschecker.go @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsreporttest // import "go.opentelemetry.io/collector/obsreport/obsreporttest" + +import ( + "fmt" + "math" + "net/http" + "net/http/httptest" + + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "go.opencensus.io/stats/view" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/config" +) + +// prometheusChecker is used to assert exported metrics from a prometheus handler. +type prometheusChecker struct { + promHandler http.Handler +} + +func (pc *prometheusChecker) checkReceiverTraces(receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error { + receiverAttrs := attributesForReceiverMetrics(receiver, protocol) + return multierr.Combine( + pc.checkCounter("receiver_accepted_spans", acceptedSpans, receiverAttrs), + pc.checkCounter("receiver_refused_spans", droppedSpans, receiverAttrs)) +} + +func (pc *prometheusChecker) checkReceiverLogs(receiver config.ComponentID, protocol string, acceptedLogRecords, droppedLogRecords int64) error { + receiverAttrs := attributesForReceiverMetrics(receiver, protocol) + return multierr.Combine( + pc.checkCounter("receiver_accepted_log_records", acceptedLogRecords, receiverAttrs), + pc.checkCounter("receiver_refused_log_records", droppedLogRecords, receiverAttrs)) +} + +func (pc *prometheusChecker) checkReceiverMetrics(receiver config.ComponentID, protocol string, acceptedMetricPoints, droppedMetricPoints int64) error { + receiverAttrs := attributesForReceiverMetrics(receiver, protocol) + return multierr.Combine( + pc.checkCounter("receiver_accepted_metric_points", acceptedMetricPoints, receiverAttrs), + pc.checkCounter("receiver_refused_metric_points", droppedMetricPoints, receiverAttrs)) +} + +func (pc *prometheusChecker) checkCounter(expectedMetric string, value int64, attrs []attribute.KeyValue) error { + // Forces a flush for the opencensus view data. + _, _ = view.RetrieveData(expectedMetric) + + ts, err := pc.getMetric(expectedMetric, io_prometheus_client.MetricType_COUNTER, attrs) + if err != nil { + return err + } + + expected := float64(value) + if math.Abs(expected-ts.GetCounter().GetValue()) > 0.0001 { + return fmt.Errorf("values for metric '%s' did no match, expected '%f' got '%f'", expectedMetric, expected, ts.GetCounter().GetValue()) + } + + return nil +} + +// getMetric returns the metric time series that matches the given name, type and set of attributes +// it fetches data from the prometheus endpoint and parse them, ideally OTel Go should provide a MeterRecorder of some kind. +func (pc *prometheusChecker) getMetric(expectedName string, expectedType io_prometheus_client.MetricType, expectedAttrs []attribute.KeyValue) (*io_prometheus_client.Metric, error) { + parsed, err := fetchPrometheusMetrics(pc.promHandler) + if err != nil { + return nil, err + } + + metricFamily, ok := parsed[expectedName] + if !ok { + // OTel Go adds `_total` suffix for all monotonic sum. + metricFamily, ok = parsed[expectedName+"_total"] + if !ok { + return nil, fmt.Errorf("metric '%s' not found", expectedName) + } + } + + if metricFamily.Type.String() != expectedType.String() { + return nil, fmt.Errorf("metric '%v' has type '%s' instead of '%s'", expectedName, metricFamily.Type.String(), expectedType.String()) + } + + expectedSet := attribute.NewSet(expectedAttrs...) + + for _, metric := range metricFamily.Metric { + var attrs []attribute.KeyValue + + for _, label := range metric.Label { + attrs = append(attrs, attribute.String(label.GetName(), label.GetValue())) + } + set := attribute.NewSet(attrs...) + + if expectedSet.Equals(&set) { + return metric, nil + } + } + + return nil, fmt.Errorf("metric '%s' doesn't have a timeseries with the given attributes: %s", expectedName, expectedSet.Encoded(attribute.DefaultEncoder())) +} + +func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) { + req, err := http.NewRequest("GET", "/metrics", nil) + if err != nil { + return nil, err + } + + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + var parser expfmt.TextParser + return parser.TextToMetricFamilies(rr.Body) +} + +// attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics. +func attributesForReceiverMetrics(receiver config.ComponentID, transport string) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(receiverTag.Name(), receiver.String()), + attribute.String(transportTag.Name(), transport), + } +} diff --git a/obsreport/obsreporttest/otelprometheuschecker_test.go b/obsreport/obsreporttest/otelprometheuschecker_test.go new file mode 100644 index 000000000000..7c1615ccb321 --- /dev/null +++ b/obsreport/obsreporttest/otelprometheuschecker_test.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsreporttest // import "go.opentelemetry.io/collector/obsreport/obsreporttest" + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" + + "go.opentelemetry.io/collector/config" +) + +func newStubPromChecker() prometheusChecker { + return prometheusChecker{ + promHandler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(` +# HELP receiver_accepted_log_records Number of log records successfully pushed into the pipeline. +# TYPE receiver_accepted_log_records counter +receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102 +# HELP receiver_accepted_metric_points Number of metric points successfully pushed into the pipeline. +# TYPE receiver_accepted_metric_points counter +receiver_accepted_metric_points{receiver="fakeReceiver",transport="fakeTransport"} 7 +# HELP receiver_accepted_spans Number of spans successfully pushed into the pipeline. +# TYPE receiver_accepted_spans counter +receiver_accepted_spans{receiver="fakeReceiver",transport="fakeTransport"} 42 +# HELP receiver_refused_log_records Number of log records that could not be pushed into the pipeline. +# TYPE receiver_refused_log_records counter +receiver_refused_log_records{receiver="fakeReceiver",transport="fakeTransport"} 35 +# HELP receiver_refused_metric_points Number of metric points that could not be pushed into the pipeline. +# TYPE receiver_refused_metric_points counter +receiver_refused_metric_points{receiver="fakeReceiver",transport="fakeTransport"} 41 +# HELP receiver_refused_spans Number of spans that could not be pushed into the pipeline. +# TYPE receiver_refused_spans counter +receiver_refused_spans{receiver="fakeReceiver",transport="fakeTransport"} 13 +# HELP gauge_metric A simple gauge metric +# TYPE gauge_metric gauge +gauge_metric 49 +`)) + }), + } +} + +func TestPromChecker(t *testing.T) { + pc := newStubPromChecker() + receiver := config.NewComponentID("fakeReceiver") + transport := "fakeTransport" + + assert.NoError(t, + pc.checkCounter("receiver_accepted_spans", 42, []attribute.KeyValue{attribute.String("receiver", receiver.String()), attribute.String("transport", transport)}), + "correct assertion should return no error", + ) + + assert.Error(t, + pc.checkCounter("receiver_accepted_spans", 15, []attribute.KeyValue{attribute.String("receiver", receiver.String()), attribute.String("transport", transport)}), + "invalid value should return error", + ) + + assert.Error(t, + pc.checkCounter("invalid_name", 42, []attribute.KeyValue{attribute.String("receiver", receiver.String()), attribute.String("transport", transport)}), + "invalid name should return error", + ) + + assert.Error(t, + pc.checkCounter("receiver_accepted_spans", 42, []attribute.KeyValue{attribute.String("receiver", "notFakeReceiver"), attribute.String("transport", transport)}), + "invalid attributes should return error", + ) + + assert.Error(t, + pc.checkCounter("gauge_metric", 49, nil), + "invalid metric type should return error", + ) + + assert.NoError(t, + pc.checkReceiverTraces(receiver, transport, 42, 13), + "metrics from Receiver Traces should be valid", + ) + + assert.NoError(t, + pc.checkReceiverMetrics(receiver, transport, 7, 41), + "metrics from Receiver Metrics should be valid", + ) + + assert.NoError(t, + pc.checkReceiverLogs(receiver, transport, 102, 35), + "metrics from Receiver Logs should be valid", + ) +}