Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Add aggregate term limit regression test #3135

Merged
merged 4 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ func (s shardRangesSegmentsByVolumeType) forEachSegmentGroup(cb func(group block
return nil
}

// addAggregateResultsFn matches the signature of block.addAggregateResults.
// It is held as a field on block (wired up in NewBlock) so that tests can
// substitute their own implementation when exercising the aggregate path.
// Returns the (possibly reused) batch, plus two counts and an error —
// see addAggregateResults for the exact semantics of the int results.
type addAggregateResultsFn func(
	cancellable *xresource.CancellableLifetime,
	results AggregateResults,
	batch []AggregateResultsEntry,
	source []byte,
) ([]AggregateResultsEntry, int, int, error)

// nolint: maligned
type block struct {
sync.RWMutex
Expand All @@ -133,6 +140,7 @@ type block struct {
shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType
newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn
newExecutorWithRLockFn newExecutorFn
addAggregateResultsFn addAggregateResultsFn
blockStart time.Time
blockEnd time.Time
blockSize time.Duration
Expand Down Expand Up @@ -256,6 +264,7 @@ func NewBlock(
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorWithRLockFn = b.executorWithRLock
b.addAggregateResultsFn = b.addAggregateResults

return b, nil
}
Expand Down Expand Up @@ -697,7 +706,7 @@ func (b *block) aggregateWithSpan(
continue
}

batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source)
batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source)
if err != nil {
return false, err
}
Expand All @@ -715,7 +724,7 @@ func (b *block) aggregateWithSpan(

// Add last batch to results if remaining.
if len(batch) > 0 {
batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source)
batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source)
if err != nil {
return false, err
}
Expand Down
208 changes: 208 additions & 0 deletions src/dbnode/storage/index/block_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,31 @@ import (
"fmt"
"math/rand"
"os"
"sort"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
"github.com/m3db/m3/src/m3ninx/index/segment/mem"
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/m3ninx/search"
"github.com/m3db/m3/src/m3ninx/search/proptest"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/leanovate/gopter"
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
)

var (
Expand Down Expand Up @@ -197,3 +204,204 @@ func newPropTestBlock(t *testing.T, blockStart time.Time, nsMeta namespace.Metad
require.NoError(t, err)
return blk, nil
}

// testFields is a generated (field name, field values) pair used as the raw
// input for building property-test segments.
type testFields struct {
	name   string
	values []string
}

// genField returns a gopter generator yielding a testFields value with a
// random alphabetic name and a random slice of alphabetic values.
func genField() gopter.Gen {
	nameGen := gen.AlphaString()
	valuesGen := gen.SliceOf(gen.AlphaString())

	return gopter.CombineGens(nameGen, valuesGen).
		Map(func(raw []interface{}) testFields {
			return testFields{
				name:   raw[0].(string),
				values: raw[1].([]string),
			}
		})
}

// propTestSegment describes one generated segment: the document metadata to
// insert, the expected number of distinct field names (exCount — asserted
// against the docs-matched counter), and the expected field-name -> value-set
// mapping (segmentMap) used to verify aggregate results.
type propTestSegment struct {
	metadata   doc.Metadata
	exCount    int64
	segmentMap segmentMap
}

// testValuesSet is a set of field values.
type testValuesSet map[string]struct{} //nolint:gofumpt
// segmentMap maps a field name to the set of values seen for that field.
type segmentMap map[string]testValuesSet //nolint:gofumpt

// genTestSegment returns a generator producing a propTestSegment whose
// document fields are deduplicated and ordered by field name, then value.
func genTestSegment() gopter.Gen {
	return gen.SliceOf(genField()).Map(func(generated []testFields) propTestSegment {
		// Dedupe generated (name, value) pairs into a map of value sets.
		dedupedMap := make(segmentMap, len(generated))
		for _, f := range generated { //nolint:gocritic
			for _, v := range f.values {
				set, ok := dedupedMap[f.name]
				if !ok {
					set = make(testValuesSet)
					dedupedMap[f.name] = set
				}
				set[v] = struct{}{}
			}
		}

		// Flatten back to a slice with values sorted within each field.
		ordered := make([]testFields, 0, len(dedupedMap))
		for name, set := range dedupedMap {
			vals := make([]string, 0, len(set))
			for v := range set {
				vals = append(vals, v)
			}
			sort.Strings(vals)

			ordered = append(ordered, testFields{name: name, values: vals})
		}

		// Sort fields by name for deterministic document construction.
		sort.Slice(ordered, func(i, j int) bool {
			return ordered[i].name < ordered[j].name
		})

		// Build the document fields in (name, value) order.
		docFields := []doc.Field{}
		for _, f := range ordered { //nolint:gocritic
			for _, v := range f.values {
				docFields = append(docFields, doc.Field{
					Name:  []byte(f.name),
					Value: []byte(v),
				})
			}
		}

		return propTestSegment{
			metadata:   doc.Metadata{Fields: docFields},
			exCount:    int64(len(dedupedMap)),
			segmentMap: dedupedMap,
		}
	})
}

// verifyResults asserts that the aggregate results contain exactly the
// field-name -> value-set mapping described by exMap, with no duplicate
// field names or values in the results.
func verifyResults(
	t *testing.T,
	results AggregateResults,
	exMap segmentMap,
) {
	resultMap := make(segmentMap, results.Map().Len())
	for _, field := range results.Map().Iter() { //nolint:gocritic
		name := field.Key().String()
		_, found := resultMap[name]
		// Fixed message: this check guards against duplicate field names,
		// not duplicate values.
		require.False(t, found, "duplicate fields in results map")

		values := make(testValuesSet, field.value.Map().Len())
		for _, value := range field.value.Map().Iter() {
			val := value.Key().String()
			_, found := values[val]
			require.False(t, found, "duplicate values in results map")

			values[val] = struct{}{}
		}

		resultMap[name] = values
	}

	// testify's require.Equal signature is (t, expected, actual): exMap is
	// what we expect, resultMap is what the query actually produced.
	require.Equal(t, exMap, resultMap)
}

// TestAggregateDocLimits property-tests block.Aggregate: for randomly
// generated segments it verifies that aggregate results are deduplicated
// correctly and that the query-limit docs-matched counter equals the number
// of distinct field names in the segment.
func TestAggregateDocLimits(t *testing.T) {
	var (
		parameters = gopter.DefaultTestParameters()
		seed       = time.Now().UnixNano()
		reporter   = gopter.NewFormatedReporter(true, 160, os.Stdout)
	)

	parameters.MinSuccessfulTests = 1000
	parameters.MinSize = 5
	parameters.MaxSize = 10
	parameters.Rng = rand.New(rand.NewSource(seed)) //nolint:gosec
	properties := gopter.NewProperties(parameters)

	properties.Property("segments dedupe and have correct docs counts", prop.ForAll(
		func(testSegment propTestSegment) (bool, error) {
			seg, err := mem.NewSegment(mem.NewOptions())
			if err != nil {
				return false, err
			}

			_, err = seg.Insert(testSegment.metadata)
			if err != nil {
				return false, err
			}

			err = seg.Seal()
			if err != nil {
				return false, err
			}

			// Use a fresh metrics scope and fresh query limits per run so
			// the counter assertions below only observe this iteration.
			scope := tally.NewTestScope("", nil)
			iOpts := instrument.NewOptions().SetMetricsScope(scope)
			limitOpts := limits.NewOptions().
				SetInstrumentOptions(iOpts).
				SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}).
				SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute})
			queryLimits, err := limits.NewQueryLimits(limitOpts)
			require.NoError(t, err)
			// Derive per-run options locally instead of mutating the shared
			// package-level testOpts, which would leak state into sibling
			// tests running against the same package globals.
			opts := testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits)

			testMD := newTestNSMetadata(t)
			start := time.Now().Truncate(time.Hour)
			blk, err := NewBlock(start, testMD, BlockOptions{},
				namespace.NewRuntimeOptionsManager("foo"), opts)
			if err != nil {
				return false, err
			}

			b, ok := blk.(*block)
			if !ok {
				return false, errors.New("bad block type")
			}

			b.mutableSegments.foregroundSegments = []*readableSeg{
				newReadableSeg(seg, opts),
			}

			results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{
				Type: AggregateTagNamesAndValues,
			}, opts)

			ctx := context.NewContext()
			defer ctx.BlockingClose()

			exhaustive, err := b.Aggregate(
				ctx,
				xresource.NewCancellableLifetime(),
				QueryOptions{},
				results,
				emptyLogFields)

			if err != nil {
				return false, err
			}

			// require message args expect strings/format values, not errors.
			require.True(t, exhaustive, "not exhaustive")
			verifyResults(t, results, testSegment.segmentMap)

			// The docs-matched counter should equal the number of distinct
			// field names in the generated segment.
			found := false
			for _, c := range scope.Snapshot().Counters() {
				if c.Name() == "query-limit.total-docs-matched" {
					require.Equal(t, testSegment.exCount, c.Value(), "docs count mismatch")
					found = true
					break
				}
			}

			require.True(t, found, "counter not found in metrics")
			return true, nil
		},
		genTestSegment(),
	))

	if !properties.Run(reporter) {
		t.Errorf("failed with initial seed: %d", seed)
	}
}
Loading