Revert "[dbnode] Emit aggregate usage metrics (#3123)"
arnikola committed Jan 28, 2021
1 parent b973d23 commit 033ef9c
Showing 7 changed files with 36 additions and 338 deletions.
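For orientation before the diff: the change being reverted had emitted five per-namespace usage counts (total, total-terms, deduped-terms, total-fields, deduped-fields) as a single tagged "aggregated-results" counter, as visible in the removed code below. Here is a minimal sketch of that counter layout using uber-go/tally, which the removed code also uses; the scope wiring and the namespace name are illustrative stand-ins, not part of the diff:

```go
package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

// buildUsageCounters mirrors the shape of the reverted metrics: one
// "aggregated-results" counter per usage type, tagged with the namespace.
func buildUsageCounters(scope tally.Scope, namespace string) map[string]tally.Counter {
	counters := make(map[string]tally.Counter)
	for _, usageType := range []string{
		"total", "total-terms", "deduped-terms", "total-fields", "deduped-fields",
	} {
		counters[usageType] = scope.
			Tagged(map[string]string{"type": usageType, "namespace": namespace}).
			Counter("aggregated-results")
	}
	return counters
}

func main() {
	// tally.NoopScope stands in for the scope a configured reporter would provide.
	counters := buildUsageCounters(tally.NoopScope, "example_namespace")
	counters["total"].Inc(42) // dropped by the no-op scope
	fmt.Println("usage counters built:", len(counters))
}
```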
src/dbnode/storage/index.go (15 changes: 6 additions & 9 deletions)
@@ -1404,10 +1404,9 @@ func (i *nsIndex) AggregateQuery(
 	query index.Query,
 	opts index.AggregationOptions,
 ) (index.AggregateQueryResult, error) {
-	id := i.nsMetadata.ID()
 	logFields := []opentracinglog.Field{
 		opentracinglog.String("query", query.String()),
-		opentracinglog.String("namespace", id.String()),
+		opentracinglog.String("namespace", i.nsMetadata.ID().String()),
 		opentracinglog.Int("seriesLimit", opts.SeriesLimit),
 		opentracinglog.Int("docsLimit", opts.DocsLimit),
 		xopentracing.Time("queryStart", opts.StartInclusive),
@@ -1418,15 +1417,13 @@ func (i *nsIndex) AggregateQuery(
 	sp.LogFields(logFields...)
 	defer sp.Finish()
 
-	metrics := index.NewAggregateUsageMetrics(id, i.opts.InstrumentOptions())
 	// Get results and set the filters, namespace ID and size limit.
 	results := i.aggregateResultsPool.Get()
 	aopts := index.AggregateResultsOptions{
-		SizeLimit:             opts.SeriesLimit,
-		DocsLimit:             opts.DocsLimit,
-		FieldFilter:           opts.FieldFilter,
-		Type:                  opts.Type,
-		AggregateUsageMetrics: metrics,
+		SizeLimit:   opts.SeriesLimit,
+		DocsLimit:   opts.DocsLimit,
+		FieldFilter: opts.FieldFilter,
+		Type:        opts.Type,
 	}
 	ctx.RegisterFinalizer(results)
 	// use appropriate fn to query underlying blocks.
@@ -1445,7 +1442,7 @@ func (i *nsIndex) AggregateQuery(
 		}
 	}
 	aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
-	results.Reset(id, aopts)
+	results.Reset(i.nsMetadata.ID(), aopts)
 	exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields)
 	if err != nil {
 		return index.AggregateQueryResult{}, err
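The constructor call removed above (index.NewAggregateUsageMetrics) pairs with the usageMetrics type removed in the next file. That type guards every increment behind a nil check, so a zero-value instance is a safe no-op sink, which is what let AggregateQuery stop threading metrics without breaking callers. A reduced sketch of the nil-guard pattern, trimmed to a single counter (the full removed type has five):

```go
package main

import "github.com/uber-go/tally"

// usageMetrics is cut down to one counter here; the removed type has five.
type usageMetrics struct {
	total tally.Counter // nil when metrics are not wired up
}

// IncTotal is safe on a zero value: a nil counter turns it into a no-op.
func (m *usageMetrics) IncTotal(val int64) {
	if m.total != nil {
		m.total.Inc(val)
	}
}

func main() {
	var disabled usageMetrics // zero value: increments are silently dropped
	disabled.IncTotal(10)

	enabled := usageMetrics{total: tally.NoopScope.Counter("aggregated-results")}
	enabled.IncTotal(10) // routed to the scope (a no-op scope in this sketch)
}
```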
src/dbnode/storage/index/aggregate_results.go (107 changes: 5 additions & 102 deletions)
@@ -24,11 +24,8 @@ import (
 	"math"
 	"sync"
 
-	"github.com/uber-go/tally"
-
 	"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
 	"github.com/m3db/m3/src/x/ident"
-	"github.com/m3db/m3/src/x/instrument"
 	"github.com/m3db/m3/src/x/pool"
 )

@@ -45,80 +42,10 @@ type aggregatedResults struct {
 	idPool    ident.Pool
 	bytesPool pool.CheckedBytesPool
 
-	pool             AggregateResultsPool
-	valuesPool       AggregateValuesPool
-	encodedDocReader docs.EncodedDocumentReader
-
-	iOpts instrument.Options
-}
-
-var _ AggregateUsageMetrics = (*usageMetrics)(nil)
-
-type usageMetrics struct {
-	total tally.Counter
-
-	totalTerms   tally.Counter
-	dedupedTerms tally.Counter
-
-	totalFields   tally.Counter
-	dedupedFields tally.Counter
-}
-
-func (m *usageMetrics) IncTotal(val int64) {
-	// NB: if metrics not set to valid values, no-op.
-	if m.total != nil {
-		m.total.Inc(val)
-	}
-}
-
-func (m *usageMetrics) IncTotalTerms(val int64) {
-	// NB: if metrics not set to valid values, no-op.
-	if m.totalTerms != nil {
-		m.totalTerms.Inc(val)
-	}
-}
-
-func (m *usageMetrics) IncDedupedTerms(val int64) {
-	// NB: if metrics not set to valid values, no-op.
-	if m.dedupedTerms != nil {
-		m.dedupedTerms.Inc(val)
-	}
-}
-
-func (m *usageMetrics) IncTotalFields(val int64) {
-	// NB: if metrics not set to valid values, no-op.
-	if m.totalFields != nil {
-		m.totalFields.Inc(val)
-	}
-}
-
-func (m *usageMetrics) IncDedupedFields(val int64) {
-	// NB: if metrics not set to valid values, no-op.
-	if m.dedupedFields != nil {
-		m.dedupedFields.Inc(val)
-	}
-}
-
-// NewAggregateUsageMetrics builds a new aggregated usage metrics.
-func NewAggregateUsageMetrics(ns ident.ID, iOpts instrument.Options) AggregateUsageMetrics {
-	if ns == nil {
-		return &usageMetrics{}
-	}
-
-	scope := iOpts.MetricsScope()
-	buildCounter := func(val string) tally.Counter {
-		return scope.
-			Tagged(map[string]string{"type": val, "namespace": ns.String()}).
-			Counter("aggregated-results")
-	}
-
-	return &usageMetrics{
-		total:         buildCounter("total"),
-		totalTerms:    buildCounter("total-terms"),
-		dedupedTerms:  buildCounter("deduped-terms"),
-		totalFields:   buildCounter("total-fields"),
-		dedupedFields: buildCounter("deduped-fields"),
-	}
-}
+	pool       AggregateResultsPool
+	valuesPool AggregateValuesPool
+
+	encodedDocReader docs.EncodedDocumentReader
 }
 
 // NewAggregateResults returns a new AggregateResults object.
@@ -127,14 +54,9 @@ func NewAggregateResults(
 	aggregateOpts AggregateResultsOptions,
 	opts Options,
 ) AggregateResults {
-	if aggregateOpts.AggregateUsageMetrics == nil {
-		aggregateOpts.AggregateUsageMetrics = &usageMetrics{}
-	}
-
 	return &aggregatedResults{
 		nsID:          namespaceID,
 		aggregateOpts: aggregateOpts,
-		iOpts:         opts.InstrumentOptions(),
 		resultsMap:    newAggregateResultsMap(opts.IdentifierPool()),
 		idPool:        opts.IdentifierPool(),
 		bytesPool:     opts.CheckedBytesPool(),
@@ -151,11 +73,8 @@ func (r *aggregatedResults) Reset(
 ) {
 	r.Lock()
 
-	if aggregateOpts.AggregateUsageMetrics == nil {
-		aggregateOpts.AggregateUsageMetrics = NewAggregateUsageMetrics(nsID, r.iOpts)
-	}
-
 	r.aggregateOpts = aggregateOpts
 
 	// finalize existing held nsID
 	if r.nsID != nil {
 		r.nsID.Finalize()
@@ -172,6 +91,7 @@ func (r *aggregatedResults) Reset(
 		valueMap := entry.Value()
 		valueMap.finalize()
 	}
+
 	// reset all keys in the map next
 	r.resultsMap.Reset()
 	r.totalDocsCount = 0
@@ -190,14 +110,6 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 	r.Lock()
 	defer r.Unlock()
 
-	// NB: init total count with batch length, since each aggregated entry
-	// will have one field.
-	totalCount := len(batch)
-	for idx := 0; idx < len(batch); idx++ {
-		totalCount += len(batch[idx].Terms)
-	}
-
-	r.aggregateOpts.AggregateUsageMetrics.IncTotal(int64(totalCount))
 	remainingDocs := math.MaxInt64
 	if r.aggregateOpts.DocsLimit != 0 {
 		remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount
@@ -207,9 +119,7 @@
 	if remainingDocs <= 0 {
 		for idx := 0; idx < len(batch); idx++ {
 			batch[idx].Field.Finalize()
-			r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1)
 			for _, term := range batch[idx].Terms {
-				r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1)
 				term.Finalize()
 			}
 		}
@@ -228,12 +138,9 @@
 	docs := 0
 	numInserts := 0
 	for _, entry := range batch {
-		r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1)
-
 		if docs >= remainingDocs || numInserts >= remainingInserts {
 			entry.Field.Finalize()
 			for _, term := range entry.Terms {
-				r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1)
 				term.Finalize()
 			}
 
@@ -247,8 +154,6 @@
 		aggValues, ok := r.resultsMap.Get(f)
 		if !ok {
 			if remainingInserts > numInserts {
-				r.aggregateOpts.AggregateUsageMetrics.IncDedupedFields(1)
-
 				numInserts++
 				aggValues = r.valuesPool.Get()
 				// we can avoid the copy because we assume ownership of the passed ident.ID,
@@ -270,14 +175,12 @@
 
 		valuesMap := aggValues.Map()
 		for _, t := range entry.Terms {
-			r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1)
 			if remainingDocs > docs {
 				docs++
 				if !valuesMap.Contains(t) {
 					// we can avoid the copy because we assume ownership of the passed ident.ID,
 					// but still need to finalize it.
 					if remainingInserts > numInserts {
-						r.aggregateOpts.AggregateUsageMetrics.IncDedupedTerms(1)
 						valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
 							NoCopyKey:     true,
 							NoFinalizeKey: false,
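After the revert, AddFields keeps its two budgets: remainingDocs derived from DocsLimit, and remainingInserts derived from SizeLimit, where a zero limit means unbounded and is modeled as math.MaxInt64. The DocsLimit branch is visible above; deriving remainingInserts from SizeLimit the same way is an assumption inferred from the surviving remainingInserts checks. A small self-contained model of that arithmetic:

```go
package main

import (
	"fmt"
	"math"
)

// remainingBudgets models the accounting at the top of AddFields: a zero
// limit means "unlimited", represented as math.MaxInt64. The sizeLimit
// branch mirrors the docsLimit branch by assumption (not shown in the diff).
func remainingBudgets(docsLimit, sizeLimit, totalDocsCount, size int) (int, int) {
	remainingDocs := math.MaxInt64
	if docsLimit != 0 {
		remainingDocs = docsLimit - totalDocsCount
	}

	remainingInserts := math.MaxInt64
	if sizeLimit != 0 {
		remainingInserts = sizeLimit - size
	}
	return remainingDocs, remainingInserts
}

func main() {
	docs, inserts := remainingBudgets(100, 10, 97, 8)
	// 3 2: count at most 3 more documents, insert at most 2 more unique entries.
	fmt.Println(docs, inserts)
}
```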
[Diffs for the remaining 5 changed files not shown]
