From 048ed0dfb8ba3368c6c0ce0cf7c5f157366449b7 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Thu, 16 May 2024 13:15:27 -0700 Subject: [PATCH] [mdatagen] add support for async instruments (#10159) This PR adds the ability to configure asynchronous (observable) instruments via mdatagen. This requires providing a mechanism to set options to pass in the callbacks that will be called at the time of the observation. --------- Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../internal/metadata/generated_telemetry.go | 27 +++++++++++++-- .../metadata/generated_telemetry_test.go | 13 ++++++++ .../internal/samplereceiver/metadata.yaml | 8 +++++ cmd/mdatagen/loader_test.go | 12 +++++++ cmd/mdatagen/metricdata.go | 28 +++++++++++++++- cmd/mdatagen/metricdata_test.go | 33 +++++++++++++------ cmd/mdatagen/templates/telemetry.go.tmpl | 24 ++++++++++++++ cmd/mdatagen/templates/telemetry_test.go.tmpl | 15 +++++++++ .../internal/metadata/generated_telemetry.go | 3 ++ .../metadata/generated_telemetry_test.go | 13 ++++++++ .../internal/metadata/generated_telemetry.go | 3 ++ .../metadata/generated_telemetry_test.go | 13 ++++++++ .../internal/metadata/generated_telemetry.go | 3 ++ .../metadata/generated_telemetry_test.go | 13 ++++++++ .../internal/metadata/generated_telemetry.go | 3 ++ .../metadata/generated_telemetry_test.go | 13 ++++++++ .../internal/metadata/generated_telemetry.go | 3 ++ .../metadata/generated_telemetry_test.go | 13 ++++++++ 18 files changed, 227 insertions(+), 13 deletions(-) diff --git a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go index d916f8b40bb..86a50251b2d 100644 --- a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go +++ b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go @@ -3,6 +3,7 @@ package metadata import ( + "context" "errors" "go.opentelemetry.io/otel/metric" @@ -22,17 +23,29 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - BatchSizeTriggerSend metric.Int64Counter - RequestDuration metric.Float64Histogram + BatchSizeTriggerSend metric.Int64Counter + ProcessRuntimeTotalAllocBytes metric.Int64ObservableCounter + observeProcessRuntimeTotalAllocBytes func() int64 + RequestDuration metric.Float64Histogram } // telemetryBuilderOption applies changes to default builder. type telemetryBuilderOption func(*TelemetryBuilder) +// WithProcessRuntimeTotalAllocBytesCallback sets callback for observable ProcessRuntimeTotalAllocBytes metric. +func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessRuntimeTotalAllocBytes = cb + } +} + // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) builder.BatchSizeTriggerSend, err = meter.Int64Counter( @@ -41,6 +54,16 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme metric.WithUnit("1"), ) errs = errors.Join(errs, err) + builder.ProcessRuntimeTotalAllocBytes, err = meter.Int64ObservableCounter( + "process_runtime_total_alloc_bytes", + metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), + metric.WithUnit("By"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(builder.observeProcessRuntimeTotalAllocBytes()) + return nil + }), + ) + errs = errors.Join(errs, err) builder.RequestDuration, err = meter.Float64Histogram( "request_duration", metric.WithDescription("Duration of request"), diff --git a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry_test.go b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry_test.go index cfdc5ab89e3..da03a267c62 100644 --- a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry_test.go +++ b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/cmd/mdatagen/internal/samplereceiver/metadata.yaml b/cmd/mdatagen/internal/samplereceiver/metadata.yaml index 9394b54fcc8..824b1e0ecae 100644 --- a/cmd/mdatagen/internal/samplereceiver/metadata.yaml +++ b/cmd/mdatagen/internal/samplereceiver/metadata.yaml @@ -165,3 +165,11 @@ telemetry: unit: s histogram: value_type: double + process_runtime_total_alloc_bytes: + enabled: true + description: Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc') + unit: By + sum: + async: true + value_type: int + monotonic: true diff --git a/cmd/mdatagen/loader_test.go b/cmd/mdatagen/loader_test.go index ee720d8ce54..0c32493d283 100644 --- a/cmd/mdatagen/loader_test.go +++ b/cmd/mdatagen/loader_test.go @@ -250,6 +250,18 @@ func TestLoadMetadata(t *testing.T) { MetricValueType: MetricValueType{pmetric.NumberDataPointValueTypeDouble}, }, }, + "process_runtime_total_alloc_bytes": { + Enabled: true, + Description: "Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')", + Unit: strPtr("By"), + Sum: &sum{ + Mono: Mono{true}, + MetricValueType: MetricValueType{ + ValueType: pmetric.NumberDataPointValueTypeInt, + }, + Async: true, + }, + }, }, }, ScopeName: "go.opentelemetry.io/collector/internal/receiver/samplereceiver", diff --git a/cmd/mdatagen/metricdata.go b/cmd/mdatagen/metricdata.go index c011be6db7a..2acf4b77b3b 100644 --- a/cmd/mdatagen/metricdata.go +++ b/cmd/mdatagen/metricdata.go @@ -27,6 +27,7 @@ type MetricData interface { HasAggregated() bool HasMetricInputType() bool Instrument() string + IsAsync() bool } // AggregationTemporality defines a metric aggregation type. @@ -123,6 +124,7 @@ func (mvt MetricValueType) BasicType() string { type gauge struct { MetricValueType `mapstructure:"value_type"` MetricInputType `mapstructure:",squash"` + Async bool `mapstructure:"async,omitempty"` } // Unmarshal is a custom unmarshaler for gauge. Needed mostly to avoid MetricValueType.Unmarshal inheritance. @@ -146,7 +148,18 @@ func (d gauge) HasAggregated() bool { } func (d gauge) Instrument() string { - return "" + instrumentName := cases.Title(language.English).String(d.MetricValueType.BasicType()) + + if d.Async { + instrumentName += "Observable" + } + + instrumentName += "Gauge" + return instrumentName +} + +func (d gauge) IsAsync() bool { + return d.Async } type sum struct { @@ -154,6 +167,7 @@ type sum struct { Mono `mapstructure:",squash"` MetricValueType `mapstructure:"value_type"` MetricInputType `mapstructure:",squash"` + Async bool `mapstructure:"async,omitempty"` } // Unmarshal is a custom unmarshaler for sum. Needed mostly to avoid MetricValueType.Unmarshal inheritance. @@ -190,6 +204,9 @@ func (d sum) HasAggregated() bool { func (d sum) Instrument() string { instrumentName := cases.Title(language.English).String(d.MetricValueType.BasicType()) + if d.Async { + instrumentName += "Observable" + } if !d.Monotonic { instrumentName += "UpDown" } @@ -197,11 +214,16 @@ func (d sum) Instrument() string { return instrumentName } +func (d sum) IsAsync() bool { + return d.Async +} + type histogram struct { AggregationTemporality `mapstructure:"aggregation_temporality"` Mono `mapstructure:",squash"` MetricValueType `mapstructure:"value_type"` MetricInputType `mapstructure:",squash"` + Async bool `mapstructure:"async,omitempty"` } func (d histogram) Type() string { @@ -228,3 +250,7 @@ func (d *histogram) Unmarshal(parser *confmap.Conf) error { } return parser.Unmarshal(d, confmap.WithIgnoreUnused()) } + +func (d histogram) IsAsync() bool { + return d.Async +} diff --git a/cmd/mdatagen/metricdata_test.go b/cmd/mdatagen/metricdata_test.go index fc67af4e074..1e56afff27c 100644 --- a/cmd/mdatagen/metricdata_test.go +++ b/cmd/mdatagen/metricdata_test.go @@ -7,21 +7,34 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/pdata/pmetric" ) func TestMetricData(t *testing.T) { for _, arg := range []struct { - metricData MetricData - typ string - hasAggregated bool - hasMonotonic bool + metricData MetricData + wantType string + wantHasAggregated bool + wantHasMonotonic bool + wantInstrument string + wantAsync bool }{ - {&gauge{}, "Gauge", false, false}, - {&sum{}, "Sum", true, true}, - {&histogram{}, "Histogram", false, false}, + {&gauge{}, "Gauge", false, false, "Gauge", false}, + {&gauge{Async: true}, "Gauge", false, false, "ObservableGauge", true}, + {&gauge{MetricValueType: MetricValueType{pmetric.NumberDataPointValueTypeInt}, Async: true}, "Gauge", false, false, "Int64ObservableGauge", true}, + {&gauge{MetricValueType: MetricValueType{pmetric.NumberDataPointValueTypeDouble}, Async: true}, "Gauge", false, false, "Float64ObservableGauge", true}, + {&sum{}, "Sum", true, true, "UpDownCounter", false}, + {&sum{Mono: Mono{true}}, "Sum", true, true, "Counter", false}, + {&sum{Async: true}, "Sum", true, true, "ObservableUpDownCounter", true}, + {&sum{MetricValueType: MetricValueType{pmetric.NumberDataPointValueTypeInt}, Async: true}, "Sum", true, true, "Int64ObservableUpDownCounter", true}, + {&sum{MetricValueType: MetricValueType{pmetric.NumberDataPointValueTypeDouble}, Async: true}, "Sum", true, true, "Float64ObservableUpDownCounter", true}, + {&histogram{}, "Histogram", false, false, "Histogram", false}, } { - assert.Equal(t, arg.typ, arg.metricData.Type()) - assert.Equal(t, arg.hasAggregated, arg.metricData.HasAggregated()) - assert.Equal(t, arg.hasMonotonic, arg.metricData.HasMonotonic()) + assert.Equal(t, arg.wantType, arg.metricData.Type()) + assert.Equal(t, arg.wantHasAggregated, arg.metricData.HasAggregated()) + assert.Equal(t, arg.wantHasMonotonic, arg.metricData.HasMonotonic()) + assert.Equal(t, arg.wantInstrument, arg.metricData.Instrument()) + assert.Equal(t, arg.wantAsync, arg.metricData.IsAsync()) } } diff --git a/cmd/mdatagen/templates/telemetry.go.tmpl b/cmd/mdatagen/templates/telemetry.go.tmpl index 5d72cabe316..5643b66967f 100644 --- a/cmd/mdatagen/templates/telemetry.go.tmpl +++ b/cmd/mdatagen/templates/telemetry.go.tmpl @@ -4,6 +4,7 @@ package {{ .Package }} import ( {{- if .Telemetry.Metrics }} + "context" "errors" {{- end }} @@ -26,16 +27,33 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { type TelemetryBuilder struct { {{- range $name, $metric := .Telemetry.Metrics }} {{ $name.Render }} metric.{{ $metric.Data.Instrument }} + {{- if $metric.Data.Async }} + observe{{ $name.Render }} func() {{ $metric.Data.BasicType }} + {{- end }} {{- end }} } // telemetryBuilderOption applies changes to default builder. type telemetryBuilderOption func(*TelemetryBuilder) +{{- range $name, $metric := .Telemetry.Metrics }} + {{ if $metric.Data.Async -}} +// With{{ $name.Render }}Callback sets callback for observable {{ $name.Render }} metric. +func With{{ $name.Render }}Callback(cb func() {{ $metric.Data.BasicType }}) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observe{{ $name.Render }} = cb + } +} + {{- end }} +{{- end }} + // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) {{- range $name, $metric := .Telemetry.Metrics }} @@ -43,6 +61,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme "{{ $name }}", metric.WithDescription("{{ $metric.Description }}"), metric.WithUnit("{{ $metric.Unit }}"), + {{ if $metric.Data.Async -}} + metric.With{{ casesTitle $metric.Data.BasicType }}Callback(func(_ context.Context, o metric.{{ casesTitle $metric.Data.BasicType }}Observer) error { + o.Observe(builder.observe{{ $name.Render }}()) + return nil + }), + {{- end }} ) errs = errors.Join(errs, err) {{- end }} diff --git a/cmd/mdatagen/templates/telemetry_test.go.tmpl b/cmd/mdatagen/templates/telemetry_test.go.tmpl index 073a53d2aec..ed10dbae08b 100644 --- a/cmd/mdatagen/templates/telemetry_test.go.tmpl +++ b/cmd/mdatagen/templates/telemetry_test.go.tmpl @@ -61,3 +61,18 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } +{{- if .Telemetry.Metrics }} + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} +{{- end }} diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index c74090acb7d..d2d0339d776 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -40,6 +40,9 @@ type telemetryBuilderOption func(*TelemetryBuilder) // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) builder.ExporterEnqueueFailedLogRecords, err = meter.Int64Counter( diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry_test.go b/exporter/exporterhelper/internal/metadata/generated_telemetry_test.go index 7fd14058b3a..34ce1d281f5 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry_test.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/batchprocessor/internal/metadata/generated_telemetry.go b/processor/batchprocessor/internal/metadata/generated_telemetry.go index a35d095dd51..b0394550566 100644 --- a/processor/batchprocessor/internal/metadata/generated_telemetry.go +++ b/processor/batchprocessor/internal/metadata/generated_telemetry.go @@ -35,6 +35,9 @@ type telemetryBuilderOption func(*TelemetryBuilder) // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) builder.ProcessorBatchBatchSendSize, err = meter.Int64Histogram( diff --git a/processor/batchprocessor/internal/metadata/generated_telemetry_test.go b/processor/batchprocessor/internal/metadata/generated_telemetry_test.go index 22002cbff85..6480073f1dc 100644 --- a/processor/batchprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/batchprocessor/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/processorhelper/internal/metadata/generated_telemetry.go b/processor/processorhelper/internal/metadata/generated_telemetry.go index cf496bb3504..e32610d463d 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry.go @@ -40,6 +40,9 @@ type telemetryBuilderOption func(*TelemetryBuilder) // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) builder.ProcessorAcceptedLogRecords, err = meter.Int64Counter( diff --git a/processor/processorhelper/internal/metadata/generated_telemetry_test.go b/processor/processorhelper/internal/metadata/generated_telemetry_test.go index 8f0846b7663..1fb64bf6023 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry_test.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/receiverhelper/internal/metadata/generated_telemetry.go b/receiver/receiverhelper/internal/metadata/generated_telemetry.go index 5226a3d64b4..96b5b5b5fe6 100644 --- a/receiver/receiverhelper/internal/metadata/generated_telemetry.go +++ b/receiver/receiverhelper/internal/metadata/generated_telemetry.go @@ -37,6 +37,9 @@ type telemetryBuilderOption func(*TelemetryBuilder) // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) builder.ReceiverAcceptedLogRecords, err = meter.Int64Counter( diff --git a/receiver/receiverhelper/internal/metadata/generated_telemetry_test.go b/receiver/receiverhelper/internal/metadata/generated_telemetry_test.go index 842c86838aa..65b40921527 100644 --- a/receiver/receiverhelper/internal/metadata/generated_telemetry_test.go +++ b/receiver/receiverhelper/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/scraperhelper/internal/metadata/generated_telemetry.go b/receiver/scraperhelper/internal/metadata/generated_telemetry.go index 6c76bccd5f3..4aa060e9c33 100644 --- a/receiver/scraperhelper/internal/metadata/generated_telemetry.go +++ b/receiver/scraperhelper/internal/metadata/generated_telemetry.go @@ -33,6 +33,9 @@ type telemetryBuilderOption func(*TelemetryBuilder) // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } var err, errs error meter := Meter(settings) builder.ScraperErroredMetricPoints, err = meter.Int64Counter( diff --git a/receiver/scraperhelper/internal/metadata/generated_telemetry_test.go b/receiver/scraperhelper/internal/metadata/generated_telemetry_test.go index d182ec9776e..4973ec82ec9 100644 --- a/receiver/scraperhelper/internal/metadata/generated_telemetry_test.go +++ b/receiver/scraperhelper/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +}