Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect monitoring for the overflow metrics for all aggregations #10330

Merged
merged 9 commits into from
Feb 23, 2023
2 changes: 2 additions & 0 deletions changelogs/8.7.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/apm-server/compare/8.6\...8.7[View commits]
- `transaction.success_count` has been moved to `event.success_count` {pull}9819[9819]
- Stop indexing transaction metrics to `metrics-apm.internal` {pull}9846[9846]
- Stop indexing span destination metrics to `metrics-apm.internal` {pull}9926[9926]
- `apmserver.aggregation.txmetrics.overflowed` metric has been renamed to `apmserver.aggregation.txmetrics.overflowed.total` {pull}10330[10330]

[float]
==== Bug fixes
Expand Down Expand Up @@ -48,3 +49,4 @@ https://github.com/elastic/apm-server/compare/8.6\...8.7[View commits]
- Translate otel log events to error events {pull}10066[10066]
- Use Elasticsearch fetcher for agent remote configuration where possible {pull}9720[9720] {pull}10088[10088]
- Fetch sourcemaps from Elasticsearch {pull}9722[9722]
- Add overflow metrics for transaction, span metrics, service transaction and service summary metrics {pull}10330[10330]
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"context"
"encoding/binary"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-data/model"
"github.com/elastic/apm-server/internal/logs"
Expand Down Expand Up @@ -69,15 +71,20 @@ func (config AggregatorConfig) Validate() error {
// Aggregator aggregates all events, periodically publishing service summary metrics.
type Aggregator struct {
*baseaggregator.Aggregator

config AggregatorConfig
config AggregatorConfig
metrics *aggregatorMetrics

mu sync.RWMutex
// These two metricsBuffer are set to the same size and act as buffers
// for caching and then publishing the metrics as batches.
active, inactive map[time.Duration]*metricsBuffer
}

// aggregatorMetrics holds internal counters exposed through CollectMonitoring.
// Both fields are updated via sync/atomic (see publish) and must only be
// read with atomic.LoadInt64.
type aggregatorMetrics struct {
	// activeGroups accumulates the number of aggregation groups published
	// for the configured metrics interval (period == config.Interval).
	activeGroups int64
	// overflow accumulates the estimated cardinality of events that
	// overflowed into the catch-all "other" bucket.
	overflow int64
}

// NewAggregator returns a new Aggregator with the given config.
func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
if err := config.Validate(); err != nil {
Expand All @@ -88,6 +95,7 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metricsBuffer),
inactive: make(map[time.Duration]*metricsBuffer),
}
Expand All @@ -108,6 +116,20 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
return &aggregator, nil
}

// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.servicesummarymetrics" registry.
func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
	V.OnRegistryStart()
	defer V.OnRegistryFinished()

	// The counters are written concurrently by publish via atomic.AddInt64,
	// so they must be read atomically. atomic.LoadInt64 already returns
	// int64, so no conversion is needed.
	monitoring.ReportInt(V, "active_groups", atomic.LoadInt64(&a.metrics.activeGroups))
	monitoring.ReportInt(V, "overflowed.total", atomic.LoadInt64(&a.metrics.overflow))
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
// We hold a.mu only long enough to swap the serviceSummaryMetrics. This will
// be blocked by serviceSummaryMetrics updates, which is OK, as we prefer not
Expand All @@ -134,6 +156,7 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
}

intervalStr := interval.FormatDuration(period)
isMetricsPeriod := period == a.config.Interval
batch := make(model.Batch, 0, size)
for key, metrics := range current.m {
for _, entry := range metrics {
Expand All @@ -143,10 +166,15 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(current.m, key)
}
if current.other != nil {
overflowCount := current.otherCardinalityEstimator.Estimate()
if isMetricsPeriod {
atomic.AddInt64(&a.metrics.activeGroups, int64(current.entries))
atomic.AddInt64(&a.metrics.overflow, int64(overflowCount))
}
m := makeMetricset(*current.other, intervalStr)
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_summary.aggregation.overflow_count",
Value: float64(current.otherCardinalityEstimator.Estimate()),
Value: float64(overflowCount),
})
batch = append(batch, m)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-data/model"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestNewAggregatorConfigInvalid(t *testing.T) {
Expand Down Expand Up @@ -279,6 +280,14 @@ func TestAggregatorOverflow(t *testing.T) {
require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

// assert monitoring
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "servicesummarymetrics", agg.CollectMonitoring)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["servicesummarymetrics.active_groups"] = int64(maxGrps)
expectedMonitoring.Ints["servicesummarymetrics.overflowed.total"] = int64(overflowCount)

