Skip to content

Commit

Permalink
Instrument obsreport.Scraper (open-telemetry#6460)
Browse files Browse the repository at this point in the history
* Instrument obsreport.Scraper (#19)

* instrument obsreport.scraper metrics with otel go

* add changelog

* update API to use MustNewScraper

* fix typo add testing for checkScraperMetrics

* remove accidental merge conflict references

* fix references after rebase

* address review comments

* remove unneded log

* add newlines

* run gofmt

* fix indenting
  • Loading branch information
moh-osman3 authored Nov 17, 2022
1 parent 1d4100c commit 4565692
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 46 deletions.
16 changes: 16 additions & 0 deletions .chloggen/obsreport-scraper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Instrument `obsreport.Scraper` metrics with otel-go"

# One or more tracking issues or pull requests related to the change
issues: [6460]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
21 changes: 15 additions & 6 deletions internal/obsreportconfig/obsreportconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ func allViews() []*view.View {
views = append(views, receiverViews()...)

// Scraper views.
measures = []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
views = append(views, scraperViews()...)

// Exporter views.
measures = []*stats.Int64Measure{
Expand Down Expand Up @@ -136,6 +131,20 @@ func receiverViews() []*view.View {
return genViews(measures, tagKeys, view.Sum())
}

func scraperViews() []*view.View {
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
return nil
}

measures := []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}

return genViews(measures, tagKeys, view.Sum())
}

func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
Expand Down
94 changes: 82 additions & 12 deletions obsreport/obsreport_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,40 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/receiver/scrapererror"
)

var (
scraperName = "scraper"
scraperScope = scopeName + nameSep + scraperName
)

// Scraper is a helper to add observability to a component.Scraper.
type Scraper struct {
level configtelemetry.Level
receiverID component.ID
scraper component.ID
mutators []tag.Mutator
tracer trace.Tracer

logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
scrapedMetricsPoints syncint64.Counter
erroredMetricsPoints syncint64.Counter
}

// ScraperSettings are settings for creating a Scraper.
Expand All @@ -47,25 +66,67 @@ type ScraperSettings struct {

// NewScraper creates a new Scraper.
func NewScraper(cfg ScraperSettings) (*Scraper, error) {
return &Scraper{
return newScraper(cfg, featuregate.GetRegistry())
}

// Deprecated: [v0.65.0] use NewScraper.
func MustNewScraper(cfg ScraperSettings) *Scraper {
scr, err := newScraper(cfg, featuregate.GetRegistry())
if err != nil {
panic(err)
}

return scr
}

func newScraper(cfg ScraperSettings, registry *featuregate.Registry) (*Scraper, error) {
scraper := &Scraper{
level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel,
receiverID: cfg.ReceiverID,
scraper: cfg.Scraper,
mutators: []tag.Mutator{
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),
}, nil

logger: cfg.ReceiverCreateSettings.Logger,
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()),
},
}

if err := scraper.createOtelMetrics(cfg); err != nil {
return nil, err
}

return scraper, nil
}

// Deprecated: [v0.65.0] use NewScraper.
func MustNewScraper(cfg ScraperSettings) *Scraper {
scrap, err := NewScraper(cfg)
if err != nil {
panic(err)
func (s *Scraper) createOtelMetrics(cfg ScraperSettings) error {
if !s.useOtelForMetrics {
return nil
}
meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope)

var errors, err error

return scrap
s.scrapedMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully scraped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

s.erroredMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey,
instrument.WithDescription("Number of metric points that were unable to be scraped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

return errors
}

// StartMetricsOp is called when a scrape operation is started. The
Expand Down Expand Up @@ -100,10 +161,7 @@ func (s *Scraper) EndMetricsOp(
span := trace.SpanFromContext(scraperCtx)

if s.level != configtelemetry.LevelNone {
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics)
}

// end span according to errors
Expand All @@ -118,3 +176,15 @@ func (s *Scraper) EndMetricsOp(

span.End()
}

func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) {
if s.useOtelForMetrics {
s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...)
s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...)
} else { // OC for metrics
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
}
}
30 changes: 15 additions & 15 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type testParams struct {
err error
}

