[dbnode] Emit aggregate usage metrics #3123

Merged: 12 commits, Jan 27, 2021
64 changes: 61 additions & 3 deletions src/dbnode/storage/index/aggregate_results.go
@@ -24,8 +24,11 @@ import (
"math"
"sync"

"github.com/uber-go/tally"

"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
)

@@ -42,10 +45,44 @@ type aggregatedResults struct {
idPool ident.Pool
bytesPool pool.CheckedBytesPool

pool AggregateResultsPool
valuesPool AggregateValuesPool

pool AggregateResultsPool
valuesPool AggregateValuesPool
encodedDocReader docs.EncodedDocumentReader

iOpts instrument.Options
metrics usageMetrics
}

type usageMetrics struct {
total tally.Counter

totalTerms tally.Counter
dedupedTerms tally.Counter

totalFields tally.Counter
dedupedFields tally.Counter
}

func newUsageMetrics(ns ident.ID, iOpts instrument.Options) usageMetrics {
Review comment (Collaborator): do we need to reset this with every aggregate result? Is the namespace important enough?

namespace := "unknown"
if ns != nil {
namespace = ns.String()
}

scope := iOpts.MetricsScope()
buildCounter := func(val string) tally.Counter {
return scope.
Tagged(map[string]string{"type": val, "namespace": namespace}).
Counter("aggregated-results")
}

return usageMetrics{
total: buildCounter("total"),
totalTerms: buildCounter("total-terms"),
dedupedTerms: buildCounter("deduped-terms"),
totalFields: buildCounter("total-fields"),
dedupedFields: buildCounter("deduped-fields"),
}
}
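
A rough stand-alone sketch (not part of the diff; the namespace value and counts are made up) of how the tagged counters built above surface in a reporter, using tally's test scope:

	package main

	import (
		"fmt"

		"github.com/uber-go/tally"
	)

	func main() {
		scope := tally.NewTestScope("", nil)

		// Same shape as buildCounter above: one counter name, split by
		// "type" and "namespace" tags.
		buildCounter := func(typ string) tally.Counter {
			return scope.
				Tagged(map[string]string{"type": typ, "namespace": "metrics"}).
				Counter("aggregated-results")
		}

		buildCounter("total-fields").Inc(3)
		buildCounter("deduped-fields").Inc(2)

		// Each tag combination is reported as its own counter.
		for _, c := range scope.Snapshot().Counters() {
			fmt.Println(c.Name(), c.Tags(), c.Value())
		}
	}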

// NewAggregateResults returns a new AggregateResults object.
@@ -54,14 +91,17 @@ func NewAggregateResults(
aggregateOpts AggregateResultsOptions,
opts Options,
) AggregateResults {
iOpts := opts.InstrumentOptions()
return &aggregatedResults{
nsID: namespaceID,
aggregateOpts: aggregateOpts,
iOpts: iOpts,
resultsMap: newAggregateResultsMap(opts.IdentifierPool()),
idPool: opts.IdentifierPool(),
bytesPool: opts.CheckedBytesPool(),
pool: opts.AggregateResultsPool(),
valuesPool: opts.AggregateValuesPool(),
metrics: newUsageMetrics(namespaceID, iOpts),
}
}

@@ -97,6 +137,7 @@ func (r *aggregatedResults) Reset(
r.totalDocsCount = 0
r.size = 0

r.metrics = newUsageMetrics(nsID, r.iOpts)
// NB: could do keys+value in one step but I'm trying to avoid
// using an internal method of a code-gen'd type.
r.Unlock()
@@ -110,6 +151,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
r.Lock()
defer r.Unlock()

// NB: init total count with batch length, since each aggregated entry
// will have one field.
totalCount := len(batch)
for idx := 0; idx < len(batch); idx++ {
totalCount += len(batch[idx].Terms)
Review comment (Collaborator): how is total different than total terms?
Reply (Author): Removed this

}

r.metrics.total.Inc(int64(totalCount))
remainingDocs := int(math.MaxInt64)
if r.aggregateOpts.DocsLimit != 0 {
remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount
@@ -119,7 +168,9 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
if remainingDocs <= 0 {
for idx := 0; idx < len(batch); idx++ {
batch[idx].Field.Finalize()
r.metrics.totalFields.Inc(1)
for _, term := range batch[idx].Terms {
r.metrics.totalTerms.Inc(1)
term.Finalize()
}
}
@@ -138,9 +189,12 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
docs := 0
numInserts := 0
for _, entry := range batch {
r.metrics.totalFields.Inc(1)

if docs >= remainingDocs || numInserts >= remainingInserts {
entry.Field.Finalize()
for _, term := range entry.Terms {
r.metrics.totalTerms.Inc(1)
term.Finalize()
}

@@ -154,6 +208,8 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
aggValues, ok := r.resultsMap.Get(f)
if !ok {
if remainingInserts > numInserts {
r.metrics.dedupedFields.Inc(1)

numInserts++
aggValues = r.valuesPool.Get()
// we can avoid the copy because we assume ownership of the passed ident.ID,
@@ -175,12 +231,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)

valuesMap := aggValues.Map()
for _, t := range entry.Terms {
r.metrics.totalTerms.Inc(1)
if remainingDocs > docs {
docs++
if !valuesMap.Contains(t) {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
if remainingInserts > numInserts {
r.metrics.dedupedTerms.Inc(1)
valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
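For reference, a minimal stand-alone illustration (hypothetical types, not the real AggregateResultsEntry or result maps) of the counting semantics the change introduces: the total-fields/total-terms counters cover everything observed in a batch, while deduped-fields/deduped-terms only cover first-time insertions into the result maps.

	package main

	import "fmt"

	// entry stands in for an AggregateResultsEntry: one field plus its observed terms.
	type entry struct {
		field string
		terms []string
	}

	func main() {
		batch := []entry{
			{field: "city", terms: []string{"nyc", "sfo", "nyc"}},
			{field: "city", terms: []string{"sfo"}},
		}

		seen := map[string]map[string]struct{}{}
		var totalFields, dedupedFields, totalTerms, dedupedTerms int

		for _, e := range batch {
			totalFields++
			values, ok := seen[e.field]
			if !ok {
				dedupedFields++ // first insertion of this field
				values = map[string]struct{}{}
				seen[e.field] = values
			}
			for _, t := range e.terms {
				totalTerms++
				if _, ok := values[t]; !ok {
					dedupedTerms++ // first insertion of this term under the field
					values[t] = struct{}{}
				}
			}
		}

		// Prints: total-fields=2 deduped-fields=1 total-terms=4 deduped-terms=2
		fmt.Printf("total-fields=%d deduped-fields=%d total-terms=%d deduped-terms=%d\n",
			totalFields, dedupedFields, totalTerms, dedupedTerms)
	}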