Skip to content

Commit

Permalink
Collect monitoring for the overflow metrics for all aggregations (#10330
Browse files Browse the repository at this point in the history
)

(cherry picked from commit 55e78ab)
  • Loading branch information
lahsivjar authored and mergify[bot] committed Feb 23, 2023
1 parent 934bbf5 commit ed4f940
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 72 deletions.
2 changes: 2 additions & 0 deletions changelogs/8.7.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/apm-server/compare/8.6\...8.7[View commits]
- `transaction.success_count` has been moved to `event.success_count` {pull}9819[9819]
- Stop indexing transaction metrics to `metrics-apm.internal` {pull}9846[9846]
- Stop indexing span destination metrics to `metrics-apm.internal` {pull}9926[9926]
- `apmserver.aggregation.txmetrics.overflowed` metric has been renamed to `apmserver.aggregation.txmetrics.overflowed.total` {pull}10330[10330]

[float]
==== Bug fixes
Expand Down Expand Up @@ -48,3 +49,4 @@ https://github.com/elastic/apm-server/compare/8.6\...8.7[View commits]
- Translate otel log events to error events {pull}10066[10066]
- Use Elasticsearch fetcher for agent remote configuration where possible {pull}9720[9720] {pull}10088[10088]
- Fetch sourcemaps from Elasticsearch {pull}9722[9722]
- Add overflow metrics for transaction, span metrics, service transaction and service summary metrics {pull}10330[10330]
34 changes: 31 additions & 3 deletions x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"context"
"encoding/binary"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-data/model"
"github.com/elastic/apm-server/internal/logs"
Expand Down Expand Up @@ -69,15 +71,20 @@ func (config AggregatorConfig) Validate() error {
// Aggregator aggregates all events, periodically publishing service summary metrics.
type Aggregator struct {
*baseaggregator.Aggregator

config AggregatorConfig
config AggregatorConfig
metrics *aggregatorMetrics

mu sync.RWMutex
// These two metricsBuffer are set to the same size and act as buffers
// for caching and then publishing the metrics as batches.
active, inactive map[time.Duration]*metricsBuffer
}

// aggregatorMetrics holds monitoring counters for the aggregator. The fields
// are updated with sync/atomic in publish and read by CollectMonitoring.
type aggregatorMetrics struct {
	// activeGroups accumulates the number of active aggregation groups
	// observed on each metrics-interval publish (reported as "active_groups").
	activeGroups int64
	// overflow accumulates the estimated cardinality of groups that spilled
	// into the catch-all "other" bucket (reported as "overflowed.total").
	overflow int64
}

// NewAggregator returns a new Aggregator with the given config.
func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
if err := config.Validate(); err != nil {
Expand All @@ -88,6 +95,7 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metricsBuffer),
inactive: make(map[time.Duration]*metricsBuffer),
}
Expand All @@ -108,6 +116,20 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
return &aggregator, nil
}

// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.servicesummarymetrics" registry.
// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.servicesummarymetrics" registry.
func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, v monitoring.Visitor) {
	v.OnRegistryStart()
	defer v.OnRegistryFinished()

	// atomic.LoadInt64 already returns int64; no conversion is needed.
	monitoring.ReportInt(v, "active_groups", atomic.LoadInt64(&a.metrics.activeGroups))
	monitoring.ReportInt(v, "overflowed.total", atomic.LoadInt64(&a.metrics.overflow))
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
// We hold a.mu only long enough to swap the serviceSummaryMetrics. This will
// be blocked by serviceSummaryMetrics updates, which is OK, as we prefer not
Expand All @@ -134,6 +156,7 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
}

intervalStr := interval.FormatDuration(period)
isMetricsPeriod := period == a.config.Interval
batch := make(model.Batch, 0, size)
for key, metrics := range current.m {
for _, entry := range metrics {
Expand All @@ -143,10 +166,15 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(current.m, key)
}
if current.other != nil {
overflowCount := current.otherCardinalityEstimator.Estimate()
if isMetricsPeriod {
atomic.AddInt64(&a.metrics.activeGroups, int64(current.entries))
atomic.AddInt64(&a.metrics.overflow, int64(overflowCount))
}
m := makeMetricset(*current.other, intervalStr)
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_summary.aggregation.overflow_count",
Value: float64(current.otherCardinalityEstimator.Estimate()),
Value: float64(overflowCount),
})
batch = append(batch, m)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-data/model"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestNewAggregatorConfigInvalid(t *testing.T) {
Expand Down Expand Up @@ -279,6 +280,14 @@ func TestAggregatorOverflow(t *testing.T) {
require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

// assert monitoring
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "servicesummarymetrics", agg.CollectMonitoring)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["servicesummarymetrics.active_groups"] = int64(maxGrps)
expectedMonitoring.Ints["servicesummarymetrics.overflowed.total"] = int64(overflowCount)