func testTelemetry(t *testing.T, testFunc func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry)) {
func testTelemetry(t *testing.T, testFunc func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry)) {
t.Run("WithOC", func(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

testFunc(tt, featuregate.NewRegistry())
testFunc(t, tt, featuregate.NewRegistry())
})

t.Run("WithOTel", func(t *testing.T) {
Expand All @@ -70,12 +70,12 @@ func testTelemetry(t *testing.T, testFunc func(tt obsreporttest.TestTelemetry, r
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

testFunc(tt, registry)
testFunc(t, tt, registry)
})
}

func TestReceiveTraceDataOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestReceiveTraceDataOp(t *testing.T) {
}

func TestReceiveLogsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -170,7 +170,7 @@ func TestReceiveLogsOp(t *testing.T) {
}

func TestReceiveMetricsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -219,10 +219,10 @@ func TestReceiveMetricsOp(t *testing.T) {
}

func TestScrapeMetricsDataOp(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
testTelemetry(t, testScrapeMetricsDataOp)
}

func testScrapeMetricsDataOp(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -232,12 +232,12 @@ func TestScrapeMetricsDataOp(t *testing.T) {
{items: 15, err: nil},
}
for i := range params {
scrp, serr := NewScraper(ScraperSettings{
scrp, err := newScraper(ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
require.NoError(t, serr)
}, registry)
require.NoError(t, err)
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestScrapeMetricsDataOp(t *testing.T) {
}

func TestExportTraceDataOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestExportTraceDataOp(t *testing.T) {
}

func TestExportMetricsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -376,7 +376,7 @@ func TestExportMetricsOp(t *testing.T) {
}

func TestExportLogsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down
15 changes: 2 additions & 13 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver component.ID, protocol str

// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func CheckScraperMetrics(_ TestTelemetry, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperTags := tagsForScraperView(receiver, scraper)
return multierr.Combine(
checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"),
checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points"))
func CheckScraperMetrics(tts TestTelemetry, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints)
}

// checkValueForView checks that for the current exported value in the view with the given name
Expand Down Expand Up @@ -237,14 +234,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error {
return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows)
}

// tagsForScraperView returns the tags that are needed for the scraper views.
func tagsForScraperView(receiver component.ID, scraper component.ID) []tag.Tag {
return []tag.Tag{
{Key: receiverTag, Value: receiver.String()},
{Key: scraperTag, Value: scraper.String()},
}
}

// tagsForProcessorView returns the tags that are needed for the processor views.
func tagsForProcessorView(processor component.ID) []tag.Tag {
return []tag.Tag{
Expand Down
22 changes: 22 additions & 0 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,32 @@ const (
)

var (
scraper = component.NewID("fakeScraper")
receiver = component.NewID("fakeReicever")
exporter = component.NewID("fakeExporter")
)

func TestCheckScraperMetricsViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

s, err := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
require.NoError(t, err)
ctx := s.StartMetricsOp(context.Background())
require.NotNil(t, ctx)
s.EndMetricsOp(ctx, 7, nil)

assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7))
}

func TestCheckReceiverTracesViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
Expand Down
14 changes: 14 additions & 0 deletions obsreport/obsreporttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ type prometheusChecker struct {
promHandler http.Handler
}

func (pc *prometheusChecker) checkScraperMetrics(receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
return multierr.Combine(
pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, scraperAttrs),
pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, scraperAttrs))
}

func (pc *prometheusChecker) checkReceiverTraces(receiver component.ID, protocol string, acceptedSpans, droppedSpans int64) error {
receiverAttrs := attributesForReceiverMetrics(receiver, protocol)
return multierr.Combine(
Expand Down Expand Up @@ -145,6 +152,13 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli
return parser.TextToMetricFamilies(rr.Body)
}

func attributesForScraperMetrics(receiver component.ID, scraper component.ID) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String(receiverTag.Name(), receiver.String()),
attribute.String(scraperTag.Name(), scraper.String()),
}
}

// attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics.
func attributesForReceiverMetrics(receiver component.ID, transport string) []attribute.KeyValue {
return []attribute.KeyValue{
Expand Down
Loading

0 comments on commit 4565692

Please sign in to comment.