From 85019868dbc1a76f5395616ac0013988423b2315 Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 29 Jan 2021 01:34:52 -0500 Subject: [PATCH 1/7] [dbnode] Add aggregate term limit regression test --- src/dbnode/storage/index/block.go | 13 +- src/dbnode/storage/index/block_prop_test.go | 221 +++++++++++++++++++- src/dbnode/storage/index/block_test.go | 202 ++++++++++++++++++ 3 files changed, 432 insertions(+), 4 deletions(-) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index cbb641e6e5..1b02e60b53 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -122,6 +122,13 @@ func (s shardRangesSegmentsByVolumeType) forEachSegmentGroup(cb func(group block return nil } +type addAggregateResultsFn func( + cancellable *xresource.CancellableLifetime, + results AggregateResults, + batch []AggregateResultsEntry, + source []byte, +) ([]AggregateResultsEntry, int, int, error) + // nolint: maligned type block struct { sync.RWMutex @@ -133,6 +140,7 @@ type block struct { shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn newExecutorWithRLockFn newExecutorFn + addAggregateResultsFn addAggregateResultsFn blockStart time.Time blockEnd time.Time blockSize time.Duration @@ -256,6 +264,7 @@ func NewBlock( } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator b.newExecutorWithRLockFn = b.executorWithRLock + b.addAggregateResultsFn = b.addAggregateResults return b, nil } @@ -697,7 +706,7 @@ func (b *block) aggregateWithSpan( continue } - batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source) + batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } @@ -715,7 +724,7 @@ func (b *block) aggregateWithSpan( // Add last batch to results if remaining. if len(batch) > 0 { - batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source) + batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 0624a3923a..31c2edd3bc 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -1,5 +1,3 @@ -// +build big -// // Copyright (c) 2019 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -23,26 +21,35 @@ package index import ( + stdlibctx "context" "errors" "fmt" "math/rand" "os" + "sort" "testing" "time" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/limits" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/fst" + "github.com/m3db/m3/src/m3ninx/index/segment/mem" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/m3ninx/search/proptest" "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xresource "github.com/m3db/m3/src/x/resource" + "github.com/opentracing/opentracing-go" + "github.com/uber-go/tally" "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/stretchr/testify/require" ) @@ -197,3 +204,213 @@ func newPropTestBlock(t *testing.T, blockStart time.Time, nsMeta namespace.Metad require.NoError(t, err) return blk, nil } + +type testFields struct { + name string + values []string +} + +func genField() gopter.Gen { + return gopter.CombineGens( + gen.AlphaString(), + gen.SliceOf(gen.AlphaString()), + ).Map(func(input []interface{}) testFields { + var ( + name = input[0].(string) + values = input[1].([]string) + ) + + return testFields{ + name: name, + values: values, + } + }) +} + +type propTestSegment struct { + metadata doc.Metadata + exCount int64 + segmentMap segmentMap +} + +type testValuesSet map[string]struct{} +type segmentMap map[string]testValuesSet + +func genTestSegment() gopter.Gen { + return gen.SliceOf(genField()).Map(func(input []testFields) propTestSegment { + segMap := make(segmentMap, len(input)) + for _, field := range input { + for _, value := range field.values { + exVals, found := segMap[field.name] + if !found { + exVals = make(testValuesSet) + } + exVals[value] = struct{}{} + segMap[field.name] = exVals + } + } + + fields := make([]testFields, 0, len(input)) + for name, valSet := range segMap { + vals := make([]string, 0, len(valSet)) + for val := range valSet { + vals = append(vals, val) + } + + sort.Strings(vals) + fields = append(fields, testFields{name: name, values: vals}) + } + + sort.Slice(fields, func(i, j int) bool { + return fields[i].name < fields[j].name + }) + + docFields := []doc.Field{} + for _, field := range fields { + for _, val := range field.values { + docFields = append(docFields, doc.Field{ + Name: []byte(field.name), + Value: []byte(val), + }) + } + } + + return propTestSegment{ + metadata: doc.Metadata{Fields: docFields}, + exCount: int64(len(segMap)), + segmentMap: segMap, + } + }) +} + +func TestGenSegments(t *testing.T) { + var ( + parameters = gopter.DefaultTestParameters() + seed = time.Now().UnixNano() + reporter = gopter.NewFormatedReporter(true, 160, os.Stdout) + ) + + parameters.MinSuccessfulTests = 1000 + parameters.MinSize = 5 + parameters.MaxSize = 10 + parameters.Rng = rand.New(rand.NewSource(seed)) + properties := gopter.NewProperties(parameters) + + properties.Property("segments dedupe and have correct docs counts", prop.ForAll( + func(testSegment propTestSegment) (bool, error) { + seg, err := mem.NewSegment(mem.NewOptions()) + if err != nil { + return false, err + } + + _, err = seg.Insert(testSegment.metadata) + if err != nil { + return 
false, err + } + + err = seg.Seal() + if err != nil { + return false, err + } + + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) + + testMD := newTestNSMetadata(t) + start := time.Now().Truncate(time.Hour) + blk, err := NewBlock(start, testMD, BlockOptions{}, + namespace.NewRuntimeOptionsManager("foo"), testOpts) + if err != nil { + return false, err + } + + b, ok := blk.(*block) + if !ok { + return false, errors.New("bad block type") + } + + b.mutableSegments.foregroundSegments = []*readableSeg{ + newReadableSeg(seg, testOpts), + } + + results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + Type: AggregateTagNamesAndValues, + }, testOpts) + + ctx := context.NewContext() + defer ctx.BlockingClose() + + // create initial span from a mock tracer and get ctx + ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), nil)) + + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{}, + results, + emptyLogFields) + + if err != nil { + return false, err + } + + if !exhaustive { + return false, errors.New("not exhaustive") + } + + resultMap := make(segmentMap, results.Map().Len()) + for _, field := range results.Map().Iter() { + name := field.Key().String() + _, found := resultMap[name] + if found { + return false, errors.New("duplicate fields in results map") + } + + values := make(testValuesSet, field.value.Map().Len()) + for _, value := range field.value.Map().Iter() { + val := value.Key().String() + _, found := values[val] + if found { + return false, errors.New("duplicate values in results map") + } + + values[val] = struct{}{} + } + + resultMap[name] = values + } + + require.Equal(t, resultMap, testSegment.segmentMap) + found := false + for _, c := range scope.Snapshot().Counters() { + if c.Name() == "query-limit.total-docs-matched" { + if c.Value() != testSegment.exCount { + return false, fmt.Errorf("docs count %d does not match expected %d", + c.Value(), testSegment.exCount) + } + + found = true + break + } + } + + if !found { + return false, errors.New("counter not found in metrics") + } + + return true, nil + }, + genTestSegment(), + )) + + if !properties.Run(reporter) { + t.Errorf("failed with initial seed: %d", seed) + } +} diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index f33c6b71e9..ee4e1e56bf 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -23,6 +23,7 @@ package index import ( stdlibctx "context" "fmt" + "sort" "testing" "time" @@ -30,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/test" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" @@ -40,14 +42,18 @@ import ( "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" xresource 
"github.com/m3db/m3/src/x/resource" + "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" + "github.com/uber-go/tally" "github.com/golang/mock/gomock" opentracing "github.com/opentracing/opentracing-go" opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -2241,3 +2247,199 @@ func testDoc3() doc.Metadata { }, } } + +func optionsWithAggResultsPool(capacity int) Options { + pool := NewAggregateResultsEntryArrayPool( + AggregateResultsEntryArrayPoolOpts{ + Capacity: capacity, + }, + ) + + pool.Init() + return testOpts.SetAggregateResultsEntryArrayPool(pool) +} + +func buildSegment(t *testing.T, term string, fields []string, opts mem.Options) *readableSeg { + seg, err := mem.NewSegment(opts) + require.NoError(t, err) + + docFields := make([]doc.Field, 0, len(fields)) + sort.Strings(fields) + for _, field := range fields { + docFields = append(docFields, doc.Field{Name: []byte(term), Value: []byte(field)}) + } + + _, err = seg.Insert(doc.Metadata{Fields: docFields}) + require.NoError(t, err) + + require.NoError(t, seg.Seal()) + return newReadableSeg(seg, testOpts) +} + +func TestBlockAggregateBatching(t *testing.T) { + memOpts := mem.NewOptions() + + var ( + batchSizeMap = make(map[string][]string) + batchSizeSegments = make([]*readableSeg, 0, defaultQueryDocsBatchSize) + ) + + for i := 0; i < defaultQueryDocsBatchSize; i++ { + fields := make([]string, 0, defaultQueryDocsBatchSize) + for j := 0; j < defaultQueryDocsBatchSize; j++ { + fields = append(fields, fmt.Sprintf("bar_%d", j)) + } + + if i == 0 { + batchSizeMap["foo"] = fields + } + + batchSizeSegments = append(batchSizeSegments, buildSegment(t, "foo", fields, memOpts)) + } + + tests := []struct { + name string + batchSize int + segments []*readableSeg + expectedDocsMatched int64 + expected map[string][]string + }{ + { + name: "single term multiple fields duplicated across readers", + batchSize: 3, + segments: []*readableSeg{ + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + }, + expectedDocsMatched: 1, + expected: map[string][]string{ + "foo": {"bar", "baz"}, + }, + }, + { + name: "multiple term multiple fields", + batchSize: 3, + segments: []*readableSeg{ + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bag", "bat"}, memOpts), + buildSegment(t, "qux", []string{"bar", "baz"}, memOpts), + }, + expectedDocsMatched: 2, + expected: map[string][]string{ + "foo": {"bag", "bar", "bat", "baz"}, + "qux": {"bar", "baz"}, + }, + }, + { + name: "term present in first and third reader", + // NB: expecting three batches due to the way batches are split (on the + // first different term ID in a batch), will look like this: + // [{foo [bar baz]} {dog [bar baz]} {qux [bar]} + // [{qux [baz]} {qaz [bar baz]} {foo [bar]}] + // [{foo [baz]}] + batchSize: 3, + segments: []*readableSeg{ + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "dog", []string{"bar", "baz"}, memOpts), + buildSegment(t, "qux", []string{"bar", "baz"}, memOpts), + buildSegment(t, "qaz", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + }, + expectedDocsMatched: 7, + expected: map[string][]string{ + "foo": {"bar", "baz"}, + "dog": {"bar", "baz"}, + "qux": {"bar", "baz"}, 
+ "qaz": {"bar", "baz"}, + }, + }, + { + name: "batch size case", + batchSize: defaultQueryDocsBatchSize, + segments: batchSizeSegments, + expectedDocsMatched: 1, + expected: batchSizeMap, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + testOpts = optionsWithAggResultsPool(tt.batchSize). + SetInstrumentOptions(iOpts). + SetQueryLimits(queryLimits) + + testMD := newTestNSMetadata(t) + start := time.Now().Truncate(time.Hour) + blk, err := NewBlock(start, testMD, BlockOptions{}, + namespace.NewRuntimeOptionsManager("foo"), testOpts) + require.NoError(t, err) + + b, ok := blk.(*block) + require.True(t, ok) + + // NB: wrap existing aggregate results fn to more easily inspect batch size. + addAggregateResultsFn := b.addAggregateResultsFn + b.addAggregateResultsFn = func( + cancellable *xresource.CancellableLifetime, + results AggregateResults, + batch []AggregateResultsEntry, + source []byte, + ) ([]AggregateResultsEntry, int, int, error) { + // NB: since both terms and values count towards the batch size, initialize + // this with batch size to account for terms. + count := len(batch) + for _, entry := range batch { + count += len(entry.Terms) + } + + // FIXME: this currently fails, but will be fixed after + // https://github.com/m3db/m3/pull/3133 is reverted. + // require.True(t, count <= tt.batchSize, + // fmt.Sprintf("batch %v exceeds batchSize %d", batch, tt.batchSize)) + + return addAggregateResultsFn(cancellable, results, batch, source) + } + + b.mutableSegments.foregroundSegments = tt.segments + results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + Type: AggregateTagNamesAndValues, + }, testOpts) + + ctx := context.NewContext() + defer ctx.BlockingClose() + + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{}, + results, + emptyLogFields) + require.NoError(t, err) + require.True(t, exhaustive) + + snap := scope.Snapshot() + tallytest.AssertCounterValue(t, tt.expectedDocsMatched, snap, "query-limit.total-docs-matched", nil) + resultsMap := make(map[string][]string, results.Map().Len()) + for _, res := range results.Map().Iter() { + vals := make([]string, 0, res.Value().valuesMap.Len()) + for _, val := range res.Value().valuesMap.Iter() { + vals = append(vals, val.Key().String()) + } + + sort.Strings(vals) + resultsMap[res.Key().String()] = vals + } + + assert.Equal(t, tt.expected, resultsMap) + }) + } +} From 892cdd6b8e4a8a26abadc196c58b7565b6353fbe Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 29 Jan 2021 02:06:51 -0500 Subject: [PATCH 2/7] Linter --- src/dbnode/storage/index/block_prop_test.go | 84 ++++++++++----------- src/dbnode/storage/index/block_test.go | 2 +- 2 files changed, 41 insertions(+), 45 deletions(-) diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 31c2edd3bc..891fc0538c 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -1,3 +1,5 @@ +// +build big +// // Copyright (c) 2019 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -45,13 +47,13 @@ import ( "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xresource "github.com/m3db/m3/src/x/resource" - "github.com/opentracing/opentracing-go" - "github.com/uber-go/tally" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" + "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) var ( @@ -233,13 +235,13 @@ type propTestSegment struct { segmentMap segmentMap } -type testValuesSet map[string]struct{} -type segmentMap map[string]testValuesSet +type testValuesSet map[string]struct{} //nolint:gofumpt +type segmentMap map[string]testValuesSet //nolint:gofumpt func genTestSegment() gopter.Gen { return gen.SliceOf(genField()).Map(func(input []testFields) propTestSegment { segMap := make(segmentMap, len(input)) - for _, field := range input { + for _, field := range input { //nolint:gocritic for _, value := range field.values { exVals, found := segMap[field.name] if !found { @@ -266,7 +268,7 @@ func genTestSegment() gopter.Gen { }) docFields := []doc.Field{} - for _, field := range fields { + for _, field := range fields { //nolint:gocritic for _, val := range field.values { docFields = append(docFields, doc.Field{ Name: []byte(field.name), @@ -283,7 +285,33 @@ func genTestSegment() gopter.Gen { }) } -func TestGenSegments(t *testing.T) { +func verifyResults( + t *testing.T, + results AggregateResults, + exMap segmentMap, +) { + resultMap := make(segmentMap, results.Map().Len()) + for _, field := range results.Map().Iter() { //nolint:gocritic + name := field.Key().String() + _, found := resultMap[name] + require.False(t, found, "duplicate values in results map") + + values := make(testValuesSet, field.value.Map().Len()) + for _, value := range field.value.Map().Iter() { + val := value.Key().String() + _, found := values[val] + require.False(t, found, "duplicate values in results map") + + values[val] = struct{}{} + } + + resultMap[name] = values + } + + require.Equal(t, resultMap, exMap) +} + +func TestAggregateDocLimits(t *testing.T) { var ( parameters = gopter.DefaultTestParameters() seed = time.Now().UnixNano() @@ -293,7 +321,7 @@ func TestGenSegments(t *testing.T) { parameters.MinSuccessfulTests = 1000 parameters.MinSize = 5 parameters.MaxSize = 10 - parameters.Rng = rand.New(rand.NewSource(seed)) + parameters.Rng = rand.New(rand.NewSource(seed)) //nolint:gosec properties := gopter.NewProperties(parameters) properties.Property("segments dedupe and have correct docs counts", prop.ForAll( @@ -361,50 +389,18 @@ func TestGenSegments(t *testing.T) { return false, err } - if !exhaustive { - return false, errors.New("not exhaustive") - } - - resultMap := make(segmentMap, results.Map().Len()) - for _, field := range results.Map().Iter() { - name := field.Key().String() - _, found := resultMap[name] - if found { - return false, errors.New("duplicate fields in results map") - } - - values := make(testValuesSet, field.value.Map().Len()) - for _, value := range field.value.Map().Iter() { - val := value.Key().String() - _, found := values[val] - if found { - return false, errors.New("duplicate values in results map") - } - - values[val] = struct{}{} - } - - resultMap[name] = values - } - - require.Equal(t, resultMap, testSegment.segmentMap) + require.True(t, exhaustive, errors.New("not exhaustive")) + verifyResults(t, results, testSegment.segmentMap) found := false for _, c := range 
scope.Snapshot().Counters() { if c.Name() == "query-limit.total-docs-matched" { - if c.Value() != testSegment.exCount { - return false, fmt.Errorf("docs count %d does not match expected %d", - c.Value(), testSegment.exCount) - } - + require.Equal(t, testSegment.exCount, c.Value(), "docs count mismatch") found = true break } } - if !found { - return false, errors.New("counter not found in metrics") - } - + require.True(t, found, "counter not found in metrics") return true, nil }, genTestSegment(), diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index ee4e1e56bf..cedac63b67 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -47,7 +47,6 @@ import ( xresource "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" - "github.com/uber-go/tally" "github.com/golang/mock/gomock" opentracing "github.com/opentracing/opentracing-go" @@ -55,6 +54,7 @@ import ( "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" "go.uber.org/zap" ) From 77100c4071b1713cc2b3a2271ad15804c919156f Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 29 Jan 2021 01:51:19 -0500 Subject: [PATCH 3/7] [dbnode] Revert #3133 with fix --- .../prometheus/test.sh | 43 +- src/dbnode/storage/index.go | 36 +- src/dbnode/storage/index/aggregate_results.go | 357 +++++----- .../storage/index/aggregate_results_test.go | 654 +++++++----------- src/dbnode/storage/index/block.go | 86 ++- src/dbnode/storage/index/block_test.go | 49 +- src/dbnode/storage/index/index_mock.go | 240 +++++-- src/dbnode/storage/index/types.go | 40 +- .../storage/index/wide_query_results.go | 4 +- src/dbnode/storage/index_block_test.go | 2 +- 10 files changed, 829 insertions(+), 682 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 1d3482c232..ca957bf264 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -233,7 +233,7 @@ function test_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]' - # Test the default docs limit applied when directly querying + # Test the docs limit applied when directly querying # coordinator (docs limit set by header) echo "Test query docs limit with coordinator limit header" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ @@ -391,6 +391,46 @@ function test_series { '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } +function test_label_query_limits_applied { + # Test that require exhaustive does nothing if limits are not hit + echo "Test label limits with require-exhaustive headers true (below limit therefore no error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' + + # the header takes precedence over the configured default series limit + echo "Test label 
series limit with coordinator limit header (default requires exhaustive so error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + + echo "Test label series limit with require-exhaustive headers false" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]' + + echo "Test label series limit with require-exhaustive headers true (above limit therefore error)" + # Test that require exhaustive error is returned + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + # Test that require exhaustive error is 4xx + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' + + echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + + echo "Test label docs limit with require-exhaustive headers false" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]' + + echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)" + # Test that require exhaustive error is returned + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + # Test that require exhaustive error is 4xx + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' +} + echo "Running readiness test" test_readiness @@ -409,6 +449,7 @@ test_prometheus_query_native_timeout test_query_restrict_tags test_prometheus_remote_write_map_tags test_series +test_label_query_limits_applied echo "Running function correctness tests" test_correctness diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 64d285f701..38dfa00428 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1404,9 +1404,10 @@ func (i *nsIndex) AggregateQuery( query index.Query, opts index.AggregationOptions, ) (index.AggregateQueryResult, error) { + id := i.nsMetadata.ID() logFields := []opentracinglog.Field{ opentracinglog.String("query", query.String()), - opentracinglog.String("namespace", i.nsMetadata.ID().String()), + opentracinglog.String("namespace", id.String()), opentracinglog.Int("seriesLimit", opts.SeriesLimit), opentracinglog.Int("docsLimit", opts.DocsLimit), xopentracing.Time("queryStart", opts.StartInclusive), @@ -1417,12 +1418,15 @@ func (i *nsIndex) AggregateQuery( sp.LogFields(logFields...) 
defer sp.Finish() + metrics := index.NewAggregateUsageMetrics(id, i.opts.InstrumentOptions()) // Get results and set the filters, namespace ID and size limit. results := i.aggregateResultsPool.Get() aopts := index.AggregateResultsOptions{ - SizeLimit: opts.SeriesLimit, - FieldFilter: opts.FieldFilter, - Type: opts.Type, + SizeLimit: opts.SeriesLimit, + DocsLimit: opts.DocsLimit, + FieldFilter: opts.FieldFilter, + Type: opts.Type, + AggregateUsageMetrics: metrics, } ctx.RegisterFinalizer(results) // use appropriate fn to query underlying blocks. @@ -1441,7 +1445,7 @@ func (i *nsIndex) AggregateQuery( } } aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe() - results.Reset(i.nsMetadata.ID(), aopts) + results.Reset(id, aopts) exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields) if err != nil { return index.AggregateQueryResult{}, err @@ -1687,7 +1691,16 @@ func (i *nsIndex) execBlockQueryFn( sp.LogFields(logFields...) defer sp.Finish() - blockExhaustive, err := block.Query(ctx, cancellable, query, opts, results, logFields) + docResults, ok := results.(index.DocumentResults) + if !ok { // should never happen + state.Lock() + err := fmt.Errorf("unknown results type [%T] received during query", results) + state.multiErr = state.multiErr.Add(err) + state.Unlock() + return + } + + blockExhaustive, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1725,7 +1738,16 @@ func (i *nsIndex) execBlockWideQueryFn( sp.LogFields(logFields...) defer sp.Finish() - _, err := block.Query(ctx, cancellable, query, opts, results, logFields) + docResults, ok := results.(index.DocumentResults) + if !ok { // should never happen + state.Lock() + err := fmt.Errorf("unknown results type [%T] received during wide query", results) + state.multiErr = state.multiErr.Add(err) + state.Unlock() + return + } + + _, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index ed0b03e093..bcee7b81c3 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -21,21 +21,17 @@ package index import ( - "fmt" + "math" "sync" - "github.com/m3db/m3/src/m3ninx/doc" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" ) -const missingDocumentFields = "invalid document fields: empty %s" - -// NB: emptyValues is an AggregateValues with no values, used for tracking -// terms only rather than terms and values. 
-var emptyValues = AggregateValues{hasValues: false} - type aggregatedResults struct { sync.RWMutex @@ -43,15 +39,86 @@ type aggregatedResults struct { aggregateOpts AggregateResultsOptions resultsMap *AggregateResultsMap + size int totalDocsCount int idPool ident.Pool bytesPool pool.CheckedBytesPool - pool AggregateResultsPool - valuesPool AggregateValuesPool - + pool AggregateResultsPool + valuesPool AggregateValuesPool encodedDocReader docs.EncodedDocumentReader + + iOpts instrument.Options +} + +var _ AggregateUsageMetrics = (*usageMetrics)(nil) + +type usageMetrics struct { + total tally.Counter + + totalTerms tally.Counter + dedupedTerms tally.Counter + + totalFields tally.Counter + dedupedFields tally.Counter +} + +func (m *usageMetrics) IncTotal(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.total != nil { + m.total.Inc(val) + } +} + +func (m *usageMetrics) IncTotalTerms(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.totalTerms != nil { + m.totalTerms.Inc(val) + } +} + +func (m *usageMetrics) IncDedupedTerms(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.dedupedTerms != nil { + m.dedupedTerms.Inc(val) + } +} + +func (m *usageMetrics) IncTotalFields(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.totalFields != nil { + m.totalFields.Inc(val) + } +} + +func (m *usageMetrics) IncDedupedFields(val int64) { + // NB: if metrics not set, to valid values, no-op. + if m.dedupedFields != nil { + m.dedupedFields.Inc(val) + } +} + +// NewAggregateUsageMetrics builds a new aggregated usage metrics. +func NewAggregateUsageMetrics(ns ident.ID, iOpts instrument.Options) AggregateUsageMetrics { + if ns == nil { + return &usageMetrics{} + } + + scope := iOpts.MetricsScope() + buildCounter := func(val string) tally.Counter { + return scope. + Tagged(map[string]string{"type": val, "namespace": ns.String()}). + Counter("aggregated-results") + } + + return &usageMetrics{ + total: buildCounter("total"), + totalTerms: buildCounter("total-terms"), + dedupedTerms: buildCounter("deduped-terms"), + totalFields: buildCounter("total-fields"), + dedupedFields: buildCounter("deduped-fields"), + } } // NewAggregateResults returns a new AggregateResults object. @@ -60,9 +127,14 @@ func NewAggregateResults( aggregateOpts AggregateResultsOptions, opts Options, ) AggregateResults { + if aggregateOpts.AggregateUsageMetrics == nil { + aggregateOpts.AggregateUsageMetrics = &usageMetrics{} + } + return &aggregatedResults{ nsID: namespaceID, aggregateOpts: aggregateOpts, + iOpts: opts.InstrumentOptions(), resultsMap: newAggregateResultsMap(opts.IdentifierPool()), idPool: opts.IdentifierPool(), bytesPool: opts.CheckedBytesPool(), @@ -79,8 +151,11 @@ func (r *aggregatedResults) Reset( ) { r.Lock() - r.aggregateOpts = aggregateOpts + if aggregateOpts.AggregateUsageMetrics == nil { + aggregateOpts.AggregateUsageMetrics = NewAggregateUsageMetrics(nsID, r.iOpts) + } + r.aggregateOpts = aggregateOpts // finalize existing held nsID if r.nsID != nil { r.nsID.Finalize() @@ -97,205 +172,129 @@ func (r *aggregatedResults) Reset( valueMap := entry.Value() valueMap.finalize() } - // reset all keys in the map next r.resultsMap.Reset() r.totalDocsCount = 0 + r.size = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. 
r.Unlock() } -func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) { - r.Lock() - err := r.addDocumentsBatchWithLock(batch) - size := r.resultsMap.Len() - docsCount := r.totalDocsCount + len(batch) - r.totalDocsCount = docsCount - r.Unlock() - return size, docsCount, err -} - func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { return r.aggregateOpts } func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() - valueInsertions := 0 - for _, entry := range batch { //nolint:gocritic - f := entry.Field - aggValues, ok := r.resultsMap.Get(f) - if !ok { - aggValues = r.valuesPool.Get() - // we can avoid the copy because we assume ownership of the passed ident.ID, - // but still need to finalize it. - r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) - } else { - // because we already have a entry for this field, we release the ident back to - // the underlying pool. - f.Finalize() - } - valuesMap := aggValues.Map() - for _, t := range entry.Terms { - if !valuesMap.Contains(t) { - // we can avoid the copy because we assume ownership of the passed ident.ID, - // but still need to finalize it. - valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) - valueInsertions++ - } else { - // because we already have a entry for this term, we release the ident back to - // the underlying pool. - t.Finalize() - } - } - } - size := r.resultsMap.Len() - docsCount := r.totalDocsCount + valueInsertions - r.totalDocsCount = docsCount - r.Unlock() - return size, docsCount -} - -func (r *aggregatedResults) addDocumentsBatchWithLock( - batch []doc.Document, -) error { - for _, doc := range batch { //nolint:gocritic - switch r.aggregateOpts.Type { - case AggregateTagNamesAndValues: - if err := r.addDocumentWithLock(doc); err != nil { - return err - } + defer r.Unlock() - case AggregateTagNames: - // NB: if aggregating by name only, can ignore any additional documents - // after the result map size exceeds the optional size limit, since all - // incoming terms are either duplicates or new values which will exceed - // the limit. - size := r.resultsMap.Len() - if r.aggregateOpts.SizeLimit > 0 && size >= r.aggregateOpts.SizeLimit { - return nil - } - - if err := r.addDocumentTermsWithLock(doc); err != nil { - return err - } - default: - return fmt.Errorf("unsupported aggregation type: %v", r.aggregateOpts.Type) - } + // NB: init total count with batch length, since each aggregated entry + // will have one field. 
+ totalCount := len(batch) + for idx := 0; idx < len(batch); idx++ { + totalCount += len(batch[idx].Terms) } - return nil -} - -func (r *aggregatedResults) addDocumentTermsWithLock( - container doc.Document, -) error { - document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) - if err != nil { - return fmt.Errorf("unable to decode encoded document; %w", err) - } - for _, field := range document.Fields { //nolint:gocritic - if err := r.addTermWithLock(field.Name); err != nil { - return fmt.Errorf("unable to add document terms [%+v]: %w", document, err) - } + r.aggregateOpts.AggregateUsageMetrics.IncTotal(int64(totalCount)) + remainingDocs := math.MaxInt64 + if r.aggregateOpts.DocsLimit != 0 { + remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount } - return nil -} - -func (r *aggregatedResults) addTermWithLock( - term []byte, -) error { - if len(term) == 0 { - return fmt.Errorf(missingDocumentFields, "term") - } - - // if a term filter is provided, ensure this field matches the filter, - // otherwise ignore it. - filter := r.aggregateOpts.FieldFilter - if filter != nil && !filter.Allow(term) { - return nil - } + // NB: already hit doc limit. + if remainingDocs <= 0 { + for idx := 0; idx < len(batch); idx++ { + batch[idx].Field.Finalize() + r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1) + for _, term := range batch[idx].Terms { + r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1) + term.Finalize() + } + } - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. - termID := ident.BytesID(term) - if r.resultsMap.Contains(termID) { - // NB: this term is already added; continue. - return nil + return r.size, r.totalDocsCount } - // Set results map to an empty AggregateValues since we only care about - // existence of the term in the map, rather than its set of values. - r.resultsMap.Set(termID, emptyValues) - return nil -} - -func (r *aggregatedResults) addDocumentWithLock( - container doc.Document, -) error { - document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) - if err != nil { - return fmt.Errorf("unable to decode encoded document; %w", err) - } - for _, field := range document.Fields { //nolint:gocritic - if err := r.addFieldWithLock(field.Name, field.Value); err != nil { - return fmt.Errorf("unable to add document [%+v]: %w", document, err) + // NB: cannot insert more than max docs, so that acts as the upper bound here. + remainingInserts := remainingDocs + if r.aggregateOpts.SizeLimit != 0 { + if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < remainingInserts { + remainingInserts = remaining } } - return nil -} + docs := 0 + numInserts := 0 + for _, entry := range batch { + r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1) -func (r *aggregatedResults) addFieldWithLock( - term []byte, - value []byte, -) error { - if len(term) == 0 { - return fmt.Errorf(missingDocumentFields, "term") - } + if docs >= remainingDocs || numInserts >= remainingInserts { + entry.Field.Finalize() + for _, term := range entry.Terms { + r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1) + term.Finalize() + } - // if a term filter is provided, ensure this field matches the filter, - // otherwise ignore it. - filter := r.aggregateOpts.FieldFilter - if filter != nil && !filter.Allow(term) { - return nil - } + r.size += numInserts + r.totalDocsCount += docs + return r.size, r.totalDocsCount + } - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. 
- termID := ident.BytesID(term) - valueID := ident.BytesID(value) + docs++ + f := entry.Field + aggValues, ok := r.resultsMap.Get(f) + if !ok { + if remainingInserts > numInserts { + r.aggregateOpts.AggregateUsageMetrics.IncDedupedFields(1) - valueMap, found := r.resultsMap.Get(termID) - if found { - return valueMap.addValue(valueID) - } + numInserts++ + aggValues = r.valuesPool.Get() + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + } else { + // this value exceeds the limit, so should be released to the underling + // pool without adding to the map. + f.Finalize() + } + } else { + // because we already have a entry for this field, we release the ident back to + // the underlying pool. + f.Finalize() + } - // NB: if over limit, do not add any new values to the map. - if r.aggregateOpts.SizeLimit > 0 && - r.resultsMap.Len() >= r.aggregateOpts.SizeLimit { - // Early return if limit enforced and we hit our limit. - return nil - } + valuesMap := aggValues.Map() + for _, t := range entry.Terms { + r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1) + if remainingDocs > docs { + docs++ + if !valuesMap.Contains(t) { + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + if remainingInserts > numInserts { + r.aggregateOpts.AggregateUsageMetrics.IncDedupedTerms(1) + valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + numInserts++ + continue + } + } + } - aggValues := r.valuesPool.Get() - if err := aggValues.addValue(valueID); err != nil { - // Return these values to the pool. 
- r.valuesPool.Put(aggValues) - return err + t.Finalize() + } } - r.resultsMap.Set(termID, aggValues) - return nil + r.size += numInserts + r.totalDocsCount += docs + return r.size, r.totalDocsCount } func (r *aggregatedResults) Namespace() ident.ID { @@ -314,9 +313,9 @@ func (r *aggregatedResults) Map() *AggregateResultsMap { func (r *aggregatedResults) Size() int { r.RLock() - l := r.resultsMap.Len() + size := r.size r.RUnlock() - return l + return size } func (r *aggregatedResults) TotalDocsCount() int { diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 7a514b484b..085350a023 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -21,436 +21,264 @@ package index import ( - "bytes" + "sort" "testing" - "github.com/m3db/m3/src/m3ninx/doc" - "github.com/m3db/m3/src/x/ident" - xtest "github.com/m3db/m3/src/x/test" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" ) -func genDoc(strs ...string) doc.Document { - if len(strs)%2 != 0 { - panic("invalid test setup; need even str length") - } +func entries(entries ...AggregateResultsEntry) []AggregateResultsEntry { return entries } - fields := make([]doc.Field, len(strs)/2) - for i := range fields { - fields[i] = doc.Field{ - Name: []byte(strs[i*2]), - Value: []byte(strs[i*2+1]), - } +func genResultsEntry(field string, terms ...string) AggregateResultsEntry { + entryTerms := make([]ident.ID, 0, len(terms)) + for _, term := range terms { + entryTerms = append(entryTerms, ident.StringID(term)) } - return doc.NewDocumentFromMetadata(doc.Metadata{Fields: fields}) -} - -func TestAggResultsInsertInvalid(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - assert.True(t, res.EnforceLimits()) - - dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) - size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsInsertEmptyTermValue(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - dValidEmptyTerm := genDoc("foo", "") - size, docsCount, err := res.AddDocuments([]doc.Document{dValidEmptyTerm}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) -} - -func TestAggResultsInsertBatchOfTwo(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("d1", "") - d2 := genDoc("d2", "") - size, docsCount, err := res.AddDocuments([]doc.Document{d1, d2}) - require.NoError(t, err) - require.Equal(t, 2, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 2, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsTermOnlyInsert(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: 
AggregateTagNames, - }, testOpts) - dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) - size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) - - valid := genDoc("foo", "") - size, docsCount, err = res.AddDocuments([]doc.Document{valid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 3, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 3, res.TotalDocsCount()) -} - -func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { - dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - size, docsCount, err = res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsInsertIdempotency(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - testAggResultsInsertIdempotency(t, res) -} - -func TestAggResultsTermOnlyInsertIdempotency(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - testAggResultsInsertIdempotency(t, res) -} - -func TestInvalidAggregateType(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: 100, - }, testOpts) - dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) -} - -func TestAggResultsSameName(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - rMap := res.Map() - aggVals, ok := rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.Equal(t, 1, aggVals.Size()) - assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) - - d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Document{d2}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - aggVals, ok = rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.Equal(t, 2, aggVals.Size()) - assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) - assert.True(t, aggVals.Map().Contains(ident.StringID("biz"))) -} - -func assertNoValuesInNameOnlyAggregate(t *testing.T, v AggregateValues) { - assert.False(t, v.hasValues) - assert.Nil(t, v.valuesMap) - assert.Nil(t, v.pool) - - assert.Equal(t, 0, v.Size()) - assert.Nil(t, v.Map()) - assert.False(t, v.HasValues()) -} - -func TestAggResultsTermOnlySameName(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := 
res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - rMap := res.Map() - aggVals, ok := rMap.Get(ident.StringID("foo")) - require.True(t, ok) - assertNoValuesInNameOnlyAggregate(t, aggVals) - - d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Document{d2}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - aggVals, ok = rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.False(t, aggVals.hasValues) - assertNoValuesInNameOnlyAggregate(t, aggVals) + return AggregateResultsEntry{ + Field: ident.StringID(field), + Terms: entryTerms, + } } -func addMultipleDocuments(t *testing.T, res AggregateResults) (int, int) { - _, _, err := res.AddDocuments([]doc.Document{ - genDoc("foo", "bar"), - genDoc("fizz", "bar"), - genDoc("buzz", "bar"), - }) - require.NoError(t, err) - - _, _, err = res.AddDocuments([]doc.Document{ - genDoc("foo", "biz"), - genDoc("fizz", "bar"), - }) - require.NoError(t, err) - - size, docsCount, err := res.AddDocuments([]doc.Document{ - genDoc("foo", "baz", "buzz", "bag", "qux", "qaz"), - }) - - require.NoError(t, err) - return size, docsCount -} +func toMap(res AggregateResults) map[string][]string { + entries := res.Map().Iter() + resultMap := make(map[string][]string, len(entries)) + for _, entry := range entries { //nolint:gocritic + terms := entry.value.Map().Iter() + resultTerms := make([]string, 0, len(terms)) + for _, term := range terms { + resultTerms = append(resultTerms, term.Key().String()) + } -func toFilter(strs ...string) AggregateFieldFilter { - b := make([][]byte, len(strs)) - for i, s := range strs { - b[i] = []byte(s) + sort.Strings(resultTerms) + resultMap[entry.Key().String()] = resultTerms } - return AggregateFieldFilter(b) + return resultMap } -var mergeTests = []struct { - name string - opts AggregateResultsOptions - expected map[string][]string -}{ - { - name: "no limit no filter", - opts: AggregateResultsOptions{}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, - "qux": {"qaz"}, +func TestWithLimits(t *testing.T) { + tests := []struct { + name string + entries []AggregateResultsEntry + sizeLimit int + docLimit int + exSeries int + exDocs int + expected map[string][]string + exMetrics map[string]int64 + }{ + { + name: "single term", + entries: entries(genResultsEntry("foo")), + exSeries: 1, + exDocs: 1, + expected: map[string][]string{"foo": {}}, + + exMetrics: map[string]int64{ + "total": 1, "total-fields": 1, "deduped-fields": 1, + "total-terms": 0, "deduped-terms": 0, + }, }, - }, - { - name: "with limit no filter", - opts: AggregateResultsOptions{SizeLimit: 2}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, + { + name: "same term", + entries: entries(genResultsEntry("foo"), genResultsEntry("foo")), + exSeries: 1, + exDocs: 2, + expected: map[string][]string{"foo": {}}, + exMetrics: map[string]int64{ + "total": 2, "total-fields": 2, "deduped-fields": 1, + "total-terms": 0, "deduped-terms": 0, + }, }, - }, - { - name: "no limit empty filter", - opts: AggregateResultsOptions{FieldFilter: toFilter()}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, - "qux": {"qaz"}, + { + name: "multiple terms", + entries: entries(genResultsEntry("foo"), genResultsEntry("bar")), + exSeries: 2, + exDocs: 2, + expected: map[string][]string{"foo": {}, "bar": {}}, 
+ exMetrics: map[string]int64{ + "total": 2, "total-fields": 2, "deduped-fields": 2, + "total-terms": 0, "deduped-terms": 0, + }, }, - }, - { - name: "no limit matchless filter", - opts: AggregateResultsOptions{FieldFilter: toFilter("zig")}, - expected: map[string][]string{}, - }, - { - name: "empty limit with filter", - opts: AggregateResultsOptions{FieldFilter: toFilter("buzz")}, - expected: map[string][]string{ - "buzz": {"bar", "bag"}, + { + name: "single entry", + entries: entries(genResultsEntry("foo", "bar")), + exSeries: 2, + exDocs: 2, + expected: map[string][]string{"foo": {"bar"}}, + exMetrics: map[string]int64{ + "total": 2, "total-fields": 1, "deduped-fields": 1, + "total-terms": 1, "deduped-terms": 1, + }, }, - }, - { - name: "with limit with filter", - opts: AggregateResultsOptions{ - SizeLimit: 2, FieldFilter: toFilter("buzz", "qux", "fizz"), + { + name: "single entry multiple fields", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "baz", "qux")), + exSeries: 4, + exDocs: 6, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 6, "total-fields": 1, "deduped-fields": 1, + "total-terms": 5, "deduped-terms": 3, + }, }, - expected: map[string][]string{ - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, + { + name: "multiple entry multiple fields", + entries: entries( + genResultsEntry("foo", "bar", "baz"), + genResultsEntry("foo", "baz", "baz", "qux")), + exSeries: 4, + exDocs: 7, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 7, "total-fields": 2, "deduped-fields": 1, + "total-terms": 5, "deduped-terms": 3, + }, + }, + { + name: "multiple entries", + entries: entries(genResultsEntry("foo", "baz"), genResultsEntry("bar", "baz", "qux")), + exSeries: 5, + exDocs: 5, + expected: map[string][]string{"foo": {"baz"}, "bar": {"baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 2, "deduped-fields": 2, + "total-terms": 3, "deduped-terms": 3, + }, }, - }, -} -func TestAggResultsMerge(t *testing.T) { - for _, tt := range mergeTests { - t.Run(tt.name, func(t *testing.T) { - res := NewAggregateResults(nil, tt.opts, testOpts) - size, docsCount := addMultipleDocuments(t, res) - - require.Equal(t, len(tt.expected), size) - require.Equal(t, 6, docsCount) - ac := res.Map() - require.Equal(t, len(tt.expected), ac.Len()) - for k, v := range tt.expected { - aggVals, ok := ac.Get(ident.StringID(k)) - require.True(t, ok) - require.Equal(t, len(v), aggVals.Size()) - for _, actual := range v { - require.True(t, aggVals.Map().Contains(ident.StringID(actual))) - } - } - }) - } -} + { + name: "single entry query at size limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), + sizeLimit: 4, + exSeries: 4, + exDocs: 5, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 1, "deduped-fields": 1, + "total-terms": 4, "deduped-terms": 3, + }, + }, + { + name: "single entry query at doc limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), + docLimit: 5, + exSeries: 4, + exDocs: 5, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 1, "deduped-fields": 1, + "total-terms": 4, "deduped-terms": 3, + }, + }, -func TestAggResultsMergeNameOnly(t *testing.T) { - for _, tt := range mergeTests { - t.Run(tt.name+" name only", func(t *testing.T) { - tt.opts.Type = AggregateTagNames - res 
:= NewAggregateResults(nil, tt.opts, testOpts) - size, docsCount := addMultipleDocuments(t, res) - - require.Equal(t, len(tt.expected), size) - require.Equal(t, 6, docsCount) - - ac := res.Map() - require.Equal(t, len(tt.expected), ac.Len()) - for k := range tt.expected { - aggVals, ok := ac.Get(ident.StringID(k)) - require.True(t, ok) - assertNoValuesInNameOnlyAggregate(t, aggVals) - } - }) + { + name: "single entry query below size limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "qux")), + sizeLimit: 3, + exSeries: 3, + exDocs: 4, + expected: map[string][]string{"foo": {"bar", "baz"}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 1, "deduped-fields": 1, + "total-terms": 3, "deduped-terms": 2, + }, + }, + { + name: "single entry query below doc limit", + entries: entries(genResultsEntry("foo", "bar", "bar", "bar", "baz")), + docLimit: 3, + exSeries: 2, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}}, + exMetrics: map[string]int64{ + "total": 5, "total-fields": 1, "deduped-fields": 1, + "total-terms": 4, "deduped-terms": 1, + }, + }, + { + name: "multiple entry query below series limit", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + sizeLimit: 3, + exSeries: 3, + exDocs: 4, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 2, "deduped-fields": 2, + "total-terms": 2, "deduped-terms": 1, + }, + }, + { + name: "multiple entry query below doc limit", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + docLimit: 3, + exSeries: 3, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 2, "deduped-fields": 2, + "total-terms": 2, "deduped-terms": 1, + }, + }, + { + name: "multiple entry query both limits", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + docLimit: 3, + sizeLimit: 10, + exSeries: 3, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + exMetrics: map[string]int64{ + "total": 4, "total-fields": 2, "deduped-fields": 2, + "total-terms": 2, "deduped-terms": 1, + }, + }, } -} -func TestAggResultsInsertCopies(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - dValid := genDoc("foo", "bar") - - d, ok := dValid.Metadata() - require.True(t, ok) - name := d.Fields[0].Name - value := d.Fields[0].Value - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - found := false - - // our genny generated maps don't provide access to MapEntry directly, - // so we iterate over the map to find the added entry. Could avoid this - // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` - for _, entry := range res.Map().Iter() { - // see if this key has the same value as the added document's ID. - n := entry.Key().Bytes() - if !bytes.Equal(name, n) { - continue - } - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. 
- require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) - v := entry.Value() - for _, f := range v.Map().Iter() { - v := f.Key().Bytes() - if !bytes.Equal(value, v) { - continue + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + res := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + SizeLimit: tt.sizeLimit, + DocsLimit: tt.docLimit, + AggregateUsageMetrics: NewAggregateUsageMetrics(ident.StringID("ns"), iOpts), + }, testOpts) + + size, docsCount := res.AddFields(tt.entries) + assert.Equal(t, tt.exSeries, size) + assert.Equal(t, tt.exDocs, docsCount) + assert.Equal(t, tt.exSeries, res.Size()) + assert.Equal(t, tt.exDocs, res.TotalDocsCount()) + + assert.Equal(t, tt.expected, toMap(res)) + + counters := scope.Snapshot().Counters() + actualCounters := make(map[string]int64, len(counters)) + for _, v := range counters { + actualCounters[v.Tags()["type"]] = v.Value() } - found = true - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. - require.False(t, xtest.ByteSlicesBackedBySameData(v, value)) - } - } - - require.True(t, found) -} - -func TestAggResultsNameOnlyInsertCopies(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - dValid := genDoc("foo", "bar") - d, ok := dValid.Metadata() - require.True(t, ok) - name := d.Fields[0].Name - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - found := false - // our genny generated maps don't provide access to MapEntry directly, - // so we iterate over the map to find the added entry. Could avoid this - // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` - for _, entry := range res.Map().Iter() { - // see if this key has the same value as the added document's ID. - n := entry.Key().Bytes() - if !bytes.Equal(name, n) { - continue - } - - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. - require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) - found = true - assertNoValuesInNameOnlyAggregate(t, entry.Value()) + assert.Equal(t, tt.exMetrics, actualCounters) + }) } - - require.True(t, found) } func TestAggResultsReset(t *testing.T) { res := NewAggregateResults(ident.StringID("qux"), AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) + size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) aggVals, ok := res.Map().Get(ident.StringID("foo")) require.True(t, ok) @@ -496,11 +324,9 @@ func TestAggResultsResetNamespaceClones(t *testing.T) { func TestAggResultFinalize(t *testing.T) { // Create a Results and insert some data. res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) + size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) // Ensure the data is present. 
 	rMap := res.Map()
@@ -522,3 +348,29 @@ func TestAggResultFinalize(t *testing.T) {
 		require.False(t, id.IsNoFinalize())
 	}
 }
