diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index ec889cc244..afa684feba 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -233,7 +233,7 @@ function test_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]' - # Test the default docs limit applied when directly querying + # Test the docs limit applied when directly querying # coordinator (docs limit set by header) echo "Test query docs limit with coordinator limit header" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ @@ -391,6 +391,46 @@ function test_series { '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } +function test_label_query_limits_applied { + # Test that require exhaustive does nothing if limits are not hit + echo "Test label limits with require-exhaustive headers true (below limit therefore no error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' + + # the header takes precedence over the configured default series limit + echo "Test label series limit with coordinator limit header (default requires exhaustive so error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + + echo "Test label series limit with require-exhaustive headers false" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]' + + echo "Test label series limit with require-exhaustive headers true (above limit therefore error)" + # Test that require exhaustive error is returned + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + # Test that require exhaustive error is 4xx + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' + + echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + + echo "Test label docs limit with require-exhaustive headers false" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]' + + echo "Test label docs limit with require-exhaustive headers true 
(above limit therefore error)" + # Test that require exhaustive error is returned + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + # Test that require exhaustive error is 4xx + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' +} + function test_labels { TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \ TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \ @@ -449,8 +489,9 @@ test_query_restrict_metrics_type test_prometheus_query_native_timeout test_query_restrict_tags test_prometheus_remote_write_map_tags -test_series -test_labels +test_series +test_label_query_limits_applied +test_labels echo "Running function correctness tests" test_correctness diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 014db1eb6f..cbbc3bd46d 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1404,9 +1404,10 @@ func (i *nsIndex) AggregateQuery( query index.Query, opts index.AggregationOptions, ) (index.AggregateQueryResult, error) { + id := i.nsMetadata.ID() logFields := []opentracinglog.Field{ opentracinglog.String("query", query.String()), - opentracinglog.String("namespace", i.nsMetadata.ID().String()), + opentracinglog.String("namespace", id.String()), opentracinglog.Int("seriesLimit", opts.SeriesLimit), opentracinglog.Int("docsLimit", opts.DocsLimit), xopentracing.Time("queryStart", opts.StartInclusive), @@ -1417,12 +1418,15 @@ func (i *nsIndex) AggregateQuery( sp.LogFields(logFields...) defer sp.Finish() + metrics := index.NewAggregateUsageMetrics(id, i.opts.InstrumentOptions()) // Get results and set the filters, namespace ID and size limit. results := i.aggregateResultsPool.Get() aopts := index.AggregateResultsOptions{ - SizeLimit: opts.SeriesLimit, - FieldFilter: opts.FieldFilter, - Type: opts.Type, + SizeLimit: opts.SeriesLimit, + DocsLimit: opts.DocsLimit, + FieldFilter: opts.FieldFilter, + Type: opts.Type, + AggregateUsageMetrics: metrics, } ctx.RegisterFinalizer(results) // use appropriate fn to query underlying blocks. @@ -1441,7 +1445,7 @@ func (i *nsIndex) AggregateQuery( } } aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe() - results.Reset(i.nsMetadata.ID(), aopts) + results.Reset(id, aopts) exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields) if err != nil { return index.AggregateQueryResult{}, err @@ -1706,7 +1710,16 @@ func (i *nsIndex) execBlockQueryFn( sp.LogFields(logFields...) defer sp.Finish() - blockExhaustive, err := block.Query(ctx, cancellable, query, opts, results, logFields) + docResults, ok := results.(index.DocumentResults) + if !ok { // should never happen + state.Lock() + err := fmt.Errorf("unknown results type [%T] received during query", results) + state.multiErr = state.multiErr.Add(err) + state.Unlock() + return + } + + blockExhaustive, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1744,7 +1757,16 @@ func (i *nsIndex) execBlockWideQueryFn( sp.LogFields(logFields...) 
defer sp.Finish() - _, err := block.Query(ctx, cancellable, query, opts, results, logFields) + docResults, ok := results.(index.DocumentResults) + if !ok { // should never happen + state.Lock() + err := fmt.Errorf("unknown results type [%T] received during wide query", results) + state.multiErr = state.multiErr.Add(err) + state.Unlock() + return + } + + _, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index be08767c8b..ac3cd7c194 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -21,22 +21,18 @@ package index import ( - "fmt" + "math" "sync" "time" - "github.com/m3db/m3/src/m3ninx/doc" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" ) -const missingDocumentFields = "invalid document fields: empty %s" - -// NB: emptyValues is an AggregateValues with no values, used for tracking -// terms only rather than terms and values. -var emptyValues = AggregateValues{hasValues: false} - type aggregatedResults struct { sync.RWMutex @@ -44,16 +40,87 @@ type aggregatedResults struct { aggregateOpts AggregateResultsOptions resultsMap *AggregateResultsMap + size int totalDocsCount int idPool ident.Pool bytesPool pool.CheckedBytesPool - pool AggregateResultsPool - valuesPool AggregateValuesPool - + pool AggregateResultsPool + valuesPool AggregateValuesPool encodedDocReader docs.EncodedDocumentReader resultDuration ResultDurations + + iOpts instrument.Options +} + +var _ AggregateUsageMetrics = (*usageMetrics)(nil) + +type usageMetrics struct { + total tally.Counter + + totalTerms tally.Counter + dedupedTerms tally.Counter + + totalFields tally.Counter + dedupedFields tally.Counter +} + +func (m *usageMetrics) IncTotal(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.total != nil { + m.total.Inc(val) + } +} + +func (m *usageMetrics) IncTotalTerms(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.totalTerms != nil { + m.totalTerms.Inc(val) + } +} + +func (m *usageMetrics) IncDedupedTerms(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.dedupedTerms != nil { + m.dedupedTerms.Inc(val) + } +} + +func (m *usageMetrics) IncTotalFields(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.totalFields != nil { + m.totalFields.Inc(val) + } +} + +func (m *usageMetrics) IncDedupedFields(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.dedupedFields != nil { + m.dedupedFields.Inc(val) + } +} + +// NewAggregateUsageMetrics builds a new aggregated usage metrics. +func NewAggregateUsageMetrics(ns ident.ID, iOpts instrument.Options) AggregateUsageMetrics { + if ns == nil { + return &usageMetrics{} + } + + scope := iOpts.MetricsScope() + buildCounter := func(val string) tally.Counter { + return scope. + Tagged(map[string]string{"type": val, "namespace": ns.String()}). 
+ Counter("aggregated-results") + } + + return &usageMetrics{ + total: buildCounter("total"), + totalTerms: buildCounter("total-terms"), + dedupedTerms: buildCounter("deduped-terms"), + totalFields: buildCounter("total-fields"), + dedupedFields: buildCounter("deduped-fields"), + } } // NewAggregateResults returns a new AggregateResults object. @@ -62,9 +129,14 @@ func NewAggregateResults( aggregateOpts AggregateResultsOptions, opts Options, ) AggregateResults { + if aggregateOpts.AggregateUsageMetrics == nil { + aggregateOpts.AggregateUsageMetrics = &usageMetrics{} + } + return &aggregatedResults{ nsID: namespaceID, aggregateOpts: aggregateOpts, + iOpts: opts.InstrumentOptions(), resultsMap: newAggregateResultsMap(opts.IdentifierPool()), idPool: opts.IdentifierPool(), bytesPool: opts.CheckedBytesPool(), @@ -99,8 +171,11 @@ func (r *aggregatedResults) Reset( ) { r.Lock() - r.aggregateOpts = aggregateOpts + if aggregateOpts.AggregateUsageMetrics == nil { + aggregateOpts.AggregateUsageMetrics = NewAggregateUsageMetrics(nsID, r.iOpts) + } + r.aggregateOpts = aggregateOpts // finalize existing held nsID if r.nsID != nil { r.nsID.Finalize() @@ -117,10 +192,10 @@ func (r *aggregatedResults) Reset( valueMap := entry.Value() valueMap.finalize() } - // reset all keys in the map next r.resultsMap.Reset() r.totalDocsCount = 0 + r.size = 0 r.resultDuration = ResultDurations{} @@ -129,195 +204,124 @@ func (r *aggregatedResults) Reset( r.Unlock() } -func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) { - r.Lock() - err := r.addDocumentsBatchWithLock(batch) - size := r.resultsMap.Len() - docsCount := r.totalDocsCount + len(batch) - r.totalDocsCount = docsCount - r.Unlock() - return size, docsCount, err -} - func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { return r.aggregateOpts } func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() - valueInsertions := 0 - for _, entry := range batch { //nolint:gocritic - f := entry.Field - aggValues, ok := r.resultsMap.Get(f) - if !ok { - aggValues = r.valuesPool.Get() - // we can avoid the copy because we assume ownership of the passed ident.ID, - // but still need to finalize it. - r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) - } else { - // because we already have a entry for this field, we release the ident back to - // the underlying pool. - f.Finalize() - } - valuesMap := aggValues.Map() - for _, t := range entry.Terms { - if !valuesMap.Contains(t) { - // we can avoid the copy because we assume ownership of the passed ident.ID, - // but still need to finalize it. - valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) - valueInsertions++ - } else { - // because we already have a entry for this term, we release the ident back to - // the underlying pool. 
- t.Finalize() - } - } - } - size := r.resultsMap.Len() - docsCount := r.totalDocsCount + valueInsertions - r.totalDocsCount = docsCount - r.Unlock() - return size, docsCount -} - -func (r *aggregatedResults) addDocumentsBatchWithLock( - batch []doc.Document, -) error { - for _, doc := range batch { //nolint:gocritic - switch r.aggregateOpts.Type { - case AggregateTagNamesAndValues: - if err := r.addDocumentWithLock(doc); err != nil { - return err - } - - case AggregateTagNames: - // NB: if aggregating by name only, can ignore any additional documents - // after the result map size exceeds the optional size limit, since all - // incoming terms are either duplicates or new values which will exceed - // the limit. - size := r.resultsMap.Len() - if r.aggregateOpts.SizeLimit > 0 && size >= r.aggregateOpts.SizeLimit { - return nil - } - - if err := r.addDocumentTermsWithLock(doc); err != nil { - return err - } - default: - return fmt.Errorf("unsupported aggregation type: %v", r.aggregateOpts.Type) - } - } - - return nil -} + defer r.Unlock() -func (r *aggregatedResults) addDocumentTermsWithLock( - container doc.Document, -) error { - document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) - if err != nil { - return fmt.Errorf("unable to decode encoded document; %w", err) + // NB: init total count with batch length, since each aggregated entry + // will have one field. + totalCount := len(batch) + for idx := 0; idx < len(batch); idx++ { + totalCount += len(batch[idx].Terms) } - for _, field := range document.Fields { //nolint:gocritic - if err := r.addTermWithLock(field.Name); err != nil { - return fmt.Errorf("unable to add document terms [%+v]: %w", document, err) - } - } - - return nil -} -func (r *aggregatedResults) addTermWithLock( - term []byte, -) error { - if len(term) == 0 { - return fmt.Errorf(missingDocumentFields, "term") + r.aggregateOpts.AggregateUsageMetrics.IncTotal(int64(totalCount)) + remainingDocs := math.MaxInt64 + if r.aggregateOpts.DocsLimit != 0 { + remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount } - // if a term filter is provided, ensure this field matches the filter, - // otherwise ignore it. - filter := r.aggregateOpts.FieldFilter - if filter != nil && !filter.Allow(term) { - return nil - } + // NB: already hit doc limit. + if remainingDocs <= 0 { + for idx := 0; idx < len(batch); idx++ { + batch[idx].Field.Finalize() + r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1) + for _, term := range batch[idx].Terms { + r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1) + term.Finalize() + } + } - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. - termID := ident.BytesID(term) - if r.resultsMap.Contains(termID) { - // NB: this term is already added; continue. - return nil + return r.size, r.totalDocsCount } - // Set results map to an empty AggregateValues since we only care about - // existence of the term in the map, rather than its set of values. 
- r.resultsMap.Set(termID, emptyValues) - return nil -} - -func (r *aggregatedResults) addDocumentWithLock( - container doc.Document, -) error { - document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) - if err != nil { - return fmt.Errorf("unable to decode encoded document; %w", err) - } - for _, field := range document.Fields { //nolint:gocritic - if err := r.addFieldWithLock(field.Name, field.Value); err != nil { - return fmt.Errorf("unable to add document [%+v]: %w", document, err) + // NB: cannot insert more than max docs, so that acts as the upper bound here. + remainingInserts := remainingDocs + if r.aggregateOpts.SizeLimit != 0 { + if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < remainingInserts { + remainingInserts = remaining } } - return nil -} - -func (r *aggregatedResults) addFieldWithLock( - term []byte, - value []byte, -) error { - if len(term) == 0 { - return fmt.Errorf(missingDocumentFields, "term") - } + var ( + docs int + numInserts int + entry AggregateResultsEntry + ) + + for idx := 0; idx < len(batch); idx++ { + entry = batch[idx] + r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1) + + if docs >= remainingDocs || numInserts >= remainingInserts { + entry.Field.Finalize() + for _, term := range entry.Terms { + r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1) + term.Finalize() + } - // if a term filter is provided, ensure this field matches the filter, - // otherwise ignore it. - filter := r.aggregateOpts.FieldFilter - if filter != nil && !filter.Allow(term) { - return nil - } + r.size += numInserts + r.totalDocsCount += docs + return r.size, r.totalDocsCount + } - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. - termID := ident.BytesID(term) - valueID := ident.BytesID(value) + docs++ + f := entry.Field + aggValues, ok := r.resultsMap.Get(f) + if !ok { + if remainingInserts > numInserts { + r.aggregateOpts.AggregateUsageMetrics.IncDedupedFields(1) - valueMap, found := r.resultsMap.Get(termID) - if found { - return valueMap.addValue(valueID) - } + numInserts++ + aggValues = r.valuesPool.Get() + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + } else { + // this value exceeds the limit, so should be released to the underling + // pool without adding to the map. + f.Finalize() + } + } else { + // because we already have a entry for this field, we release the ident back to + // the underlying pool. + f.Finalize() + } - // NB: if over limit, do not add any new values to the map. - if r.aggregateOpts.SizeLimit > 0 && - r.resultsMap.Len() >= r.aggregateOpts.SizeLimit { - // Early return if limit enforced and we hit our limit. - return nil - } + valuesMap := aggValues.Map() + for _, t := range entry.Terms { + r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1) + if remainingDocs > docs { + docs++ + if !valuesMap.Contains(t) { + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. 
+ if remainingInserts > numInserts { + r.aggregateOpts.AggregateUsageMetrics.IncDedupedTerms(1) + valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + numInserts++ + continue + } + } + } - aggValues := r.valuesPool.Get() - if err := aggValues.addValue(valueID); err != nil { - // Return these values to the pool. - r.valuesPool.Put(aggValues) - return err + t.Finalize() + } } - r.resultsMap.Set(termID, aggValues) - return nil + r.size += numInserts + r.totalDocsCount += docs + return r.size, r.totalDocsCount } func (r *aggregatedResults) Namespace() ident.ID { @@ -336,9 +340,9 @@ func (r *aggregatedResults) Map() *AggregateResultsMap { func (r *aggregatedResults) Size() int { r.RLock() - l := r.resultsMap.Len() + size := r.size r.RUnlock() - return l + return size } func (r *aggregatedResults) TotalDocsCount() int { diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 7a514b484b..085350a023 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -21,436 +21,264 @@ package index import ( - "bytes" + "sort" "testing" - "github.com/m3db/m3/src/m3ninx/doc" - "github.com/m3db/m3/src/x/ident" - xtest "github.com/m3db/m3/src/x/test" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" ) -func genDoc(strs ...string) doc.Document { - if len(strs)%2 != 0 { - panic("invalid test setup; need even str length") - } +func entries(entries ...AggregateResultsEntry) []AggregateResultsEntry { return entries } - fields := make([]doc.Field, len(strs)/2) - for i := range fields { - fields[i] = doc.Field{ - Name: []byte(strs[i*2]), - Value: []byte(strs[i*2+1]), - } +func genResultsEntry(field string, terms ...string) AggregateResultsEntry { + entryTerms := make([]ident.ID, 0, len(terms)) + for _, term := range terms { + entryTerms = append(entryTerms, ident.StringID(term)) } - return doc.NewDocumentFromMetadata(doc.Metadata{Fields: fields}) -} - -func TestAggResultsInsertInvalid(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - assert.True(t, res.EnforceLimits()) - - dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) - size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsInsertEmptyTermValue(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - dValidEmptyTerm := genDoc("foo", "") - size, docsCount, err := res.AddDocuments([]doc.Document{dValidEmptyTerm}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) -} - -func TestAggResultsInsertBatchOfTwo(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("d1", "") - d2 := 
genDoc("d2", "") - size, docsCount, err := res.AddDocuments([]doc.Document{d1, d2}) - require.NoError(t, err) - require.Equal(t, 2, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 2, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsTermOnlyInsert(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) - size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) - - valid := genDoc("foo", "") - size, docsCount, err = res.AddDocuments([]doc.Document{valid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 3, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 3, res.TotalDocsCount()) -} - -func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { - dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - size, docsCount, err = res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsInsertIdempotency(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - testAggResultsInsertIdempotency(t, res) -} - -func TestAggResultsTermOnlyInsertIdempotency(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - testAggResultsInsertIdempotency(t, res) -} - -func TestInvalidAggregateType(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: 100, - }, testOpts) - dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) -} - -func TestAggResultsSameName(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - rMap := res.Map() - aggVals, ok := rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.Equal(t, 1, aggVals.Size()) - assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) - - d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Document{d2}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - aggVals, ok = rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.Equal(t, 2, aggVals.Size()) - assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) - assert.True(t, aggVals.Map().Contains(ident.StringID("biz"))) -} - -func assertNoValuesInNameOnlyAggregate(t *testing.T, v AggregateValues) { - assert.False(t, 
v.hasValues) - assert.Nil(t, v.valuesMap) - assert.Nil(t, v.pool) - - assert.Equal(t, 0, v.Size()) - assert.Nil(t, v.Map()) - assert.False(t, v.HasValues()) -} - -func TestAggResultsTermOnlySameName(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - rMap := res.Map() - aggVals, ok := rMap.Get(ident.StringID("foo")) - require.True(t, ok) - assertNoValuesInNameOnlyAggregate(t, aggVals) - - d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Document{d2}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - aggVals, ok = rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.False(t, aggVals.hasValues) - assertNoValuesInNameOnlyAggregate(t, aggVals) + return AggregateResultsEntry{ + Field: ident.StringID(field), + Terms: entryTerms, + } } -func addMultipleDocuments(t *testing.T, res AggregateResults) (int, int) { - _, _, err := res.AddDocuments([]doc.Document{ - genDoc("foo", "bar"), - genDoc("fizz", "bar"), - genDoc("buzz", "bar"), - }) - require.NoError(t, err) - - _, _, err = res.AddDocuments([]doc.Document{ - genDoc("foo", "biz"), - genDoc("fizz", "bar"), - }) - require.NoError(t, err) - - size, docsCount, err := res.AddDocuments([]doc.Document{ - genDoc("foo", "baz", "buzz", "bag", "qux", "qaz"), - }) - - require.NoError(t, err) - return size, docsCount -} +func toMap(res AggregateResults) map[string][]string { + entries := res.Map().Iter() + resultMap := make(map[string][]string, len(entries)) + for _, entry := range entries { //nolint:gocritic + terms := entry.value.Map().Iter() + resultTerms := make([]string, 0, len(terms)) + for _, term := range terms { + resultTerms = append(resultTerms, term.Key().String()) + } -func toFilter(strs ...string) AggregateFieldFilter { - b := make([][]byte, len(strs)) - for i, s := range strs { - b[i] = []byte(s) + sort.Strings(resultTerms) + resultMap[entry.Key().String()] = resultTerms } - return AggregateFieldFilter(b) + return resultMap } -var mergeTests = []struct { - name string - opts AggregateResultsOptions - expected map[string][]string -}{ - { - name: "no limit no filter", - opts: AggregateResultsOptions{}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, - "qux": {"qaz"}, +func TestWithLimits(t *testing.T) { + tests := []struct { + name string + entries []AggregateResultsEntry + sizeLimit int + docLimit int + exSeries int + exDocs int + expected map[string][]string + exMetrics map[string]int64 + }{ + { + name: "single term", + entries: entries(genResultsEntry("foo")), + exSeries: 1, + exDocs: 1, + expected: map[string][]string{"foo": {}}, + + exMetrics: map[string]int64{ + "total": 1, "total-fields": 1, "deduped-fields": 1, + "total-terms": 0, "deduped-terms": 0, + }, }, - }, - { - name: "with limit no filter", - opts: AggregateResultsOptions{SizeLimit: 2}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, + { + name: "same term", + entries: entries(genResultsEntry("foo"), genResultsEntry("foo")), + exSeries: 1, + exDocs: 2, + expected: map[string][]string{"foo": {}}, + exMetrics: map[string]int64{ + "total": 2, "total-fields": 2, "deduped-fields": 1, + "total-terms": 0, "deduped-terms": 0, + }, }, - }, - { - name: "no limit empty 
filter", - opts: AggregateResultsOptions{FieldFilter: toFilter()}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, - "qux": {"qaz"}, + { + name: "multiple terms", + entries: entries(genResultsEntry("foo"), genResultsEntry("bar")), + exSeries: 2, + exDocs: 2, + expected: map[string][]string{"foo": {}, "bar": {}}, + exMetrics: map[string]int64{ + "total": 2, "total-fields": 2, "deduped-fields": 2, + "total-terms": 0, "deduped-terms": 0, + }, }, - }, - { - name: "no limit matchless filter", - opts: AggregateResultsOptions{FieldFilter: toFilter("zig")}, - expected: map[string][]string{}, - }, - { - name: "empty limit with filter", - opts: AggregateResultsOptions{FieldFilter: toFilter("buzz")}, - expected: map[string][]string{ - "buzz": {"bar", "bag"}, + { + name: "single entry", + entries: entries(genResultsEntry("foo", "bar")), + exSeries: 2, + exDocs: 2, + expected: map[string][]string{"foo": {"bar"}}, + exMetrics: map[string]int64{ + "total": 2, "total-fields": 1, "deduped-fields": 1, + "total-terms": 1, "deduped-terms": 1, + }, }, - }, - { - name: "with limit with filter", - opts: AggregateResultsOptions{ - SizeLimit: 2, FieldFilter: toFilter("buzz", "qux", "fizz"), + { + name: "single entry multiple fields", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "baz", "qux")), + exSeries: 4, + exDocs: 6, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 6, "total-fields": 1, "deduped-fields": 1, + "total-terms": 5, "deduped-terms": 3, + }, }, - expected: map[string][]string{ - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, + { + name: "multiple entry multiple fields", + entries: entries( + genResultsEntry("foo", "bar", "baz"), + genResultsEntry("foo", "baz", "baz", "qux")), + exSeries: 4, + exDocs: 7, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 7, "total-fields": 2, "deduped-fields": 1, + "total-terms": 5, "deduped-terms": 3, + }, + }, + { + name: "multiple entries", + entries: entries(genResultsEntry("foo", "baz"), genResultsEntry("bar", "baz", "qux")), + exSeries: 5, + exDocs: 5, + expected: map[string][]string{"foo": {"baz"}, "bar": {"baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 2, "deduped-fields": 2, + "total-terms": 3, "deduped-terms": 3, + }, }, - }, -} -func TestAggResultsMerge(t *testing.T) { - for _, tt := range mergeTests { - t.Run(tt.name, func(t *testing.T) { - res := NewAggregateResults(nil, tt.opts, testOpts) - size, docsCount := addMultipleDocuments(t, res) - - require.Equal(t, len(tt.expected), size) - require.Equal(t, 6, docsCount) - ac := res.Map() - require.Equal(t, len(tt.expected), ac.Len()) - for k, v := range tt.expected { - aggVals, ok := ac.Get(ident.StringID(k)) - require.True(t, ok) - require.Equal(t, len(v), aggVals.Size()) - for _, actual := range v { - require.True(t, aggVals.Map().Contains(ident.StringID(actual))) - } - } - }) - } -} + { + name: "single entry query at size limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), + sizeLimit: 4, + exSeries: 4, + exDocs: 5, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 1, "deduped-fields": 1, + "total-terms": 4, "deduped-terms": 3, + }, + }, + { + name: "single entry query at doc limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), + docLimit: 5, + exSeries: 4, + exDocs: 5, 
+ expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 1, "deduped-fields": 1, + "total-terms": 4, "deduped-terms": 3, + }, + }, -func TestAggResultsMergeNameOnly(t *testing.T) { - for _, tt := range mergeTests { - t.Run(tt.name+" name only", func(t *testing.T) { - tt.opts.Type = AggregateTagNames - res := NewAggregateResults(nil, tt.opts, testOpts) - size, docsCount := addMultipleDocuments(t, res) - - require.Equal(t, len(tt.expected), size) - require.Equal(t, 6, docsCount) - - ac := res.Map() - require.Equal(t, len(tt.expected), ac.Len()) - for k := range tt.expected { - aggVals, ok := ac.Get(ident.StringID(k)) - require.True(t, ok) - assertNoValuesInNameOnlyAggregate(t, aggVals) - } - }) + { + name: "single entry query below size limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "qux")), + sizeLimit: 3, + exSeries: 3, + exDocs: 4, + expected: map[string][]string{"foo": {"bar", "baz"}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 1, "deduped-fields": 1, + "total-terms": 3, "deduped-terms": 2, + }, + }, + { + name: "single entry query below doc limit", + entries: entries(genResultsEntry("foo", "bar", "bar", "bar", "baz")), + docLimit: 3, + exSeries: 2, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 1, "deduped-fields": 1, + "total-terms": 4, "deduped-terms": 1, + }, + }, + { + name: "multiple entry query below series limit", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + sizeLimit: 3, + exSeries: 3, + exDocs: 4, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 2, "deduped-fields": 2, + "total-terms": 2, "deduped-terms": 1, + }, + }, + { + name: "multiple entry query below doc limit", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + docLimit: 3, + exSeries: 3, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 2, "deduped-fields": 2, + "total-terms": 2, "deduped-terms": 1, + }, + }, + { + name: "multiple entry query both limits", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + docLimit: 3, + sizeLimit: 10, + exSeries: 3, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 2, "deduped-fields": 2, + "total-terms": 2, "deduped-terms": 1, + }, + }, } -} -func TestAggResultsInsertCopies(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - dValid := genDoc("foo", "bar") - - d, ok := dValid.Metadata() - require.True(t, ok) - name := d.Fields[0].Name - value := d.Fields[0].Value - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - found := false - - // our genny generated maps don't provide access to MapEntry directly, - // so we iterate over the map to find the added entry. Could avoid this - // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` - for _, entry := range res.Map().Iter() { - // see if this key has the same value as the added document's ID. - n := entry.Key().Bytes() - if !bytes.Equal(name, n) { - continue - } - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. 
- require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) - v := entry.Value() - for _, f := range v.Map().Iter() { - v := f.Key().Bytes() - if !bytes.Equal(value, v) { - continue + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + res := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + SizeLimit: tt.sizeLimit, + DocsLimit: tt.docLimit, + AggregateUsageMetrics: NewAggregateUsageMetrics(ident.StringID("ns"), iOpts), + }, testOpts) + + size, docsCount := res.AddFields(tt.entries) + assert.Equal(t, tt.exSeries, size) + assert.Equal(t, tt.exDocs, docsCount) + assert.Equal(t, tt.exSeries, res.Size()) + assert.Equal(t, tt.exDocs, res.TotalDocsCount()) + + assert.Equal(t, tt.expected, toMap(res)) + + counters := scope.Snapshot().Counters() + actualCounters := make(map[string]int64, len(counters)) + for _, v := range counters { + actualCounters[v.Tags()["type"]] = v.Value() } - found = true - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. - require.False(t, xtest.ByteSlicesBackedBySameData(v, value)) - } - } - - require.True(t, found) -} - -func TestAggResultsNameOnlyInsertCopies(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - dValid := genDoc("foo", "bar") - d, ok := dValid.Metadata() - require.True(t, ok) - name := d.Fields[0].Name - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - found := false - // our genny generated maps don't provide access to MapEntry directly, - // so we iterate over the map to find the added entry. Could avoid this - // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` - for _, entry := range res.Map().Iter() { - // see if this key has the same value as the added document's ID. - n := entry.Key().Bytes() - if !bytes.Equal(name, n) { - continue - } - - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. - require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) - found = true - assertNoValuesInNameOnlyAggregate(t, entry.Value()) + assert.Equal(t, tt.exMetrics, actualCounters) + }) } - - require.True(t, found) } func TestAggResultsReset(t *testing.T) { res := NewAggregateResults(ident.StringID("qux"), AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) + size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) aggVals, ok := res.Map().Get(ident.StringID("foo")) require.True(t, ok) @@ -496,11 +324,9 @@ func TestAggResultsResetNamespaceClones(t *testing.T) { func TestAggResultFinalize(t *testing.T) { // Create a Results and insert some data. res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) + size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) // Ensure the data is present. 
rMap := res.Map() @@ -522,3 +348,29 @@ func TestAggResultFinalize(t *testing.T) { require.False(t, id.IsNoFinalize()) } } + +func TestResetUpdatesMetics(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + testOpts = testOpts.SetInstrumentOptions(iOpts) + res := NewAggregateResults(nil, AggregateResultsOptions{ + AggregateUsageMetrics: NewAggregateUsageMetrics(ident.StringID("ns1"), iOpts), + }, testOpts) + res.AddFields(entries(genResultsEntry("foo"))) + res.Reset(ident.StringID("ns2"), AggregateResultsOptions{}) + res.AddFields(entries(genResultsEntry("bar"))) + + counters := scope.Snapshot().Counters() + seenNamespaces := make(map[string]struct{}) + for _, v := range counters { + ns := v.Tags()["namespace"] + seenNamespaces[ns] = struct{}{} + } + + assert.Equal(t, map[string]struct{}{ + "ns1": {}, + "ns2": {}, + }, seenNamespaces) + + res.Finalize() +} diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index d4c3b886bd..a0dde98dbf 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -149,7 +149,6 @@ type block struct { blockOpts BlockOptions nsMD namespace.Metadata namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager - queryLimits limits.QueryLimits docsLimit limits.LookbackLimit metrics blockMetrics @@ -259,7 +258,6 @@ func NewBlock( namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr, metrics: newBlockMetrics(scope), logger: iopts.Logger(), - queryLimits: opts.QueryLimits(), docsLimit: opts.QueryLimits().DocsLimit(), } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator @@ -412,7 +410,7 @@ func (b *block) Query( cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, logFields []opentracinglog.Field, ) (bool, error) { ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) @@ -434,7 +432,7 @@ func (b *block) queryWithSpan( cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, sp opentracing.Span, logFields []opentracinglog.Field, ) (bool, error) { @@ -549,7 +547,7 @@ func (b *block) closeAsync(closer io.Closer) { func (b *block) addQueryResults( cancellable *xresource.CancellableLifetime, - results BaseResults, + results DocumentResults, batch []doc.Document, source []byte, ) ([]doc.Document, int, int, error) { @@ -567,7 +565,7 @@ func (b *block) addQueryResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the xresource. + // try to add the docs to the resource. size, docsCount, err := results.AddDocuments(batch) // immediately release the checkout on the lifetime of query. @@ -658,15 +656,21 @@ func (b *block) aggregateWithSpan( } var ( - source = opts.Source - size = results.Size() - docsCount = results.TotalDocsCount() - batch = b.opts.AggregateResultsEntryArrayPool().Get() - batchSize = cap(batch) - iterClosed = false // tracking whether we need to free the iterator at the end. + source = opts.Source + size = results.Size() + resultCount = results.TotalDocsCount() + batch = b.opts.AggregateResultsEntryArrayPool().Get() + maxBatch = cap(batch) + iterClosed = false // tracking whether we need to free the iterator at the end. 
+ fieldAppended bool + termAppended bool + lastField []byte + batchedFields int + currFields int + currTerms int ) - if batchSize == 0 { - batchSize = defaultAggregateResultsEntryBatchSize + if maxBatch == 0 { + maxBatch = defaultAggregateResultsEntryBatchSize } // cleanup at the end @@ -692,8 +696,16 @@ func (b *block) aggregateWithSpan( })) } + if opts.SeriesLimit > 0 && opts.SeriesLimit < maxBatch { + maxBatch = opts.SeriesLimit + } + + if opts.DocsLimit > 0 && opts.DocsLimit < maxBatch { + maxBatch = opts.DocsLimit + } + for _, reader := range readers { - if opts.LimitsExceeded(size, docsCount) { + if opts.LimitsExceeded(size, resultCount) { break } @@ -702,22 +714,63 @@ func (b *block) aggregateWithSpan( return false, err } iterClosed = false // only once the iterator has been successfully Reset(). - for iter.Next() { - if opts.LimitsExceeded(size, docsCount) { + if opts.LimitsExceeded(size, resultCount) { break } field, term := iter.Current() - batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - if len(batch) < batchSize { + + // TODO: remove this legacy doc tracking implementation when alternative + // limits are in place. + if results.EnforceLimits() { + if lastField == nil { + lastField = append(lastField, field...) + batchedFields++ + if err := b.docsLimit.Inc(1, source); err != nil { + return false, err + } + } else if !bytes.Equal(lastField, field) { + lastField = lastField[:0] + lastField = append(lastField, field...) + batchedFields++ + if err := b.docsLimit.Inc(1, source); err != nil { + return false, err + } + } + + // NB: this logic increments the doc count to account for where the + // legacy limits would have been updated. It increments by two to + // reflect the term appearing as both the last element of the previous + // batch, as well as the first element in the next batch. + if batchedFields > maxBatch { + if err := b.docsLimit.Inc(2, source); err != nil { + return false, err + } + + batchedFields = 1 + } + } + + batch, fieldAppended, termAppended = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) + if fieldAppended { + currFields++ + } + if termAppended { + currTerms++ + } + // continue appending to the batch until we hit our max batch size. + if currFields+currTerms < maxBatch { continue } - batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) + batch, size, resultCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } + + currFields = 0 + currTerms = 0 } if err := iter.Err(); err != nil { @@ -731,21 +784,25 @@ func (b *block) aggregateWithSpan( } // Add last batch to results if remaining. - if len(batch) > 0 { - batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) + for len(batch) > 0 { + batch, size, resultCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } } - return opts.exhaustive(size, docsCount), nil + return opts.exhaustive(size, resultCount), nil } +// appendFieldAndTermToBatch adds the provided field / term onto the batch, +// optionally reusing the last element of the batch if it pertains to the same field. +// First boolean result indicates that a unique field was added to the batch +// and the second boolean indicates if a unique term was added. 
func (b *block) appendFieldAndTermToBatch( batch []AggregateResultsEntry, field, term []byte, includeTerms bool, -) []AggregateResultsEntry { +) ([]AggregateResultsEntry, bool, bool) { // NB(prateek): we make a copy of the (field, term) entries returned // by the iterator during traversal, because the []byte are only valid per entry during // the traversal (i.e. calling Next() invalidates the []byte). We choose to do this @@ -754,10 +811,11 @@ func (b *block) appendFieldAndTermToBatch( // idents is transferred to the results map, which either hangs on to them (if they are new), // or finalizes them if they are duplicates. var ( - entry AggregateResultsEntry - lastField []byte - lastFieldIsValid bool - reuseLastEntry bool + entry AggregateResultsEntry + lastField []byte + lastFieldIsValid bool + reuseLastEntry bool + newFieldAdded, newTermAdded bool ) // we are iterating multiple segments so we may receive duplicates (same field/term), but // as we are iterating one segment at a time, and because the underlying index structures @@ -780,6 +838,7 @@ func (b *block) appendFieldAndTermToBatch( reuseLastEntry = true entry = batch[len(batch)-1] // avoid alloc cause we already have the field } else { + newFieldAdded = true // allocate id because this is the first time we've seen it // NB(r): Iterating fields FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -788,6 +847,7 @@ func (b *block) appendFieldAndTermToBatch( } if includeTerms { + newTermAdded = true // terms are always new (as far we know without checking the map for duplicates), so we allocate // NB(r): Iterating terms FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -800,7 +860,8 @@ func (b *block) appendFieldAndTermToBatch( } else { batch = append(batch, entry) } - return batch + + return batch, newFieldAdded, newTermAdded } func (b *block) pooledID(id []byte) ident.ID { @@ -811,19 +872,14 @@ func (b *block) pooledID(id []byte) ident.ID { return b.opts.IdentifierPool().BinaryID(data) } +// addAggregateResults adds the fields on the batch +// to the provided results and resets the batch to be reused. func (b *block) addAggregateResults( cancellable *xresource.CancellableLifetime, results AggregateResults, batch []AggregateResultsEntry, source []byte, ) ([]AggregateResultsEntry, int, int, error) { - // update recently queried docs to monitor memory. - if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch), source); err != nil { - return batch, 0, 0, err - } - } - // checkout the lifetime of the query before adding results. queryValid := cancellable.TryCheckout() if !queryValid { @@ -831,8 +887,8 @@ func (b *block) addAggregateResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the xresource. - size, docsCount := results.AddFields(batch) + // try to add the docs to the resource. + size, resultCount := results.AddFields(batch) // immediately release the checkout on the lifetime of query. cancellable.ReleaseCheckout() @@ -845,7 +901,7 @@ func (b *block) addAggregateResults( batch = batch[:0] // return results. 
- return batch, size, docsCount, nil + return batch, size, resultCount, nil } func (b *block) AddResults( diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 8671c4be68..ec8d4764e0 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -1891,6 +1891,18 @@ func TestBlockAggregate(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Limit: 50, Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) + + // NB: seriesLimit must be higher than the number of fields to be exhaustive. + seriesLimit := 10 testMD := newTestNSMetadata(t) start := time.Now().Truncate(time.Hour) blk, err := NewBlock(start, testMD, BlockOptions{}, @@ -1913,7 +1925,7 @@ func TestBlockAggregate(t *testing.T) { } results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ - SizeLimit: 3, + SizeLimit: seriesLimit, Type: AggregateTagNamesAndValues, }, testOpts) @@ -1925,24 +1937,23 @@ func TestBlockAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - gomock.InOrder( - iter.EXPECT().Reset(reader, gomock.Any()).Return(nil), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")), - iter.EXPECT().Next().Return(false), - iter.EXPECT().Err().Return(nil), - iter.EXPECT().Close().Return(nil), - ) + iter.EXPECT().Reset(reader, gomock.Any()).Return(nil) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")) + iter.EXPECT().Next().Return(false) + iter.EXPECT().Err().Return(nil) + iter.EXPECT().Close().Return(nil) + exhaustive, err := b.Aggregate( ctx, xresource.NewCancellableLifetime(), - QueryOptions{SeriesLimit: 3}, + QueryOptions{SeriesLimit: seriesLimit}, results, emptyLogFields) require.NoError(t, err) @@ -1957,6 +1968,9 @@ func TestBlockAggregate(t *testing.T) { spans := mtr.FinishedSpans() require.Len(t, spans, 2) require.Equal(t, tracepoint.BlockAggregate, spans[0].OperationName) + + snap := scope.Snapshot() + tallytest.AssertCounterValue(t, 3, snap, "query-limit.total-docs-matched", nil) } func TestBlockAggregateNotExhaustive(t *testing.T) { @@ -2025,7 +2039,7 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { require.False(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ - "f1": {"t1"}, + "f1": {}, }, results) sp.Finish() @@ -2420,10 +2434,8 @@ func TestBlockAggregateBatching(t *testing.T) { count += len(entry.Terms) } - // FIXME: 
this currently fails, but will be fixed after - // https://github.com/m3db/m3/pull/3133 is reverted. - // require.True(t, count <= tt.batchSize, - // fmt.Sprintf("batch %v exceeds batchSize %d", batch, tt.batchSize)) + require.True(t, count <= tt.batchSize, + fmt.Sprintf("batch %v exceeds batchSize %d", batch, tt.batchSize)) return addAggregateResultsFn(cancellable, results, batch, source) } diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 6db3378bd8..d33d2c13c0 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -166,32 +166,161 @@ func (mr *MockBaseResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockBaseResults)(nil).EnforceLimits)) } -// AddDocuments mocks base method -func (m *MockBaseResults) AddDocuments(batch []doc.Document) (int, int, error) { +// Finalize mocks base method +func (m *MockBaseResults) Finalize() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) +} + +// MockDocumentResults is a mock of DocumentResults interface +type MockDocumentResults struct { + ctrl *gomock.Controller + recorder *MockDocumentResultsMockRecorder +} + +// MockDocumentResultsMockRecorder is the mock recorder for MockDocumentResults +type MockDocumentResultsMockRecorder struct { + mock *MockDocumentResults +} + +// NewMockDocumentResults creates a new mock instance +func NewMockDocumentResults(ctrl *gomock.Controller) *MockDocumentResults { + mock := &MockDocumentResults{ctrl: ctrl} + mock.recorder = &MockDocumentResultsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDocumentResults) EXPECT() *MockDocumentResultsMockRecorder { + return m.recorder +} + +// Namespace mocks base method +func (m *MockDocumentResults) Namespace() ident.ID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Namespace") + ret0, _ := ret[0].(ident.ID) + return ret0 +} + +// Namespace indicates an expected call of Namespace +func (mr *MockDocumentResultsMockRecorder) Namespace() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Namespace", reflect.TypeOf((*MockDocumentResults)(nil).Namespace)) +} + +// Size mocks base method +func (m *MockDocumentResults) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + return ret0 } -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockBaseResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { +// Size indicates an expected call of Size +func (mr *MockDocumentResultsMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockDocumentResults)(nil).Size)) +} + +// TotalDocsCount mocks base method +func (m *MockDocumentResults) TotalDocsCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDocsCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// TotalDocsCount indicates an expected call of TotalDocsCount +func (mr 
*MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) +} + +// TotalDuration mocks base method +func (m *MockDocumentResults) TotalDuration() ResultDurations { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDuration") + ret0, _ := ret[0].(ResultDurations) + return ret0 +} + +// TotalDuration indicates an expected call of TotalDuration +func (mr *MockDocumentResultsMockRecorder) TotalDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockDocumentResults)(nil).TotalDuration)) +} + +// AddBlockProcessingDuration mocks base method +func (m *MockDocumentResults) AddBlockProcessingDuration(duration time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddBlockProcessingDuration", duration) +} + +// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration +func (mr *MockDocumentResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockProcessingDuration), duration) +} + +// AddBlockSearchDuration mocks base method +func (m *MockDocumentResults) AddBlockSearchDuration(duration time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddBlockSearchDuration", duration) +} + +// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration +func (mr *MockDocumentResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockSearchDuration), duration) +} + +// EnforceLimits mocks base method +func (m *MockDocumentResults) EnforceLimits() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnforceLimits") + ret0, _ := ret[0].(bool) + return ret0 +} + +// EnforceLimits indicates an expected call of EnforceLimits +func (mr *MockDocumentResultsMockRecorder) EnforceLimits() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockBaseResults)(nil).AddDocuments), batch) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockDocumentResults)(nil).EnforceLimits)) } // Finalize mocks base method -func (m *MockBaseResults) Finalize() { +func (m *MockDocumentResults) Finalize() { m.ctrl.T.Helper() m.ctrl.Call(m, "Finalize") } // Finalize indicates an expected call of Finalize -func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { +func (mr *MockDocumentResultsMockRecorder) Finalize() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockDocumentResults)(nil).Finalize)) +} + +// AddDocuments mocks base method +func (m *MockDocumentResults) AddDocuments(batch []doc.Document) (int, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddDocuments", batch) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// AddDocuments indicates an expected call of AddDocuments +func (mr 
*MockDocumentResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockDocumentResults)(nil).AddDocuments), batch) } // MockQueryResults is a mock of QueryResults interface @@ -311,6 +440,18 @@ func (mr *MockQueryResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockQueryResults)(nil).EnforceLimits)) } +// Finalize mocks base method +func (m *MockQueryResults) Finalize() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) +} + // AddDocuments mocks base method func (m *MockQueryResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() @@ -327,18 +468,6 @@ func (mr *MockQueryResultsMockRecorder) AddDocuments(batch interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockQueryResults)(nil).AddDocuments), batch) } -// Finalize mocks base method -func (m *MockQueryResults) Finalize() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Finalize") -} - -// Finalize indicates an expected call of Finalize -func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) -} - // Reset mocks base method func (m *MockQueryResults) Reset(nsID ident.ID, opts QueryResultsOptions) { m.ctrl.T.Helper() @@ -543,22 +672,6 @@ func (mr *MockAggregateResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockAggregateResults)(nil).EnforceLimits)) } -// AddDocuments mocks base method -func (m *MockAggregateResults) AddDocuments(batch []doc.Document) (int, int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockAggregateResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockAggregateResults)(nil).AddDocuments), batch) -} - // Finalize mocks base method func (m *MockAggregateResults) Finalize() { m.ctrl.T.Helper() @@ -626,6 +739,89 @@ func (mr *MockAggregateResultsMockRecorder) Map() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Map", reflect.TypeOf((*MockAggregateResults)(nil).Map)) } +// MockAggregateUsageMetrics is a mock of AggregateUsageMetrics interface +type MockAggregateUsageMetrics struct { + ctrl *gomock.Controller + recorder *MockAggregateUsageMetricsMockRecorder +} + +// MockAggregateUsageMetricsMockRecorder is the mock recorder for MockAggregateUsageMetrics +type MockAggregateUsageMetricsMockRecorder struct { + mock *MockAggregateUsageMetrics +} + +// NewMockAggregateUsageMetrics creates a new mock instance +func NewMockAggregateUsageMetrics(ctrl *gomock.Controller) *MockAggregateUsageMetrics { + mock := &MockAggregateUsageMetrics{ctrl: ctrl} + mock.recorder = 
&MockAggregateUsageMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockAggregateUsageMetrics) EXPECT() *MockAggregateUsageMetricsMockRecorder { + return m.recorder +} + +// IncTotal mocks base method +func (m *MockAggregateUsageMetrics) IncTotal(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncTotal", val) +} + +// IncTotal indicates an expected call of IncTotal +func (mr *MockAggregateUsageMetricsMockRecorder) IncTotal(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncTotal", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncTotal), val) +} + +// IncTotalTerms mocks base method +func (m *MockAggregateUsageMetrics) IncTotalTerms(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncTotalTerms", val) +} + +// IncTotalTerms indicates an expected call of IncTotalTerms +func (mr *MockAggregateUsageMetricsMockRecorder) IncTotalTerms(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncTotalTerms", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncTotalTerms), val) +} + +// IncDedupedTerms mocks base method +func (m *MockAggregateUsageMetrics) IncDedupedTerms(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncDedupedTerms", val) +} + +// IncDedupedTerms indicates an expected call of IncDedupedTerms +func (mr *MockAggregateUsageMetricsMockRecorder) IncDedupedTerms(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncDedupedTerms", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncDedupedTerms), val) +} + +// IncTotalFields mocks base method +func (m *MockAggregateUsageMetrics) IncTotalFields(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncTotalFields", val) +} + +// IncTotalFields indicates an expected call of IncTotalFields +func (mr *MockAggregateUsageMetricsMockRecorder) IncTotalFields(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncTotalFields", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncTotalFields), val) +} + +// IncDedupedFields mocks base method +func (m *MockAggregateUsageMetrics) IncDedupedFields(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncDedupedFields", val) +} + +// IncDedupedFields indicates an expected call of IncDedupedFields +func (mr *MockAggregateUsageMetricsMockRecorder) IncDedupedFields(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncDedupedFields", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncDedupedFields), val) +} + // MockAggregateResultsPool is a mock of AggregateResultsPool interface type MockAggregateResultsPool struct { ctrl *gomock.Controller @@ -888,7 +1084,7 @@ func (mr *MockBlockMockRecorder) WriteBatch(inserts interface{}) *gomock.Call { } // Query mocks base method -func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results BaseResults, logFields []log.Field) (bool, error) { +func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results DocumentResults, logFields []log.Field) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Query", ctx, cancellable, query, opts, results, logFields) ret0, _ := ret[0].(bool) diff --git a/src/dbnode/storage/index/types.go 
b/src/dbnode/storage/index/types.go
index 9cebcaa286..c91bd4faa6 100644
--- a/src/dbnode/storage/index/types.go
+++ b/src/dbnode/storage/index/types.go
@@ -168,16 +168,23 @@ type BaseResults interface {
 	// EnforceLimits returns whether this should enforce and increment limits.
 	EnforceLimits() bool
 
+	// Finalize releases any resources held by the Results object,
+	// including returning it to a backing pool.
+	Finalize()
+}
+
+// DocumentResults is a collection of query results that allows accumulation
+// of document values; it is synchronized when access to the results set is
+// used, as documented by the methods.
+type DocumentResults interface {
+	BaseResults
+
 	// AddDocuments adds the batch of documents to the results set, it will
 	// take a copy of the bytes backing the documents so the original can be
 	// modified after this function returns without affecting the results map.
 	// TODO(r): We will need to change this behavior once index fields are
 	// mutable and the most recent need to shadow older entries.
 	AddDocuments(batch []doc.Document) (size, docsCount int, err error)
-
-	// Finalize releases any resources held by the Results object,
-	// including returning it to a backing pool.
-	Finalize()
 }
 
 // ResultDurations holds various timing information for a query result.
@@ -207,7 +214,7 @@ func (r ResultDurations) AddSearch(duration time.Duration) ResultDurations {
 // QueryResults is a collection of results for a query, it is synchronized
 // when access to the results set is used as documented by the methods.
 type QueryResults interface {
-	BaseResults
+	DocumentResults
 
 	// Reset resets the Results object to initial state.
 	Reset(nsID ident.ID, opts QueryResultsOptions)
@@ -292,6 +299,9 @@ type AggregateResultsOptions struct {
 	// overflown will return early successfully.
 	SizeLimit int
 
+	// DocsLimit limits the number of documents matched by the query.
+	DocsLimit int
+
 	// Type determines what result is required.
 	Type AggregationType
 
@@ -301,6 +311,24 @@ type AggregateResultsOptions struct {
 	// RestrictByQuery is a query to restrict the set of documents that must
 	// be present for an aggregated term to be returned.
 	RestrictByQuery *Query
+
+	// AggregateUsageMetrics are aggregate usage metrics that track field
+	// and term counts for aggregate queries.
+	AggregateUsageMetrics AggregateUsageMetrics
+}
+
+// AggregateUsageMetrics are metrics for aggregate query usage.
+type AggregateUsageMetrics interface {
+	// IncTotal increments the total metric count.
+	IncTotal(val int64)
+	// IncTotalTerms increments the totalTerms metric count.
+	IncTotalTerms(val int64)
+	// IncDedupedTerms increments the dedupedTerms metric count.
+	IncDedupedTerms(val int64)
+	// IncTotalFields increments the totalFields metric count.
+	IncTotalFields(val int64)
+	// IncDedupedFields increments the dedupedFields metric count.
+	IncDedupedFields(val int64)
 }
 
 // AggregateResultsAllocator allocates AggregateResults types.
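The new AggregateResultsOptions fields and the AggregateUsageMetrics interface appear above only as declarations, so a minimal sketch of how they compose may help when reviewing. Everything in the sketch is illustrative: the no-op stub, the function name, and the chosen limits are assumptions rather than code from this change; only the interface methods and the SizeLimit/DocsLimit/Type/AggregateUsageMetrics option fields come from the types.go hunks above.

```go
package example

import "github.com/m3db/m3/src/dbnode/storage/index"

// noopAggregateUsageMetrics is a hypothetical stub that satisfies
// index.AggregateUsageMetrics without recording anything, for callers
// (or tests) that do not need the field/term usage counters.
type noopAggregateUsageMetrics struct{}

func (noopAggregateUsageMetrics) IncTotal(val int64)         {}
func (noopAggregateUsageMetrics) IncTotalTerms(val int64)    {}
func (noopAggregateUsageMetrics) IncDedupedTerms(val int64)  {}
func (noopAggregateUsageMetrics) IncTotalFields(val int64)   {}
func (noopAggregateUsageMetrics) IncDedupedFields(val int64) {}

// newAggregateResultsOptions shows the new DocsLimit and
// AggregateUsageMetrics fields alongside the existing SizeLimit and
// Type fields; real callers would supply real metrics instead of the stub.
func newAggregateResultsOptions(seriesLimit, docsLimit int) index.AggregateResultsOptions {
	return index.AggregateResultsOptions{
		SizeLimit:             seriesLimit,
		DocsLimit:             docsLimit,
		Type:                  index.AggregateTagNamesAndValues,
		AggregateUsageMetrics: noopAggregateUsageMetrics{},
	}
}
```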
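Because AddDocuments now lives on DocumentResults rather than BaseResults, code that holds a BaseResults (for example the block query path, whose Query signature changes below, while aggregate results no longer expose AddDocuments at all) has to narrow the interface before adding a batch. A rough sketch of that consumer-side pattern follows; the function name, error message, and import paths are assumptions for illustration, not code from this change.

```go
package example

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/storage/index"
	"github.com/m3db/m3/src/m3ninx/doc"
)

// addDocumentsIfSupported narrows a BaseResults to DocumentResults before
// adding a batch, returning an error for result types (such as aggregate
// results) that do not accept documents directly.
func addDocumentsIfSupported(results index.BaseResults, batch []doc.Document) (int, int, error) {
	docResults, ok := results.(index.DocumentResults)
	if !ok {
		return 0, 0, fmt.Errorf("results type %T does not support adding documents", results)
	}
	return docResults.AddDocuments(batch)
}
```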
@@ -390,7 +418,7 @@ type Block interface { cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, logFields []opentracinglog.Field, ) (bool, error) diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index c257db3960..f209184034 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -39,6 +39,8 @@ var ErrWideQueryResultsExhausted = errors.New("no more values to add to wide que type shardFilterFn func(ident.ID) (uint32, bool) +var _ DocumentResults = (*wideResults)(nil) + type wideResults struct { sync.RWMutex size int @@ -74,7 +76,7 @@ func NewWideQueryResults( shardFilter shardFilterFn, collector chan *ident.IDBatch, opts WideQueryOptions, -) BaseResults { +) DocumentResults { batchSize := opts.BatchSize results := &wideResults{ nsID: namespaceID, diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index a4cbadcdfa..5438bcebb9 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -829,7 +829,7 @@ func TestLimits(t *testing.T) { cancellable interface{}, query interface{}, opts interface{}, - results index.BaseResults, + results index.DocumentResults, logFields interface{}) (bool, error) { _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2.