Record overflow metrics for txn metrics aggregation
lahsivjar committed Feb 22, 2023
1 parent 900229f commit 5f75b31
Showing 2 changed files with 114 additions and 66 deletions.
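This commit replaces the aggregator's single `overflowed` counter with per-reason counters reported under an `overflowed` monitoring namespace (`services`, `per_service_txn_groups`, `txn_groups`, plus a derived `total`). The sketch below shows one way to read the new counters back out of a monitoring registry, mirroring what the updated test asserts; the import paths and the `readOverflowCounters` helper are illustrative assumptions, while the metric keys and monitoring calls are taken from the diff below.

package main

import (
	"fmt"

	"github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
	"github.com/elastic/beats/v7/libbeat/monitoring"
)

// readOverflowCounters registers the aggregator's CollectMonitoring callback
// on a fresh registry and prints the flattened metric keys, including the
// new "overflowed" namespace added by this commit.
func readOverflowCounters(agg *txmetrics.Aggregator) {
	registry := monitoring.NewRegistry()
	monitoring.NewFunc(registry, "txmetrics", agg.CollectMonitoring)

	snapshot := monitoring.CollectFlatSnapshot(registry, monitoring.Full, false)
	for _, key := range []string{
		"txmetrics.active_groups",
		"txmetrics.overflowed.services",
		"txmetrics.overflowed.per_service_txn_groups",
		"txmetrics.overflowed.txn_groups",
		"txmetrics.overflowed.total",
	} {
		fmt.Printf("%s = %d\n", key, snapshot.Ints[key])
	}
}

func main() {
	// Constructing the aggregator (config, batch processor, intervals) is
	// omitted; see NewAggregator in the diff below.
	var agg *txmetrics.Aggregator
	_ = agg // call readOverflowCounters(agg) once a real aggregator exists
}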
71 changes: 49 additions & 22 deletions x-pack/apm-server/aggregation/txmetrics/aggregator.go
@@ -9,7 +9,6 @@ import (
"encoding/binary"
"math"
"sync"
"sync/atomic"
"time"

"github.com/axiomhq/hyperloglog"
@@ -49,8 +48,8 @@ const (
// Aggregator aggregates transaction durations, periodically publishing histogram metrics.
type Aggregator struct {
*baseaggregator.Aggregator
config AggregatorConfig
metrics *aggregatorMetrics // heap-allocated for 64-bit alignment
config AggregatorConfig
aggregatorMetrics *aggregatorMetrics

mu sync.RWMutex
active, inactive map[time.Duration]*metrics
@@ -59,7 +58,11 @@ type Aggregator struct {
}

type aggregatorMetrics struct {
overflowed int64
mu sync.RWMutex
activeGroups int64
txnGroupsOverflow int64
perSvcTxnGroupsOverflow int64
servicesOverflow int64
}

// AggregatorConfig holds configuration for creating an Aggregator.
@@ -140,10 +143,10 @@ func NewAggregator(config AggregatorConfig) (*Aggregator, error) {
config.Logger = logp.NewLogger(logs.TransactionMetrics)
}
aggregator := Aggregator{
config: config,
metrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metrics),
inactive: make(map[time.Duration]*metrics),
config: config,
aggregatorMetrics: &aggregatorMetrics{},
active: make(map[time.Duration]*metrics),
inactive: make(map[time.Duration]*metrics),
histogramPool: sync.Pool{New: func() interface{} {
return hdrhistogram.New(
minDuration.Microseconds(),
@@ -177,15 +180,19 @@ func (a *Aggregator) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor)
V.OnRegistryStart()
defer V.OnRegistryFinished()

a.mu.RLock()
defer a.mu.RUnlock()
metrics := a.aggregatorMetrics

m := a.active[a.config.MetricsInterval]
m.mu.RLock()
defer m.mu.RUnlock()
a.aggregatorMetrics.mu.RLock()
defer a.aggregatorMetrics.mu.RUnlock()

monitoring.ReportInt(V, "active_groups", int64(m.entries))
monitoring.ReportInt(V, "overflowed", atomic.LoadInt64(&a.metrics.overflowed))
totalOverflow := metrics.servicesOverflow + metrics.perSvcTxnGroupsOverflow + metrics.txnGroupsOverflow
monitoring.ReportInt(V, "active_groups", metrics.activeGroups)
monitoring.ReportNamespace(V, "overflowed", func() {
monitoring.ReportInt(V, "services", metrics.servicesOverflow)
monitoring.ReportInt(V, "per_service_txn_groups", metrics.perSvcTxnGroupsOverflow)
monitoring.ReportInt(V, "txn_groups", metrics.txnGroupsOverflow)
monitoring.ReportInt(V, "total", totalOverflow)
})
}

func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
@@ -203,6 +210,8 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
return nil
}

var activeGroups, servicesOverflow, perSvcTxnGroupsOverflow, txnGroupsOverflow int64
isMetricsPeriod := period == a.config.MetricsInterval
intervalStr := interval.FormatDuration(period)
batch := make(model.Batch, 0, current.entries)
for svc, svcEntry := range current.m {
@@ -218,12 +227,23 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
delete(svcEntry.m, hash)
}
if svcEntry.other != nil {
overflowCount := int64(svcEntry.otherCardinalityEstimator.Estimate())
if isMetricsPeriod {
activeGroups = int64(current.entries)
if svc == overflowBucketName {
servicesOverflow += overflowCount
} else if svcEntry.entries >= a.config.MaxTransactionGroupsPerService {
perSvcTxnGroupsOverflow += overflowCount
} else {
txnGroupsOverflow += overflowCount
}
}
entry := svcEntry.other
// Record the metricset interval as metricset.interval.
m := makeMetricset(entry.transactionAggregationKey, entry.transactionMetrics, intervalStr)
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "transaction.aggregation.overflow_count",
Value: float64(svcEntry.otherCardinalityEstimator.Estimate()),
Value: float64(overflowCount),
})
batch = append(batch, m)
entry.histogram.Reset()
@@ -238,6 +258,15 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
current.entries = 0
current.services = 0

if isMetricsPeriod {
a.aggregatorMetrics.mu.Lock()
a.aggregatorMetrics.activeGroups += activeGroups
a.aggregatorMetrics.servicesOverflow += servicesOverflow
a.aggregatorMetrics.perSvcTxnGroupsOverflow += perSvcTxnGroupsOverflow
a.aggregatorMetrics.txnGroupsOverflow += txnGroupsOverflow
a.aggregatorMetrics.mu.Unlock()
}

a.config.Logger.Debugf("%s interval: publishing %d metricsets", period, len(batch))
return a.config.BatchProcessor.ProcessBatch(ctx, &batch)
}
@@ -283,12 +312,12 @@ func (a *Aggregator) AggregateTransaction(event model.APMEvent) {
}
for _, interval := range a.Intervals {
key := a.makeTransactionAggregationKey(event, interval)
hash := key.hash()
a.updateTransactionMetrics(key, hash, count, event.Event.Duration, interval)
a.updateTransactionMetrics(key, count, event.Event.Duration, interval)
}
}

func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, hash uint64, count float64, duration, interval time.Duration) {
func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, count float64, duration, interval time.Duration) {
hash := key.hash()
if duration < minDuration {
duration = minDuration
} else if duration > maxDuration {
@@ -349,15 +378,14 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
}
}
var entry *metricsMapEntry
txnOverflow := m.entries >= a.config.MaxTransactionGroups
perSvcTxnOverflow := svcEntry.entries >= a.config.MaxTransactionGroupsPerService
txnOverflow := m.entries >= a.config.MaxTransactionGroups
if svcOverflow || txnOverflow || perSvcTxnOverflow {
if svcEntry.other != nil {
// axiomhq/hyperloglog uses metrohash but here we are using
// xxhash. Metrohash has better performance but since we are
// already calculating xxhash we can use it directly.
svcEntry.otherCardinalityEstimator.InsertHash(hash)
atomic.AddInt64(&a.metrics.overflowed, 1)
svcEntry.other.recordDuration(duration, count)
return
}
@@ -401,7 +429,6 @@ that configuration option appropriately, may lead to better results.`[1:],
m.entries++
svcEntry.otherCardinalityEstimator = hyperloglog.New14()
svcEntry.otherCardinalityEstimator.InsertHash(hash)
atomic.AddInt64(&a.metrics.overflowed, 1)
entry = svcEntry.other
// For the `_other` service we only account for the `_other` transaction bucket.
key = a.makeOverflowAggregationKey(key, svcOverflow, a.config.MetricsInterval)
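For context on the overflow accounting in `publish` and `updateTransactionMetrics` above: distinct overflowing transaction groups are counted with a HyperLogLog cardinality estimator that is fed the same 64-bit key hash already computed for the aggregation maps (hence the metrohash vs. xxhash comment). A minimal, self-contained sketch of that pattern, with made-up group names purely for illustration:

package main

import (
	"fmt"

	"github.com/axiomhq/hyperloglog"
	"github.com/cespare/xxhash/v2"
)

func main() {
	estimator := hyperloglog.New14()

	// Pretend these transaction groups all failed to get their own map entry.
	overflowed := []string{
		"opbeans-go:GET /orders",
		"opbeans-go:GET /orders", // duplicates only count once
		"opbeans-go:GET /products",
		"opbeans-java:POST /carts",
	}
	for _, group := range overflowed {
		// InsertHash reuses the xxhash value instead of letting the
		// estimator rehash the key with its default hash function.
		estimator.InsertHash(xxhash.Sum64String(group))
	}

	// Published as the transaction.aggregation.overflow_count sample and,
	// with this commit, also summed into the aggregator's overflow metrics.
	fmt.Println("estimated overflowed groups:", estimator.Estimate()) // ~3
}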
109 changes: 65 additions & 44 deletions x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
@@ -93,49 +93,59 @@ func TestTxnAggregatorProcessBatch(t *testing.T) {
// for 7 transactions and 3 services; the first three services will receive
// 2 txns and the last one will receive 1 txn.
// Note that practically uniqueTxnCount will always be >= uniqueServices.
name string
uniqueTxnCount int
uniqueServices int
expectedActiveGroups int64
expectedPerSvcTxnLimitOverflow int
expectedOtherSvcTxnLimitOverflow int // we will design tests to overflow all the services equally
expectedTotalOverflow int64
name string
uniqueTxnCount int
uniqueServices int
expectedActiveGroups int
// expectedOverflowReasonPerSvcTxnGrps represents the total number of txn groups
// that overflowed due to the per-service txn group limit, assuming all services
// overflow equally. These will be recorded in the `transaction.name: _other`
// documents for the corresponding service names.
expectedOverflowReasonPerSvcTxnGrps int
// expectedOverflowReasonTxnGrps represents the total number of txn groups that
// overflowed due to the max txn groups limit. These will be recorded in the
// `transaction.name: _other` documents for the corresponding service names.
expectedOverflowReasonTxnGrps int
// expectedOverflowReasonSvc represents the total number of txn groups that
// overflowed due to the max services limit. These will be recorded in the
// `transaction.name: _other` and `service.name: _other` document.
expectedOverflowReasonSvc int
}{
{
name: "record_into_other_txn_if_txn_per_svcs_limit_breached",
uniqueTxnCount: 20,
uniqueServices: 2,
expectedActiveGroups: 6,
expectedPerSvcTxnLimitOverflow: 8,
expectedOtherSvcTxnLimitOverflow: 0,
expectedTotalOverflow: 16,
name: "record_into_other_txn_if_txn_per_svcs_limit_breached",
uniqueTxnCount: 20,
uniqueServices: 2,
expectedActiveGroups: 6,
expectedOverflowReasonPerSvcTxnGrps: 16,
expectedOverflowReasonTxnGrps: 0,
expectedOverflowReasonSvc: 0,
},
{
name: "record_into_other_txn_if_txn_grps_limit_breached",
uniqueTxnCount: 60,
uniqueServices: 20,
expectedActiveGroups: 40,
expectedPerSvcTxnLimitOverflow: 2,
expectedOtherSvcTxnLimitOverflow: 0,
expectedTotalOverflow: 40,
name: "record_into_other_txn_if_txn_grps_limit_breached",
uniqueTxnCount: 60,
uniqueServices: 20,
expectedActiveGroups: 40,
expectedOverflowReasonPerSvcTxnGrps: 0,
expectedOverflowReasonTxnGrps: 40,
expectedOverflowReasonSvc: 0,
},
{
name: "record_into_other_txn_other_svc_if_txn_grps_and_svcs_limit_breached",
uniqueTxnCount: 60,
uniqueServices: 60,
expectedActiveGroups: 21,
expectedPerSvcTxnLimitOverflow: 0,
expectedOtherSvcTxnLimitOverflow: 40,
expectedTotalOverflow: 40,
name: "record_into_other_txn_other_svc_if_txn_grps_and_svcs_limit_breached",
uniqueTxnCount: 60,
uniqueServices: 60,
expectedActiveGroups: 21,
expectedOverflowReasonPerSvcTxnGrps: 0,
expectedOverflowReasonTxnGrps: 0,
expectedOverflowReasonSvc: 40,
},
{
name: "all_overflow",
uniqueTxnCount: 600,
uniqueServices: 60,
expectedActiveGroups: 41,
expectedPerSvcTxnLimitOverflow: 9,
expectedOtherSvcTxnLimitOverflow: 400,
expectedTotalOverflow: 580,
name: "all_overflow",
uniqueTxnCount: 600,
uniqueServices: 60,
expectedActiveGroups: 41,
expectedOverflowReasonPerSvcTxnGrps: 0,
expectedOverflowReasonTxnGrps: 180,
expectedOverflowReasonSvc: 400,
},
} {
t.Run(tc.name, func(t *testing.T) {
@@ -171,10 +181,16 @@ func TestTxnAggregatorProcessBatch(t *testing.T) {
require.NoError(t, agg.Run())
}(t)
require.NoError(t, agg.ProcessBatch(context.Background(), &batch))

require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["txmetrics.active_groups"] = tc.expectedActiveGroups
expectedMonitoring.Ints["txmetrics.overflowed"] = tc.expectedTotalOverflow * int64(repCount)

expectedMonitoring.Ints["txmetrics.active_groups"] = int64(tc.expectedActiveGroups)
expectedMonitoring.Ints["txmetrics.overflowed.per_service_txn_groups"] = int64(tc.expectedOverflowReasonPerSvcTxnGrps)
expectedMonitoring.Ints["txmetrics.overflowed.txn_groups"] = int64(tc.expectedOverflowReasonTxnGrps)
expectedMonitoring.Ints["txmetrics.overflowed.services"] = int64(tc.expectedOverflowReasonSvc)
expectedMonitoring.Ints["txmetrics.overflowed.total"] = int64(
tc.expectedOverflowReasonPerSvcTxnGrps + tc.expectedOverflowReasonTxnGrps + tc.expectedOverflowReasonSvc)
registry := monitoring.NewRegistry()
monitoring.NewFunc(registry, "txmetrics", agg.CollectMonitoring)
assert.Equal(t, expectedMonitoring, monitoring.CollectFlatSnapshot(
@@ -183,26 +199,31 @@ func TestTxnAggregatorProcessBatch(t *testing.T) {
false, // expvar
))

require.NoError(t, agg.Stop(context.Background()))
metricsets := batchMetricsets(t, expectBatch(t, batches))
var expectedOverflowMetricsets []model.APMEvent
var totalOverflowSvcCount int
if tc.expectedPerSvcTxnLimitOverflow > 0 {
totalOverflowIntoSpecificSvcBuckets := tc.expectedOverflowReasonPerSvcTxnGrps + tc.expectedOverflowReasonTxnGrps
// Assuming that all services in the test overflow equally, any overflow due to the max
// transaction groups limit or the per-service transaction group limit will overflow into the
// corresponding service's overflow bucket, up to the max services limit.
if totalOverflowIntoSpecificSvcBuckets > 0 {
totalOverflowSvcCount = tc.uniqueServices
if tc.uniqueServices > maxSvcs {
totalOverflowSvcCount = maxSvcs
}
}
if tc.expectedOtherSvcTxnLimitOverflow > 0 {
// If there are any overflows due to the max services limit then the overflow
// will be aggregated under a special `service.name: _other` bucket.
if tc.expectedOverflowReasonSvc > 0 {
expectedOverflowMetricsets = append(
expectedOverflowMetricsets,
createOverflowMetricset(tc.expectedOtherSvcTxnLimitOverflow, repCount, txnDuration),
createOverflowMetricset(tc.expectedOverflowReasonSvc, repCount, txnDuration),
)
}
for i := 0; i < totalOverflowSvcCount; i++ {
totalOverflowForEachSvcBuckets := totalOverflowIntoSpecificSvcBuckets / totalOverflowSvcCount
expectedOverflowMetricsets = append(
expectedOverflowMetricsets,
createOverflowMetricset(tc.expectedPerSvcTxnLimitOverflow, repCount, txnDuration),
createOverflowMetricset(totalOverflowForEachSvcBuckets, repCount, txnDuration),
)
}
assert.Empty(t, cmp.Diff(
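The expected values in the table above reconcile as follows: the `overflowed.total` monitoring counter is simply the sum of the three overflow reasons, while overflow that stays attached to concrete services (per-service txn group limit plus global txn group limit) is split evenly across the overflowing services, capped at the max services limit. Below is a small sketch of that arithmetic; the maxSvcs value of 20 is an assumption consistent with the expected numbers, not something visible in this diff.

package main

import "fmt"

// expectedOverflow mirrors the bookkeeping the updated test performs: the
// total is the sum of the three reasons, and the overflow recorded under
// real service names (as opposed to service.name: _other) is divided evenly
// across the services that overflowed.
func expectedOverflow(perSvcTxnGrps, txnGrps, svc, uniqueServices, maxSvcs int) (total, perSvcBucket int) {
	total = perSvcTxnGrps + txnGrps + svc

	specific := perSvcTxnGrps + txnGrps
	if specific == 0 {
		return total, 0
	}
	svcCount := uniqueServices
	if svcCount > maxSvcs {
		svcCount = maxSvcs
	}
	return total, specific / svcCount
}

func main() {
	// "all_overflow" case: 0 + 180 + 400 = 580 total, and 180 txn-group
	// overflows spread over 20 admitted services = 9 per service bucket.
	total, perSvc := expectedOverflow(0, 180, 400, 60, 20)
	fmt.Println(total, perSvc)
}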