+
+func TestResetUpdatesMetrics(t *testing.T) {
+	scope := tally.NewTestScope("", nil)
+	iOpts := instrument.NewOptions().SetMetricsScope(scope)
+	testOpts = testOpts.SetInstrumentOptions(iOpts)
+	res := NewAggregateResults(nil, AggregateResultsOptions{
+		AggregateUsageMetrics: NewAggregateUsageMetrics(ident.StringID("ns1"), iOpts),
+	}, testOpts)
+	res.AddFields(entries(genResultsEntry("foo")))
+	res.Reset(ident.StringID("ns2"), AggregateResultsOptions{})
+	res.AddFields(entries(genResultsEntry("bar")))
+
+	counters := scope.Snapshot().Counters()
+	seenNamespaces := make(map[string]struct{})
+	for _, v := range counters {
+		ns := v.Tags()["namespace"]
+		seenNamespaces[ns] = struct{}{}
+	}
+
+	assert.Equal(t, map[string]struct{}{
+		"ns1": {},
+		"ns2": {},
+	}, seenNamespaces)
+
+	res.Finalize()
+}
diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go
index 1b02e60b53..26c7d55d4d 100644
--- a/src/dbnode/storage/index/block.go
+++ b/src/dbnode/storage/index/block.go
@@ -149,8 +149,8 @@ type block struct {
 	blockOpts BlockOptions
 	nsMD namespace.Metadata
 	namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager
-	queryLimits limits.QueryLimits
 	docsLimit limits.LookbackLimit
+	aggregatedAddedCounter tally.Counter
 	metrics blockMetrics
 	logger *zap.Logger
@@ -234,6 +234,7 @@ func NewBlock(
 		iopts,
 	)
+	aggAdded := opts.InstrumentOptions().MetricsScope().Counter("aggregate-added-counter")
 	// NB(bodu): The length of coldMutableSegments is always at least 1.
 	coldSegs := []*mutableSegments{
 		newMutableSegments(
@@ -259,8 +260,8 @@ func NewBlock(
 		namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr,
 		metrics: newBlockMetrics(scope),
 		logger: iopts.Logger(),
-		queryLimits: opts.QueryLimits(),
 		docsLimit: opts.QueryLimits().DocsLimit(),
+		aggregatedAddedCounter: aggAdded,
 	}
 	b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
 	b.newExecutorWithRLockFn = b.executorWithRLock
@@ -412,7 +413,7 @@ func (b *block) Query(
 	cancellable *xresource.CancellableLifetime,
 	query Query,
 	opts QueryOptions,
-	results BaseResults,
+	results DocumentResults,
 	logFields []opentracinglog.Field,
 ) (bool, error) {
 	ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery)
@@ -432,7 +433,7 @@ func (b *block) queryWithSpan(
 	cancellable *xresource.CancellableLifetime,
 	query Query,
 	opts QueryOptions,
-	results BaseResults,
+	results DocumentResults,
 	sp opentracing.Span,
 	logFields []opentracinglog.Field,
 ) (bool, error) {
@@ -541,7 +542,7 @@ func (b *block) closeAsync(closer io.Closer) {
 func (b *block) addQueryResults(
 	cancellable *xresource.CancellableLifetime,
-	results BaseResults,
+	results DocumentResults,
 	batch []doc.Document,
 	source []byte,
 ) ([]doc.Document, int, int, error) {
@@ -559,7 +560,7 @@ func (b *block) addQueryResults(
 		return batch, 0, 0, errCancelledQuery
 	}
-	// try to add the docs to the xresource.
+	// try to add the docs to the resource.
 	size, docsCount, err := results.AddDocuments(batch)
 	// immediately release the checkout on the lifetime of query.
@@ -650,15 +651,17 @@ func (b *block) aggregateWithSpan(
 	}
 	var (
-		source = opts.Source
-		size = results.Size()
-		docsCount = results.TotalDocsCount()
-		batch = b.opts.AggregateResultsEntryArrayPool().Get()
-		batchSize = cap(batch)
-		iterClosed = false // tracking whether we need to free the iterator at the end.
+ source = opts.Source + size = results.Size() + resultCount = results.TotalDocsCount() + batch = b.opts.AggregateResultsEntryArrayPool().Get() + maxBatch = cap(batch) + iterClosed = false // tracking whether we need to free the iterator at the end. + currBatchSize int + numAdded int ) - if batchSize == 0 { - batchSize = defaultAggregateResultsEntryBatchSize + if maxBatch == 0 { + maxBatch = defaultAggregateResultsEntryBatchSize } // cleanup at the end @@ -684,8 +687,16 @@ func (b *block) aggregateWithSpan( })) } + if opts.SeriesLimit < maxBatch { + maxBatch = opts.SeriesLimit + } + + if opts.DocsLimit < maxBatch { + maxBatch = opts.DocsLimit + } + for _, reader := range readers { - if opts.LimitsExceeded(size, docsCount) { + if opts.LimitsExceeded(size, resultCount) { break } @@ -694,22 +705,33 @@ func (b *block) aggregateWithSpan( return false, err } iterClosed = false // only once the iterator has been successfully Reset(). - for iter.Next() { - if opts.LimitsExceeded(size, docsCount) { + if opts.LimitsExceeded(size, resultCount) { break } field, term := iter.Current() - batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - if len(batch) < batchSize { + batch, numAdded = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) + currBatchSize += numAdded + + // continue appending to the batch until we hit our max batch size + if currBatchSize < maxBatch { continue } - batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) + // update recently queried docs to monitor memory. + if results.EnforceLimits() { + if err := b.docsLimit.Inc(len(batch), source); err != nil { + return false, err + } + } + + batch, size, resultCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } + + currBatchSize = 0 } if err := iter.Err(); err != nil { @@ -723,21 +745,23 @@ func (b *block) aggregateWithSpan( } // Add last batch to results if remaining. - if len(batch) > 0 { - batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) + for len(batch) > 0 { + batch, size, resultCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } } - return opts.exhaustive(size, docsCount), nil + return opts.exhaustive(size, resultCount), nil } +// appendFieldAndTermToBatch adds the provided field / term onto the batch, +// optionally reusing the last element of the batch if it pertains to the same field. func (b *block) appendFieldAndTermToBatch( batch []AggregateResultsEntry, field, term []byte, includeTerms bool, -) []AggregateResultsEntry { +) ([]AggregateResultsEntry, int) { // NB(prateek): we make a copy of the (field, term) entries returned // by the iterator during traversal, because the []byte are only valid per entry during // the traversal (i.e. calling Next() invalidates the []byte). 
We choose to do this @@ -750,6 +774,7 @@ func (b *block) appendFieldAndTermToBatch( lastField []byte lastFieldIsValid bool reuseLastEntry bool + numAppended int ) // we are iterating multiple segments so we may receive duplicates (same field/term), but // as we are iterating one segment at a time, and because the underlying index structures @@ -772,6 +797,7 @@ func (b *block) appendFieldAndTermToBatch( reuseLastEntry = true entry = batch[len(batch)-1] // avoid alloc cause we already have the field } else { + numAppended++ // allocate id because this is the first time we've seen it // NB(r): Iterating fields FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -780,6 +806,7 @@ func (b *block) appendFieldAndTermToBatch( } if includeTerms { + numAppended++ // terms are always new (as far we know without checking the map for duplicates), so we allocate // NB(r): Iterating terms FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -792,7 +819,8 @@ func (b *block) appendFieldAndTermToBatch( } else { batch = append(batch, entry) } - return batch + + return batch, numAppended } func (b *block) pooledID(id []byte) ident.ID { @@ -803,6 +831,8 @@ func (b *block) pooledID(id []byte) ident.ID { return b.opts.IdentifierPool().BinaryID(data) } +// addAggregateResults adds the fields on the batch +// to the provided results and resets the batch to be reused. func (b *block) addAggregateResults( cancellable *xresource.CancellableLifetime, results AggregateResults, @@ -823,8 +853,8 @@ func (b *block) addAggregateResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the xresource. - size, docsCount := results.AddFields(batch) + // try to add the docs to the resource. + size, resultCount := results.AddFields(batch) // immediately release the checkout on the lifetime of query. cancellable.ReleaseCheckout() @@ -837,7 +867,7 @@ func (b *block) addAggregateResults( batch = batch[:0] // return results. - return batch, size, docsCount, nil + return batch, size, resultCount, nil } func (b *block) AddResults( diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index cedac63b67..960147a3fd 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -1872,6 +1872,18 @@ func TestBlockAggregate(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Limit: 50, Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) + + // NB: seriesLimit must be higher than the number of fields to be exhaustive. 
+ seriesLimit := 10 testMD := newTestNSMetadata(t) start := time.Now().Truncate(time.Hour) blk, err := NewBlock(start, testMD, BlockOptions{}, @@ -1894,7 +1906,7 @@ func TestBlockAggregate(t *testing.T) { } results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ - SizeLimit: 3, + SizeLimit: seriesLimit, Type: AggregateTagNamesAndValues, }, testOpts) @@ -1906,24 +1918,23 @@ func TestBlockAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - gomock.InOrder( - iter.EXPECT().Reset(reader, gomock.Any()).Return(nil), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")), - iter.EXPECT().Next().Return(false), - iter.EXPECT().Err().Return(nil), - iter.EXPECT().Close().Return(nil), - ) + iter.EXPECT().Reset(reader, gomock.Any()).Return(nil) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")) + iter.EXPECT().Next().Return(false) + iter.EXPECT().Err().Return(nil) + iter.EXPECT().Close().Return(nil) + exhaustive, err := b.Aggregate( ctx, xresource.NewCancellableLifetime(), - QueryOptions{SeriesLimit: 3}, + QueryOptions{SeriesLimit: seriesLimit}, results, emptyLogFields) require.NoError(t, err) @@ -1938,6 +1949,10 @@ func TestBlockAggregate(t *testing.T) { spans := mtr.FinishedSpans() require.Len(t, spans, 2) require.Equal(t, tracepoint.BlockAggregate, spans[0].OperationName) + + snap := scope.Snapshot() + tallytest.AssertCounterValue(t, 4, snap, "query-limit.total-docs-matched", nil) + tallytest.AssertCounterValue(t, 8, snap, "aggregate-added-counter", nil) } func TestBlockAggregateNotExhaustive(t *testing.T) { @@ -2006,7 +2021,7 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { require.False(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ - "f1": {"t1"}, + "f1": {}, }, results) sp.Finish() diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 0c44cf9614..a4d097f18a 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -128,32 +128,123 @@ func (mr *MockBaseResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockBaseResults)(nil).EnforceLimits)) } -// AddDocuments mocks base method -func (m *MockBaseResults) AddDocuments(batch []doc.Document) (int, int, error) { +// Finalize mocks base method +func (m *MockBaseResults) Finalize() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) +} + +// MockDocumentResults is a mock of 
DocumentResults interface +type MockDocumentResults struct { + ctrl *gomock.Controller + recorder *MockDocumentResultsMockRecorder +} + +// MockDocumentResultsMockRecorder is the mock recorder for MockDocumentResults +type MockDocumentResultsMockRecorder struct { + mock *MockDocumentResults +} + +// NewMockDocumentResults creates a new mock instance +func NewMockDocumentResults(ctrl *gomock.Controller) *MockDocumentResults { + mock := &MockDocumentResults{ctrl: ctrl} + mock.recorder = &MockDocumentResultsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDocumentResults) EXPECT() *MockDocumentResultsMockRecorder { + return m.recorder +} + +// Namespace mocks base method +func (m *MockDocumentResults) Namespace() ident.ID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Namespace") + ret0, _ := ret[0].(ident.ID) + return ret0 +} + +// Namespace indicates an expected call of Namespace +func (mr *MockDocumentResultsMockRecorder) Namespace() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Namespace", reflect.TypeOf((*MockDocumentResults)(nil).Namespace)) +} + +// Size mocks base method +func (m *MockDocumentResults) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + return ret0 } -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockBaseResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { +// Size indicates an expected call of Size +func (mr *MockDocumentResultsMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockDocumentResults)(nil).Size)) +} + +// TotalDocsCount mocks base method +func (m *MockDocumentResults) TotalDocsCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDocsCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// TotalDocsCount indicates an expected call of TotalDocsCount +func (mr *MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) +} + +// EnforceLimits mocks base method +func (m *MockDocumentResults) EnforceLimits() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnforceLimits") + ret0, _ := ret[0].(bool) + return ret0 +} + +// EnforceLimits indicates an expected call of EnforceLimits +func (mr *MockDocumentResultsMockRecorder) EnforceLimits() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockBaseResults)(nil).AddDocuments), batch) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockDocumentResults)(nil).EnforceLimits)) } // Finalize mocks base method -func (m *MockBaseResults) Finalize() { +func (m *MockDocumentResults) Finalize() { m.ctrl.T.Helper() m.ctrl.Call(m, "Finalize") } // Finalize indicates an expected call of Finalize -func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { +func (mr *MockDocumentResultsMockRecorder) Finalize() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", 
reflect.TypeOf((*MockDocumentResults)(nil).Finalize)) +} + +// AddDocuments mocks base method +func (m *MockDocumentResults) AddDocuments(batch []doc.Document) (int, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddDocuments", batch) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// AddDocuments indicates an expected call of AddDocuments +func (mr *MockDocumentResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockDocumentResults)(nil).AddDocuments), batch) } // MockQueryResults is a mock of QueryResults interface @@ -235,6 +326,18 @@ func (mr *MockQueryResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockQueryResults)(nil).EnforceLimits)) } +// Finalize mocks base method +func (m *MockQueryResults) Finalize() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) +} + // AddDocuments mocks base method func (m *MockQueryResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() @@ -251,18 +354,6 @@ func (mr *MockQueryResultsMockRecorder) AddDocuments(batch interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockQueryResults)(nil).AddDocuments), batch) } -// Finalize mocks base method -func (m *MockQueryResults) Finalize() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Finalize") -} - -// Finalize indicates an expected call of Finalize -func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) -} - // Reset mocks base method func (m *MockQueryResults) Reset(nsID ident.ID, opts QueryResultsOptions) { m.ctrl.T.Helper() @@ -429,22 +520,6 @@ func (mr *MockAggregateResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockAggregateResults)(nil).EnforceLimits)) } -// AddDocuments mocks base method -func (m *MockAggregateResults) AddDocuments(batch []doc.Document) (int, int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockAggregateResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockAggregateResults)(nil).AddDocuments), batch) -} - // Finalize mocks base method func (m *MockAggregateResults) Finalize() { m.ctrl.T.Helper() @@ -512,6 +587,89 @@ func (mr *MockAggregateResultsMockRecorder) Map() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Map", reflect.TypeOf((*MockAggregateResults)(nil).Map)) } +// MockAggregateUsageMetrics is a mock of AggregateUsageMetrics interface +type MockAggregateUsageMetrics struct { + ctrl *gomock.Controller + recorder 
*MockAggregateUsageMetricsMockRecorder +} + +// MockAggregateUsageMetricsMockRecorder is the mock recorder for MockAggregateUsageMetrics +type MockAggregateUsageMetricsMockRecorder struct { + mock *MockAggregateUsageMetrics +} + +// NewMockAggregateUsageMetrics creates a new mock instance +func NewMockAggregateUsageMetrics(ctrl *gomock.Controller) *MockAggregateUsageMetrics { + mock := &MockAggregateUsageMetrics{ctrl: ctrl} + mock.recorder = &MockAggregateUsageMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockAggregateUsageMetrics) EXPECT() *MockAggregateUsageMetricsMockRecorder { + return m.recorder +} + +// IncTotal mocks base method +func (m *MockAggregateUsageMetrics) IncTotal(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncTotal", val) +} + +// IncTotal indicates an expected call of IncTotal +func (mr *MockAggregateUsageMetricsMockRecorder) IncTotal(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncTotal", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncTotal), val) +} + +// IncTotalTerms mocks base method +func (m *MockAggregateUsageMetrics) IncTotalTerms(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncTotalTerms", val) +} + +// IncTotalTerms indicates an expected call of IncTotalTerms +func (mr *MockAggregateUsageMetricsMockRecorder) IncTotalTerms(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncTotalTerms", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncTotalTerms), val) +} + +// IncDedupedTerms mocks base method +func (m *MockAggregateUsageMetrics) IncDedupedTerms(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncDedupedTerms", val) +} + +// IncDedupedTerms indicates an expected call of IncDedupedTerms +func (mr *MockAggregateUsageMetricsMockRecorder) IncDedupedTerms(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncDedupedTerms", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncDedupedTerms), val) +} + +// IncTotalFields mocks base method +func (m *MockAggregateUsageMetrics) IncTotalFields(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncTotalFields", val) +} + +// IncTotalFields indicates an expected call of IncTotalFields +func (mr *MockAggregateUsageMetricsMockRecorder) IncTotalFields(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncTotalFields", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncTotalFields), val) +} + +// IncDedupedFields mocks base method +func (m *MockAggregateUsageMetrics) IncDedupedFields(val int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncDedupedFields", val) +} + +// IncDedupedFields indicates an expected call of IncDedupedFields +func (mr *MockAggregateUsageMetricsMockRecorder) IncDedupedFields(val interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncDedupedFields", reflect.TypeOf((*MockAggregateUsageMetrics)(nil).IncDedupedFields), val) +} + // MockAggregateResultsPool is a mock of AggregateResultsPool interface type MockAggregateResultsPool struct { ctrl *gomock.Controller @@ -774,7 +932,7 @@ func (mr *MockBlockMockRecorder) WriteBatch(inserts interface{}) *gomock.Call { } // Query mocks base method -func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, 
opts QueryOptions, results BaseResults, logFields []log.Field) (bool, error) {
+func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results DocumentResults, logFields []log.Field) (bool, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Query", ctx, cancellable, query, opts, results, logFields)
 	ret0, _ := ret[0].(bool)
diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go
index 9e5e07ec07..01fa78f609 100644
--- a/src/dbnode/storage/index/types.go
+++ b/src/dbnode/storage/index/types.go
@@ -159,22 +159,29 @@ type BaseResults interface {
 	// EnforceLimits returns whether this should enforce and increment limits.
 	EnforceLimits() bool
+	// Finalize releases any resources held by the Results object,
+	// including returning it to a backing pool.
+	Finalize()
+}
+
+// DocumentResults is a collection of query results that allow accumulation of
+// document values, it is synchronized when access to the results set is used
+// as documented by the methods.
+type DocumentResults interface {
+	BaseResults
+
 	// AddDocuments adds the batch of documents to the results set, it will
 	// take a copy of the bytes backing the documents so the original can be
 	// modified after this function returns without affecting the results map.
 	// TODO(r): We will need to change this behavior once index fields are
 	// mutable and the most recent need to shadow older entries.
 	AddDocuments(batch []doc.Document) (size, docsCount int, err error)
-
-	// Finalize releases any resources held by the Results object,
-	// including returning it to a backing pool.
-	Finalize()
 }
 // QueryResults is a collection of results for a query, it is synchronized
 // when access to the results set is used as documented by the methods.
 type QueryResults interface {
-	BaseResults
+	DocumentResults
 	// Reset resets the Results object to initial state.
 	Reset(nsID ident.ID, opts QueryResultsOptions)
@@ -259,6 +266,9 @@ type AggregateResultsOptions struct {
 	// overflown will return early successfully.
 	SizeLimit int
+	// DocsLimit limits the number of documents.
+	DocsLimit int
+
 	// Type determines what result is required.
 	Type AggregationType
@@ -268,6 +278,24 @@ type AggregateResultsOptions struct {
 	// RestrictByQuery is a query to restrict the set of documents that must
 	// be present for an aggregated term to be returned.
 	RestrictByQuery *Query
+
+	// AggregateUsageMetrics are aggregate usage metrics that track field
+	// and term counts for aggregate queries.
+	AggregateUsageMetrics AggregateUsageMetrics
+}
+
+// AggregateUsageMetrics are metrics for aggregate query usage.
+type AggregateUsageMetrics interface {
+	// IncTotal increments the total metric count.
+	IncTotal(val int64)
+	// IncTotalTerms increments the totalTerms metric count.
+	IncTotalTerms(val int64)
+	// IncDedupedTerms increments the dedupedTerms metric count.
+	IncDedupedTerms(val int64)
+	// IncTotalFields increments the totalFields metric count.
+	IncTotalFields(val int64)
+	// IncDedupedFields increments the dedupedFields metric count.
+	IncDedupedFields(val int64)
 }
 // AggregateResultsAllocator allocates AggregateResults types. 
@@ -357,7 +385,7 @@ type Block interface { cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, logFields []opentracinglog.Field, ) (bool, error) diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index af6b707584..cb5ad85312 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -38,6 +38,8 @@ var ErrWideQueryResultsExhausted = errors.New("no more values to add to wide que type shardFilterFn func(ident.ID) (uint32, bool) +var _ DocumentResults = (*wideResults)(nil) + type wideResults struct { sync.RWMutex size int @@ -72,7 +74,7 @@ func NewWideQueryResults( shardFilter shardFilterFn, collector chan *ident.IDBatch, opts WideQueryOptions, -) BaseResults { +) DocumentResults { batchSize := opts.BatchSize results := &wideResults{ nsID: namespaceID, diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index a4cbadcdfa..5438bcebb9 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -829,7 +829,7 @@ func TestLimits(t *testing.T) { cancellable interface{}, query interface{}, opts interface{}, - results index.BaseResults, + results index.DocumentResults, logFields interface{}) (bool, error) { _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2. From 01d2dbf3f6779dda3e00c16a1fbde6601a3d56fe Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 29 Jan 2021 03:59:12 -0500 Subject: [PATCH 4/7] Added fixes to aggregate query limit calculation --- src/dbnode/storage/index/block.go | 92 +++++++++++++++++--------- src/dbnode/storage/index/block_test.go | 6 +- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 26c7d55d4d..7cd269fad2 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -657,8 +657,12 @@ func (b *block) aggregateWithSpan( batch = b.opts.AggregateResultsEntryArrayPool().Get() maxBatch = cap(batch) iterClosed = false // tracking whether we need to free the iterator at the end. - currBatchSize int - numAdded int + fieldAppended bool + termAppended bool + lastField []byte + batchedFields int + currFields int + currTerms int ) if maxBatch == 0 { maxBatch = defaultAggregateResultsEntryBatchSize @@ -687,11 +691,11 @@ func (b *block) aggregateWithSpan( })) } - if opts.SeriesLimit < maxBatch { + if opts.SeriesLimit > 0 && opts.SeriesLimit < maxBatch { maxBatch = opts.SeriesLimit } - if opts.DocsLimit < maxBatch { + if opts.DocsLimit > 0 && opts.DocsLimit < maxBatch { maxBatch = opts.DocsLimit } @@ -711,19 +715,49 @@ func (b *block) aggregateWithSpan( } field, term := iter.Current() - batch, numAdded = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - currBatchSize += numAdded - // continue appending to the batch until we hit our max batch size - if currBatchSize < maxBatch { - continue - } - - // update recently queried docs to monitor memory. + // TODO: remove this legacy doc tracking implementation when alternative + // limits are in place. if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch), source); err != nil { - return false, err + if lastField == nil { + lastField = append(lastField, field...) 
+ batchedFields++ + if err := b.docsLimit.Inc(1, source); err != nil { + return false, err + } + } else if !bytes.Equal(lastField, field) { + lastField = lastField[:0] + lastField = append(lastField, field...) + batchedFields++ + if err := b.docsLimit.Inc(1, source); err != nil { + return false, err + } + } + + // NB: this logic increments the doc count to account for where the + // legacy limits would have been updated. It increments by two to + // reflect the term appearing as both the last element of the previous + // batch, as well as the first element in the next batch. + if batchedFields > maxBatch { + if err := b.docsLimit.Inc(2, source); err != nil { + return false, err + } + + batchedFields = 1 } + + } + + batch, fieldAppended, termAppended = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) + if fieldAppended { + currFields++ + } + if termAppended { + currTerms++ + } + // continue appending to the batch until we hit our max batch size. + if currFields+currTerms < maxBatch { + continue } batch, size, resultCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) @@ -731,7 +765,8 @@ func (b *block) aggregateWithSpan( return false, err } - currBatchSize = 0 + currFields = 0 + currTerms = 0 } if err := iter.Err(); err != nil { @@ -757,11 +792,13 @@ func (b *block) aggregateWithSpan( // appendFieldAndTermToBatch adds the provided field / term onto the batch, // optionally reusing the last element of the batch if it pertains to the same field. +// First boolean result indicates that a unique field was added to the batch +// and the second boolean indicates if a unique term was added. func (b *block) appendFieldAndTermToBatch( batch []AggregateResultsEntry, field, term []byte, includeTerms bool, -) ([]AggregateResultsEntry, int) { +) ([]AggregateResultsEntry, bool, bool) { // NB(prateek): we make a copy of the (field, term) entries returned // by the iterator during traversal, because the []byte are only valid per entry during // the traversal (i.e. calling Next() invalidates the []byte). We choose to do this @@ -770,11 +807,11 @@ func (b *block) appendFieldAndTermToBatch( // idents is transferred to the results map, which either hangs on to them (if they are new), // or finalizes them if they are duplicates. 
var ( - entry AggregateResultsEntry - lastField []byte - lastFieldIsValid bool - reuseLastEntry bool - numAppended int + entry AggregateResultsEntry + lastField []byte + lastFieldIsValid bool + reuseLastEntry bool + fieldsAdded, termsAdded bool ) // we are iterating multiple segments so we may receive duplicates (same field/term), but // as we are iterating one segment at a time, and because the underlying index structures @@ -797,7 +834,7 @@ func (b *block) appendFieldAndTermToBatch( reuseLastEntry = true entry = batch[len(batch)-1] // avoid alloc cause we already have the field } else { - numAppended++ + fieldsAdded = true // allocate id because this is the first time we've seen it // NB(r): Iterating fields FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -806,7 +843,7 @@ func (b *block) appendFieldAndTermToBatch( } if includeTerms { - numAppended++ + termsAdded = true // terms are always new (as far we know without checking the map for duplicates), so we allocate // NB(r): Iterating terms FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -820,7 +857,7 @@ func (b *block) appendFieldAndTermToBatch( batch = append(batch, entry) } - return batch, numAppended + return batch, fieldsAdded, termsAdded } func (b *block) pooledID(id []byte) ident.ID { @@ -839,13 +876,6 @@ func (b *block) addAggregateResults( batch []AggregateResultsEntry, source []byte, ) ([]AggregateResultsEntry, int, int, error) { - // update recently queried docs to monitor memory. - if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch), source); err != nil { - return batch, 0, 0, err - } - } - // checkout the lifetime of the query before adding results. queryValid := cancellable.TryCheckout() if !queryValid { diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 960147a3fd..18e79c713a 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -2416,10 +2416,8 @@ func TestBlockAggregateBatching(t *testing.T) { count += len(entry.Terms) } - // FIXME: this currently fails, but will be fixed after - // https://github.com/m3db/m3/pull/3133 is reverted. 
- // require.True(t, count <= tt.batchSize, - // fmt.Sprintf("batch %v exceeds batchSize %d", batch, tt.batchSize)) + require.True(t, count <= tt.batchSize, + fmt.Sprintf("batch %v exceeds batchSize %d", batch, tt.batchSize)) return addAggregateResultsFn(cancellable, results, batch, source) } From 969c7bc32db36b7bc7cd20a6a91be230de66c528 Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 29 Jan 2021 09:08:03 -0500 Subject: [PATCH 5/7] Lint + build fix --- src/dbnode/storage/index/aggregate_results.go | 11 ++++++++--- src/dbnode/storage/index/block.go | 3 --- src/dbnode/storage/index/block_test.go | 3 +-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index bcee7b81c3..05d6c5ac43 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -225,9 +225,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } } - docs := 0 - numInserts := 0 - for _, entry := range batch { + var ( + docs int + numInserts int + entry AggregateResultsEntry + ) + + for idx := 0; idx < len(batch); idx++ { + entry = batch[idx] r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1) if docs >= remainingDocs || numInserts >= remainingInserts { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 7cd269fad2..5ff7ec0340 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -234,7 +234,6 @@ func NewBlock( iopts, ) - aggAdded := opts.InstrumentOptions().MetricsScope().Counter("aggregate-added-counter") // NB(bodu): The length of coldMutableSegments is always at least 1. coldSegs := []*mutableSegments{ newMutableSegments( @@ -261,7 +260,6 @@ func NewBlock( metrics: newBlockMetrics(scope), logger: iopts.Logger(), docsLimit: opts.QueryLimits().DocsLimit(), - aggregatedAddedCounter: aggAdded, } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator b.newExecutorWithRLockFn = b.executorWithRLock @@ -745,7 +743,6 @@ func (b *block) aggregateWithSpan( batchedFields = 1 } - } batch, fieldAppended, termAppended = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 18e79c713a..dd1e850641 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -1951,8 +1951,7 @@ func TestBlockAggregate(t *testing.T) { require.Equal(t, tracepoint.BlockAggregate, spans[0].OperationName) snap := scope.Snapshot() - tallytest.AssertCounterValue(t, 4, snap, "query-limit.total-docs-matched", nil) - tallytest.AssertCounterValue(t, 8, snap, "aggregate-added-counter", nil) + tallytest.AssertCounterValue(t, 3, snap, "query-limit.total-docs-matched", nil) } func TestBlockAggregateNotExhaustive(t *testing.T) { From 2048c637c37f0732090718e83f936b38b88221ab Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 1 Feb 2021 11:12:19 -0500 Subject: [PATCH 6/7] Lint + response --- src/dbnode/storage/index/block.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 5ff7ec0340..a86bf38cd3 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -150,7 +150,6 @@ type block struct { nsMD namespace.Metadata namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager docsLimit limits.LookbackLimit - aggregatedAddedCounter 
tally.Counter metrics blockMetrics logger *zap.Logger @@ -804,11 +803,11 @@ func (b *block) appendFieldAndTermToBatch( // idents is transferred to the results map, which either hangs on to them (if they are new), // or finalizes them if they are duplicates. var ( - entry AggregateResultsEntry - lastField []byte - lastFieldIsValid bool - reuseLastEntry bool - fieldsAdded, termsAdded bool + entry AggregateResultsEntry + lastField []byte + lastFieldIsValid bool + reuseLastEntry bool + newFieldAdded, newTermAdded bool ) // we are iterating multiple segments so we may receive duplicates (same field/term), but // as we are iterating one segment at a time, and because the underlying index structures @@ -831,7 +830,7 @@ func (b *block) appendFieldAndTermToBatch( reuseLastEntry = true entry = batch[len(batch)-1] // avoid alloc cause we already have the field } else { - fieldsAdded = true + newFieldAdded = true // allocate id because this is the first time we've seen it // NB(r): Iterating fields FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -840,7 +839,7 @@ func (b *block) appendFieldAndTermToBatch( } if includeTerms { - termsAdded = true + newTermAdded = true // terms are always new (as far we know without checking the map for duplicates), so we allocate // NB(r): Iterating terms FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -854,7 +853,7 @@ func (b *block) appendFieldAndTermToBatch( batch = append(batch, entry) } - return batch, fieldsAdded, termsAdded + return batch, newFieldAdded, newTermAdded } func (b *block) pooledID(id []byte) ident.ID { From 97f5761d82ad1f095d8268bb74964522515499ae Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 8 Feb 2021 19:52:41 -0500 Subject: [PATCH 7/7] codegen --- src/dbnode/storage/index/index_mock.go | 38 ++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 6b4e52bc01..d33d2c13c0 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -243,6 +243,44 @@ func (mr *MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) } +// TotalDuration mocks base method +func (m *MockDocumentResults) TotalDuration() ResultDurations { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDuration") + ret0, _ := ret[0].(ResultDurations) + return ret0 +} + +// TotalDuration indicates an expected call of TotalDuration +func (mr *MockDocumentResultsMockRecorder) TotalDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockDocumentResults)(nil).TotalDuration)) +} + +// AddBlockProcessingDuration mocks base method +func (m *MockDocumentResults) AddBlockProcessingDuration(duration time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddBlockProcessingDuration", duration) +} + +// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration +func (mr *MockDocumentResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockProcessingDuration), duration) 
+} + +// AddBlockSearchDuration mocks base method +func (m *MockDocumentResults) AddBlockSearchDuration(duration time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddBlockSearchDuration", duration) +} + +// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration +func (mr *MockDocumentResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockSearchDuration), duration) +} + // EnforceLimits mocks base method func (m *MockDocumentResults) EnforceLimits() bool { m.ctrl.T.Helper()