var overflowEvent *model.APMEvent
for i := range metricsets {
m := metricsets[i]
Expand Down
34 changes: 31 additions & 3 deletions x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"encoding/binary"
"math"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-hdrhistogram"

"github.com/elastic/apm-data/model"
Expand Down Expand Up @@ -86,8 +88,8 @@ func (config AggregatorConfig) Validate() error {
// Aggregator aggregates service latency and throughput, periodically publishing service transaction metrics.
type Aggregator struct {
*baseaggregator.Aggregator

config AggregatorConfig
config AggregatorConfig
metrics *aggregatorMetrics

mu sync.RWMutex
// These two metricsBuffer are set to the same size and act as buffers
Expand All @@ -97,6 +99,11 @@ type Aggregator struct {
histogramPool sync.Pool
}

// aggregatorMetrics holds internal counters exposed through CollectMonitoring.
// Both fields are updated via sync/atomic (see publish) and must only be
// read with atomic.LoadInt64.
type aggregatorMetrics struct {
	// activeGroups accumulates the number of aggregation groups published
	// for the configured metrics interval (period == config.Interval).
	activeGroups int64
	// overflow accumulates the estimated cardinality of events that
	// overflowed into the catch-all "other" bucket.
	overflow int64
}

// NewAggregator returns a new Aggregator with the given config.
func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
if err := config.Validate(); err != nil {
Expand All @@ -107,6 +114,7 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metricsBuffer),
inactive: make(map[time.Duration]*metricsBuffer),
histogramPool: sync.Pool{New: func() interface{} {
Expand Down Expand Up @@ -134,6 +142,20 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
return &aggregator, nil
}

// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.servicetxmetrics" registry.
func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
	V.OnRegistryStart()
	defer V.OnRegistryFinished()

	// The counters are written concurrently by publish via atomic.AddInt64,
	// so they must be read atomically. atomic.LoadInt64 already returns
	// int64, so no conversion is needed.
	monitoring.ReportInt(V, "active_groups", atomic.LoadInt64(&a.metrics.activeGroups))
	monitoring.ReportInt(V, "overflowed.total", atomic.LoadInt64(&a.metrics.overflow))
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
// We hold a.mu only long enough to swap the serviceTxMetrics. This will
// be blocked by serviceTxMetrics updates, which is OK, as we prefer not
Expand All @@ -159,6 +181,7 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
size++
}

isMetricsPeriod := period == a.config.Interval
intervalStr := interval.FormatDuration(period)
batch := make(model.Batch, 0, size)
for key, metrics := range current.m {
Expand All @@ -173,12 +196,17 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(current.m, key)
}
if current.other != nil {
overflowCount := current.otherCardinalityEstimator.Estimate()
if isMetricsPeriod {
atomic.AddInt64(&a.metrics.activeGroups, int64(current.entries))
atomic.AddInt64(&a.metrics.overflow, int64(overflowCount))
}
entry := current.other
// Record the metricset interval as metricset.interval.
m := makeMetricset(entry.aggregationKey, entry.serviceTxMetrics, intervalStr)
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_transaction.aggregation.overflow_count",
Value: float64(current.otherCardinalityEstimator.Estimate()),
Value: float64(overflowCount),
})
batch = append(batch, m)
entry.histogram.Reset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-data/model"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestNewAggregatorConfigInvalid(t *testing.T) {
Expand Down Expand Up @@ -297,6 +298,17 @@ func TestAggregatorOverflow(t *testing.T) {
require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

// assert monitoring
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "servicetxmetrics", agg.CollectMonitoring)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["servicetxmetrics.active_groups"] = int64(maxGrps)
expectedMonitoring.Ints["servicetxmetrics.overflowed.total"] = int64(overflowCount)
assert.Equal(t, expectedMonitoring, monitoring.CollectFlatSnapshot(
registry, monitoring.Full, false,
))

var overflowEvent *model.APMEvent
for i := range metricsets {
m := metricsets[i]
Expand Down
37 changes: 32 additions & 5 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/binary"
"math"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/interval"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/labels"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

const (
Expand Down Expand Up @@ -77,15 +79,20 @@ func (config AggregatorConfig) Validate() error {
// Aggregator aggregates transaction durations, periodically publishing histogram spanMetrics.
type Aggregator struct {
*baseaggregator.Aggregator

config AggregatorConfig
config AggregatorConfig
metrics *aggregatorMetrics

mu sync.RWMutex
// These two metricsBuffer are set to the same size and act as buffers
// for caching and then publishing the metrics as batches.
active, inactive map[time.Duration]*metricsBuffer
}

// aggregatorMetrics holds internal counters exposed through CollectMonitoring.
// Both fields are updated via sync/atomic (see publish) and must only be
// read with atomic.LoadInt64.
type aggregatorMetrics struct {
	// activeGroups accumulates the number of aggregation groups published
	// for the configured metrics interval (period == config.Interval).
	activeGroups int64
	// overflow accumulates the estimated cardinality of events that
	// overflowed into the catch-all "other" bucket.
	overflow int64
}

// NewAggregator returns a new Aggregator with the given config.
func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
if err := config.Validate(); err != nil {
Expand All @@ -96,6 +103,7 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metricsBuffer),
inactive: make(map[time.Duration]*metricsBuffer),
}
Expand All @@ -116,6 +124,20 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
return &aggregator, nil
}

// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.spanmetrics" registry.
func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
	V.OnRegistryStart()
	defer V.OnRegistryFinished()

	// The counters are written concurrently by publish via atomic.AddInt64,
	// so they must be read atomically. atomic.LoadInt64 already returns
	// int64, so no conversion is needed.
	monitoring.ReportInt(V, "active_groups", atomic.LoadInt64(&a.metrics.activeGroups))
	monitoring.ReportInt(V, "overflowed.total", atomic.LoadInt64(&a.metrics.overflow))
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
// We hold a.mu only long enough to swap the spanMetrics. This will
// be blocked by spanMetrics updates, which is OK, as we prefer not
Expand All @@ -142,6 +164,7 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
}

intervalStr := interval.FormatDuration(period)
isMetricsPeriod := period == a.config.Interval
batch := make(model.Batch, 0, size)
for hash, entries := range current.m {
for _, entry := range entries {
Expand All @@ -153,15 +176,19 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(current.m, hash)
}
if current.other != nil {
overflowCount := current.otherCardinalityEstimator.Estimate()
if isMetricsPeriod {
atomic.AddInt64(&a.metrics.activeGroups, int64(current.entries))
atomic.AddInt64(&a.metrics.overflow, int64(overflowCount))
}
entry := current.other
m := makeMetricset(entry.aggregationKey, entry.spanMetrics)
m.Metricset.Interval = intervalStr
count := current.otherCardinalityEstimator.Estimate()
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_destination.aggregation.overflow_count",
Value: float64(count),
Value: float64(overflowCount),
})
m.Metricset.DocCount = int64(count)
m.Metricset.DocCount = int64(overflowCount)
batch = append(batch, m)
current.other = nil
current.otherCardinalityEstimator = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-data/model"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func BenchmarkAggregateSpan(b *testing.B) {
Expand Down Expand Up @@ -627,6 +628,14 @@ func TestAggregatorOverflow(t *testing.T) {
require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

// assert monitoring
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "spanmetrics", agg.CollectMonitoring)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["spanmetrics.active_groups"] = int64(maxGrps)
expectedMonitoring.Ints["spanmetrics.overflowed.total"] = int64(overflowCount)

var overflowEvent *model.APMEvent
for i := range metricsets {
m := metricsets[i]
Expand Down
Loading