Skip to content

Commit

Permalink
[dbnode] Add aggregate term limit regression test (#3135)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Jan 29, 2021
1 parent 922a33b commit bef2564
Show file tree
Hide file tree
Showing 3 changed files with 421 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ func (s shardRangesSegmentsByVolumeType) forEachSegmentGroup(cb func(group block
return nil
}

// addAggregateResultsFn adds a batch of aggregate result entries to the
// given AggregateResults, returning the (reusable) batch slice, the current
// results size, and the docs count. It is stored as a field on block so
// tests can override the production implementation (addAggregateResults).
type addAggregateResultsFn func(
	cancellable *xresource.CancellableLifetime,
	results AggregateResults,
	batch []AggregateResultsEntry,
	source []byte,
) ([]AggregateResultsEntry, int, int, error)

// nolint: maligned
type block struct {
sync.RWMutex
Expand All @@ -133,6 +140,7 @@ type block struct {
shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType
newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn
newExecutorWithRLockFn newExecutorFn
addAggregateResultsFn addAggregateResultsFn
blockStart time.Time
blockEnd time.Time
blockSize time.Duration
Expand Down Expand Up @@ -256,6 +264,7 @@ func NewBlock(
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorWithRLockFn = b.executorWithRLock
b.addAggregateResultsFn = b.addAggregateResults

return b, nil
}
Expand Down Expand Up @@ -697,7 +706,7 @@ func (b *block) aggregateWithSpan(
continue
}

batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source)
batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source)
if err != nil {
return false, err
}
Expand All @@ -715,7 +724,7 @@ func (b *block) aggregateWithSpan(

// Add last batch to results if remaining.
if len(batch) > 0 {
batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source)
batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source)
if err != nil {
return false, err
}
Expand Down
208 changes: 208 additions & 0 deletions src/dbnode/storage/index/block_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,31 @@ import (
"fmt"
"math/rand"
"os"
"sort"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
"github.com/m3db/m3/src/m3ninx/index/segment/mem"
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/m3ninx/search"
"github.com/m3db/m3/src/m3ninx/search/proptest"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/leanovate/gopter"
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
)

var (
Expand Down Expand Up @@ -197,3 +204,204 @@ func newPropTestBlock(t *testing.T, blockStart time.Time, nsMeta namespace.Metad
require.NoError(t, err)
return blk, nil
}

// testFields is a generated test fixture pairing a field name with the
// values observed for that field.
type testFields struct {
	name   string
	values []string
}

// genField returns a gopter generator producing a testFields value: a
// random alphabetic field name paired with a slice of random alphabetic
// field values.
func genField() gopter.Gen {
	return gopter.CombineGens(
		gen.AlphaString(),
		gen.SliceOf(gen.AlphaString()),
	).Map(func(input []interface{}) testFields {
		return testFields{
			name:   input[0].(string),
			values: input[1].([]string),
		}
	})
}

// propTestSegment describes a generated segment for the property test:
// the document metadata to insert, the expected docs-matched count
// (the number of unique field names), and the expected field -> values
// mapping used to verify aggregate results.
type propTestSegment struct {
	metadata   doc.Metadata
	exCount    int64
	segmentMap segmentMap
}

// testValuesSet is a set of field values.
type testValuesSet map[string]struct{} //nolint:gofumpt

// segmentMap maps field names to the set of values seen for each field.
type segmentMap map[string]testValuesSet //nolint:gofumpt

// genTestSegment returns a gopter generator that produces a
// propTestSegment: random fields are deduplicated into a segmentMap, then
// flattened in sorted order into document metadata suitable for inserting
// into a segment. exCount is the number of unique field names.
func genTestSegment() gopter.Gen {
	return gen.SliceOf(genField()).Map(func(input []testFields) propTestSegment {
		// Dedupe generated fields/values into a name -> value-set map.
		// An entry is only created for a name once it has at least one value.
		segMap := make(segmentMap, len(input))
		for _, field := range input { //nolint:gocritic
			if len(field.values) == 0 {
				continue
			}
			valSet, ok := segMap[field.name]
			if !ok {
				valSet = make(testValuesSet)
				segMap[field.name] = valSet
			}
			for _, value := range field.values {
				valSet[value] = struct{}{}
			}
		}

		// Flatten the map into a deterministic (sorted) field list.
		fields := make([]testFields, 0, len(segMap))
		for name, valSet := range segMap {
			vals := make([]string, 0, len(valSet))
			for val := range valSet {
				vals = append(vals, val)
			}
			sort.Strings(vals)
			fields = append(fields, testFields{name: name, values: vals})
		}
		sort.Slice(fields, func(i, j int) bool {
			return fields[i].name < fields[j].name
		})

		// Build the document fields in sorted order.
		docFields := make([]doc.Field, 0)
		for _, field := range fields { //nolint:gocritic
			for _, val := range field.values {
				docFields = append(docFields, doc.Field{
					Name:  []byte(field.name),
					Value: []byte(val),
				})
			}
		}

		return propTestSegment{
			metadata:   doc.Metadata{Fields: docFields},
			exCount:    int64(len(segMap)),
			segmentMap: segMap,
		}
	})
}

// verifyResults asserts that the aggregated results exactly match the
// expected field -> values mapping, and that the results contain no
// duplicate field names or duplicate values within a field.
func verifyResults(
	t *testing.T,
	results AggregateResults,
	exMap segmentMap,
) {
	resultMap := make(segmentMap, results.Map().Len())
	for _, field := range results.Map().Iter() { //nolint:gocritic
		name := field.Key().String()
		_, found := resultMap[name]
		// Fields must be deduplicated by the aggregation.
		require.False(t, found, "duplicate fields in results map")

		values := make(testValuesSet, field.value.Map().Len())
		for _, value := range field.value.Map().Iter() {
			val := value.Key().String()
			_, found := values[val]
			require.False(t, found, "duplicate values in results map")

			values[val] = struct{}{}
		}

		resultMap[name] = values
	}

	// NB: require.Equal takes (expected, actual); exMap is the expectation.
	require.Equal(t, exMap, resultMap)
}

// TestAggregateDocLimits is a property test verifying that block
// aggregation deduplicates fields and values, and that the query-limit
// docs-matched counter matches the number of unique field names in the
// aggregated segment (one "doc" counted per unique field).
func TestAggregateDocLimits(t *testing.T) {
	var (
		parameters = gopter.DefaultTestParameters()
		seed       = time.Now().UnixNano()
		reporter   = gopter.NewFormatedReporter(true, 160, os.Stdout)
	)

	parameters.MinSuccessfulTests = 1000
	parameters.MinSize = 5
	parameters.MaxSize = 10
	parameters.Rng = rand.New(rand.NewSource(seed)) //nolint:gosec
	properties := gopter.NewProperties(parameters)

	properties.Property("segments dedupe and have correct docs counts", prop.ForAll(
		func(testSegment propTestSegment) (bool, error) {
			seg, err := mem.NewSegment(mem.NewOptions())
			if err != nil {
				return false, err
			}

			_, err = seg.Insert(testSegment.metadata)
			if err != nil {
				return false, err
			}

			err = seg.Seal()
			if err != nil {
				return false, err
			}

			// Fresh metrics scope and query limits per run so the
			// docs-matched counter can be asserted in isolation.
			scope := tally.NewTestScope("", nil)
			iOpts := instrument.NewOptions().SetMetricsScope(scope)
			limitOpts := limits.NewOptions().
				SetInstrumentOptions(iOpts).
				SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}).
				SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute})
			queryLimits, err := limits.NewQueryLimits(limitOpts)
			require.NoError(t, err)
			// NB(review): this mutates the package-level testOpts shared
			// across property runs; each run overwrites it with its own
			// scope/limits before building the block.
			testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits)

			testMD := newTestNSMetadata(t)
			start := time.Now().Truncate(time.Hour)
			blk, err := NewBlock(start, testMD, BlockOptions{},
				namespace.NewRuntimeOptionsManager("foo"), testOpts)
			if err != nil {
				return false, err
			}

			b, ok := blk.(*block)
			if !ok {
				return false, errors.New("bad block type")
			}

			// Install the generated segment as the block's only
			// foreground segment.
			b.mutableSegments.foregroundSegments = []*readableSeg{
				newReadableSeg(seg, testOpts),
			}

			results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{
				Type: AggregateTagNamesAndValues,
			}, testOpts)

			ctx := context.NewContext()
			defer ctx.BlockingClose()

			exhaustive, err := b.Aggregate(
				ctx,
				xresource.NewCancellableLifetime(),
				QueryOptions{},
				results,
				emptyLogFields)

			if err != nil {
				return false, err
			}

			// No query limits are hit, so aggregation must be exhaustive.
			// NB: testify msgAndArgs expects a format string, not an error.
			require.True(t, exhaustive, "not exhaustive")
			verifyResults(t, results, testSegment.segmentMap)

			// The docs-matched counter should equal the number of unique
			// field names generated for the segment.
			found := false
			for _, c := range scope.Snapshot().Counters() {
				if c.Name() == "query-limit.total-docs-matched" {
					require.Equal(t, testSegment.exCount, c.Value(), "docs count mismatch")
					found = true
					break
				}
			}

			require.True(t, found, "counter not found in metrics")
			return true, nil
		},
		genTestSegment(),
	))

	if !properties.Run(reporter) {
		t.Errorf("failed with initial seed: %d", seed)
	}
}
Loading

0 comments on commit bef2564

Please sign in to comment.