var overflowEvent *model.APMEvent
for i := range metricsets {
m := metricsets[i]
Expand Down
34 changes: 31 additions & 3 deletions x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"encoding/binary"
"math"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-hdrhistogram"

"github.com/elastic/apm-data/model"
Expand Down Expand Up @@ -86,8 +88,8 @@ func (config AggregatorConfig) Validate() error {
// Aggregator aggregates service latency and throughput, periodically publishing service transaction metrics.
type Aggregator struct {
*baseaggregator.Aggregator

config AggregatorConfig
config AggregatorConfig
metrics *aggregatorMetrics

mu sync.RWMutex
// These two metricsBuffer are set to the same size and act as buffers
Expand All @@ -97,6 +99,11 @@ type Aggregator struct {
histogramPool sync.Pool
}

// aggregatorMetrics holds monitoring counters for the aggregator. The fields
// are updated with sync/atomic in publish and read by CollectMonitoring.
type aggregatorMetrics struct {
	// activeGroups accumulates the number of active aggregation groups
	// observed on each metrics-interval publish (reported as "active_groups").
	activeGroups int64
	// overflow accumulates the estimated cardinality of groups that spilled
	// into the catch-all "other" bucket (reported as "overflowed.total").
	overflow int64
}

// NewAggregator returns a new Aggregator with the given config.
func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
if err := config.Validate(); err != nil {
Expand All @@ -107,6 +114,7 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metricsBuffer),
inactive: make(map[time.Duration]*metricsBuffer),
histogramPool: sync.Pool{New: func() interface{} {
Expand Down Expand Up @@ -134,6 +142,20 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
return &aggregator, nil
}

// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.servicetxmetrics" registry.
// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.servicetxmetrics" registry.
func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, v monitoring.Visitor) {
	v.OnRegistryStart()
	defer v.OnRegistryFinished()

	// atomic.LoadInt64 already returns int64; no conversion is needed.
	monitoring.ReportInt(v, "active_groups", atomic.LoadInt64(&a.metrics.activeGroups))
	monitoring.ReportInt(v, "overflowed.total", atomic.LoadInt64(&a.metrics.overflow))
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
// We hold a.mu only long enough to swap the serviceTxMetrics. This will
// be blocked by serviceTxMetrics updates, which is OK, as we prefer not
Expand All @@ -159,6 +181,7 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
size++
}

isMetricsPeriod := period == a.config.Interval
intervalStr := interval.FormatDuration(period)
batch := make(model.Batch, 0, size)
for key, metrics := range current.m {
Expand All @@ -173,12 +196,17 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(current.m, key)
}
if current.other != nil {
overflowCount := current.otherCardinalityEstimator.Estimate()
if isMetricsPeriod {
atomic.AddInt64(&a.metrics.activeGroups, int64(current.entries))
atomic.AddInt64(&a.metrics.overflow, int64(overflowCount))
}
entry := current.other
// Record the metricset interval as metricset.interval.
m := makeMetricset(entry.aggregationKey, entry.serviceTxMetrics, intervalStr)
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_transaction.aggregation.overflow_count",
Value: float64(current.otherCardinalityEstimator.Estimate()),
Value: float64(overflowCount),
})
batch = append(batch, m)
entry.histogram.Reset()
Expand Down
12 changes: 12 additions & 0 deletions x-pack/apm-server/aggregation/servicetxmetrics/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-data/model"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestNewAggregatorConfigInvalid(t *testing.T) {
Expand Down Expand Up @@ -297,6 +298,17 @@ func TestAggregatorOverflow(t *testing.T) {
require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

// assert monitoring
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "servicetxmetrics", agg.CollectMonitoring)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["servicetxmetrics.active_groups"] = int64(maxGrps)
expectedMonitoring.Ints["servicetxmetrics.overflowed.total"] = int64(overflowCount)
assert.Equal(t, expectedMonitoring, monitoring.CollectFlatSnapshot(
registry, monitoring.Full, false,
))

var overflowEvent *model.APMEvent
for i := range metricsets {
m := metricsets[i]
Expand Down
37 changes: 32 additions & 5 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/binary"
"math"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/interval"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/labels"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

const (
Expand Down Expand Up @@ -77,15 +79,20 @@ func (config AggregatorConfig) Validate() error {
// Aggregator aggregates transaction durations, periodically publishing histogram spanMetrics.
type Aggregator struct {
*baseaggregator.Aggregator

config AggregatorConfig
config AggregatorConfig
metrics *aggregatorMetrics

mu sync.RWMutex
// These two metricsBuffer are set to the same size and act as buffers
// for caching and then publishing the metrics as batches.
active, inactive map[time.Duration]*metricsBuffer
}

// aggregatorMetrics holds monitoring counters for the aggregator. The fields
// are updated with sync/atomic in publish and read by CollectMonitoring.
type aggregatorMetrics struct {
	// activeGroups accumulates the number of active aggregation groups
	// observed on each metrics-interval publish (reported as "active_groups").
	activeGroups int64
	// overflow accumulates the estimated cardinality of groups that spilled
	// into the catch-all "other" bucket (reported as "overflowed.total").
	overflow int64
}

// NewAggregator returns a new Aggregator with the given config.
func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
if err := config.Validate(); err != nil {
Expand All @@ -96,6 +103,7 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metricsBuffer),
inactive: make(map[time.Duration]*metricsBuffer),
}
Expand All @@ -116,6 +124,20 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
return &aggregator, nil
}

// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.spanmetrics" registry.
// CollectMonitoring may be called to collect monitoring metrics from the
// aggregation. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.aggregation.spanmetrics" registry.
func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, v monitoring.Visitor) {
	v.OnRegistryStart()
	defer v.OnRegistryFinished()

	// atomic.LoadInt64 already returns int64; no conversion is needed.
	monitoring.ReportInt(v, "active_groups", atomic.LoadInt64(&a.metrics.activeGroups))
	monitoring.ReportInt(v, "overflowed.total", atomic.LoadInt64(&a.metrics.overflow))
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
// We hold a.mu only long enough to swap the spanMetrics. This will
// be blocked by spanMetrics updates, which is OK, as we prefer not
Expand All @@ -142,6 +164,7 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
}

intervalStr := interval.FormatDuration(period)
isMetricsPeriod := period == a.config.Interval
batch := make(model.Batch, 0, size)
for hash, entries := range current.m {
for _, entry := range entries {
Expand All @@ -153,15 +176,19 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(current.m, hash)
}
if current.other != nil {
overflowCount := current.otherCardinalityEstimator.Estimate()
if isMetricsPeriod {
atomic.AddInt64(&a.metrics.activeGroups, int64(current.entries))
atomic.AddInt64(&a.metrics.overflow, int64(overflowCount))
}
entry := current.other
m := makeMetricset(entry.aggregationKey, entry.spanMetrics)
m.Metricset.Interval = intervalStr
count := current.otherCardinalityEstimator.Estimate()
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_destination.aggregation.overflow_count",
Value: float64(count),
Value: float64(overflowCount),
})
m.Metricset.DocCount = int64(count)
m.Metricset.DocCount = int64(overflowCount)
batch = append(batch, m)
current.other = nil
current.otherCardinalityEstimator = nil
Expand Down
9 changes: 9 additions & 0 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-data/model"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func BenchmarkAggregateSpan(b *testing.B) {
Expand Down Expand Up @@ -627,6 +628,14 @@ func TestAggregatorOverflow(t *testing.T) {
require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

// assert monitoring
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "spanmetrics", agg.CollectMonitoring)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["spanmetrics.active_groups"] = int64(maxGrps)
expectedMonitoring.Ints["spanmetrics.overflowed.total"] = int64(overflowCount)

var overflowEvent *model.APMEvent
for i := range metricsets {
m := metricsets[i]
Expand Down
Loading

0 comments on commit ed4f940

Please sign in to comment.