Skip to content

Commit

Permalink
[batchprocessor] use mdatagen for async metric (#10233)
Browse files Browse the repository at this point in the history
Follows
#10232,
use mdatagen for async counter

---------

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
codeboten committed May 28, 2024
1 parent 0a9c4ff commit 1d8c53f
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 64 deletions.
84 changes: 74 additions & 10 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -165,17 +167,14 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
}

func TestBatchProcessorSentBySize(t *testing.T) {
telemetryTest(t, testBatchProcessorSentBySize)
}

func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
tel := setupTestTelemetry()
sizer := &ptrace.ProtoMarshaler{}
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
creationSet := tel.NewCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
Expand Down Expand Up @@ -211,11 +210,76 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
}
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.SpanCount()),
sendSizeBytesSum: float64(sizeSum),
sizeTrigger: float64(expectedBatchesNum),
tel.assertMetrics(t, []metricdata.Metrics{
{
Name: "processor_batch_batch_send_size_bytes",
Description: "Number of bytes in batch that was sent",
Unit: "By",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000,
100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000,
1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(sizeSum),
Min: metricdata.NewExtrema(int64(sizeSum / expectedBatchesNum)),
Max: metricdata.NewExtrema(int64(sizeSum / expectedBatchesNum)),
},
},
},
},
{
Name: "processor_batch_batch_send_size",
Description: "Number of units in the batch",
Unit: "1",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(sink.SpanCount()),
Min: metricdata.NewExtrema(int64(sendBatchSize)),
Max: metricdata.NewExtrema(int64(sendBatchSize)),
},
},
},
},
{
Name: "processor_batch_batch_size_trigger_send",
Description: "Number of times the batch was sent due to a size trigger",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: int64(expectedBatchesNum),
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
{
Name: "processor_batch_metadata_cardinality",
Description: "Number of distinct metadata value combinations being processed",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
},
},
},
},
})
}

Expand Down
8 changes: 8 additions & 0 deletions processor/batchprocessor/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ Number of times the batch was sent due to a size trigger
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_batch_metadata_cardinality

Number of distinct metadata value combinations being processed

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | false |

### processor_batch_timeout_trigger_send

Number of times the batch was sent due to a timeout trigger
Expand Down
2 changes: 1 addition & 1 deletion processor/batchprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
)

Expand All @@ -45,6 +44,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand Down
34 changes: 27 additions & 7 deletions processor/batchprocessor/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions processor/batchprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@ telemetry:
unit: 1
histogram:
value_type: int
bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000]
processor_batch_batch_send_size_bytes:
enabled: true
description: Number of bytes in batch that was sent
unit: By
histogram:
value_type: int
bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000]
processor_batch_metadata_cardinality:
enabled: true
description: Number of distinct metadata value combinations being processed
unit: 1
sum:
value_type: int
async: true
59 changes: 13 additions & 46 deletions processor/batchprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/processorhelper"
)

type trigger int
Expand All @@ -28,59 +24,30 @@ const (
)

type batchProcessorTelemetry struct {
level configtelemetry.Level
detailed bool

exportCtx context.Context

processorAttr []attribute.KeyValue
telemetryBuilder *metadata.TelemetryBuilder
batchMetadataCardinality metric.Int64ObservableUpDownCounter
processorAttr []attribute.KeyValue
telemetryBuilder *metadata.TelemetryBuilder
}

func newBatchProcessorTelemetry(set processor.CreateSettings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) {
bpt := &batchProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, set.ID.String())},
exportCtx: context.Background(),
level: set.MetricsLevel,
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
}

if err := bpt.createOtelMetrics(set.TelemetrySettings, currentMetadataCardinality); err != nil {
return nil, err
}

return bpt, nil
}

func (bpt *batchProcessorTelemetry) createOtelMetrics(set component.TelemetrySettings, currentMetadataCardinality func() int) error {
var (
errors, err error
meter metric.Meter
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings,
metadata.WithLevel(set.MetricsLevel),
metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { return int64(currentMetadataCardinality()) }),
)

// BatchProcessor are emitted starting from Normal level only.
if bpt.level >= configtelemetry.LevelNormal {
meter = metadata.Meter(set)
} else {
meter = noopmetric.Meter{}
if err != nil {
return nil, err
}

bpt.telemetryBuilder, err = metadata.NewTelemetryBuilder(set, metadata.WithLevel(bpt.level))
errors = multierr.Append(errors, err)

bpt.batchMetadataCardinality, err = meter.Int64ObservableUpDownCounter(
processorhelper.BuildCustomMetricName(typeStr, "metadata_cardinality"),
metric.WithDescription("Number of distinct metadata value combinations being processed"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error {
obs.Observe(int64(currentMetadataCardinality()))
return nil
}),
)
errors = multierr.Append(errors, err)

return errors
return &batchProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, set.ID.String())},
exportCtx: context.Background(),
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
telemetryBuilder: telemetryBuilder,
}, nil
}

func (bpt *batchProcessorTelemetry) record(trigger trigger, sent, bytes int64) {
Expand Down

0 comments on commit 1d8c53f

Please sign in to comment.