diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go
index 1f655f7533..e2a4db455d 100644
--- a/src/cmd/services/m3dbnode/config/config.go
+++ b/src/cmd/services/m3dbnode/config/config.go
@@ -387,6 +387,13 @@ type IndexConfiguration struct {
 	// as they are very CPU-intensive (regex and FST matching).
 	MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`
 
+	// MaxWorkerTime is the maximum time a query can hold an index worker at once. If a query does not finish in this
+	// time it yields the worker and must wait again for another worker to resume. The number of workers available to
+	// all queries is defined by MaxQueryIDsConcurrency.
+	// Capping the maximum time per worker ensures a few large queries don't hold all the concurrent workers and lock
+	// out many small queries from running.
+	MaxWorkerTime time.Duration `yaml:"maxWorkerTime"`
+
 	// RegexpDFALimit is the limit on the max number of states used by a
 	// regexp deterministic finite automaton. Default is 10,000 states.
 	RegexpDFALimit *int `yaml:"regexpDFALimit"`
diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go
index 2a465803b6..3d03f31f06 100644
--- a/src/cmd/services/m3dbnode/config/config_test.go
+++ b/src/cmd/services/m3dbnode/config/config_test.go
@@ -339,6 +339,7 @@ func TestConfiguration(t *testing.T) {
 	expected := `db:
   index:
     maxQueryIDsConcurrency: 0
+    maxWorkerTime: 0s
     regexpDFALimit: null
     regexpFSALimit: null
     forwardIndexProbability: 0
diff --git a/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go b/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go
index bc95fa2cd6..78ef5e03ba 100644
--- a/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go
+++ b/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go
@@ -201,7 +201,7 @@ func (p *fakePermits) TryAcquire(_ context.Context) (bool, error) {
 	return true, nil
 }
 
-func (p *fakePermits) Release() {
+func (p *fakePermits) Release(_ int64) {
 	p.released++
 	p.available++
 }
diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go
index 8b12fa9350..54ec72f068 100644
--- a/src/dbnode/network/server/tchannelthrift/node/service.go
+++ b/src/dbnode/network/server/tchannelthrift/node/service.go
@@ -1092,9 +1092,10 @@ func (i *fetchTaggedResultsIter) Close(err error) {
 	i.seriesBlocks.RecordValue(float64(i.totalSeriesBlocks))
 
-	for n := 0; n < i.batchesAcquired; n++ {
-		i.blockPermits.Release()
+	for n := 0; n < i.batchesAcquired-1; n++ {
+		i.blockPermits.Release(int64(i.blocksPerBatch))
 	}
+	i.blockPermits.Release(int64(i.blocksPerBatch - i.blocksAvailable))
 }
 
 // IDResult is the FetchTagged result for a series ID.
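Editor's note: the reworked Close above releases permit quota rather than a permit count — each fully consumed batch returns blocksPerBatch units, and the final batch returns only what was actually used. A minimal sketch of that accounting (the releaser interface below is a stand-in for the real permits type, not part of the patch):

```go
// Sketch of the release accounting in fetchTaggedResultsIter.Close (field names
// mirror the diff; countingPermits is a hypothetical stand-in, not the real type).
package main

import "fmt"

type releaser interface {
	Release(quota int64)
}

type countingPermits struct{ released int64 }

func (p *countingPermits) Release(quota int64) { p.released += quota }

// releaseAcquiredBlocks mirrors the loop in Close: batchesAcquired-1 full batches
// plus the partially used final batch (blocksPerBatch - blocksAvailable).
func releaseAcquiredBlocks(p releaser, batchesAcquired, blocksPerBatch, blocksAvailable int) {
	for n := 0; n < batchesAcquired-1; n++ {
		p.Release(int64(blocksPerBatch))
	}
	p.Release(int64(blocksPerBatch - blocksAvailable))
}

func main() {
	p := &countingPermits{}
	// 3 batches of 10 blocks acquired, 4 blocks of the last batch unused.
	releaseAcquiredBlocks(p, 3, 10, 4)
	fmt.Println(p.released) // 26 = 2*10 + (10-4)
}
```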
diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 8eb0c1b1a1..7125a05701 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -90,13 +90,12 @@ import ( xos "github.com/m3db/m3/src/x/os" "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/serialize" - xsync "github.com/m3db/m3/src/x/sync" apachethrift "github.com/apache/thrift/lib/go/thrift" "github.com/m3dbx/vellum/levenshtein" "github.com/m3dbx/vellum/levenshtein2" "github.com/m3dbx/vellum/regexp" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" "github.com/uber/tchannel-go" "go.etcd.io/etcd/embed" @@ -371,14 +370,6 @@ func Run(runOpts RunOptions) { opentracing.SetGlobalTracer(tracer) - if cfg.Index.MaxQueryIDsConcurrency != 0 { - queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency) - queryIDsWorkerPool.Init() - opts = opts.SetQueryIDsWorkerPool(queryIDsWorkerPool) - } else { - logger.Warn("max index query IDs concurrency was not set, falling back to default value") - } - // Set global index options. if n := cfg.Index.RegexpDFALimitOrDefault(); n > 0 { regexp.SetStateLimit(n) @@ -481,8 +472,14 @@ func Run(runOpts RunOptions) { seriesReadPermits.Start() defer seriesReadPermits.Stop() - opts = opts.SetPermitsOptions(opts.PermitsOptions(). - SetSeriesReadPermitsManager(seriesReadPermits)) + permitOptions := opts.PermitsOptions().SetSeriesReadPermitsManager(seriesReadPermits) + if cfg.Index.MaxQueryIDsConcurrency != 0 { + permitOptions = permitOptions.SetIndexQueryPermitsManager( + permits.NewFixedPermitsManager(cfg.Index.MaxQueryIDsConcurrency)) + } else { + logger.Warn("max index query IDs concurrency was not set, falling back to default value") + } + opts = opts.SetPermitsOptions(permitOptions) // Setup postings list cache. var ( @@ -524,6 +521,11 @@ func Run(runOpts RunOptions) { }). SetMmapReporter(mmapReporter). 
SetQueryLimits(queryLimits) + + if cfg.Index.MaxWorkerTime > 0 { + indexOpts = indexOpts.SetMaxWorkerTime(cfg.Index.MaxWorkerTime) + } + opts = opts.SetIndexOptions(indexOpts) if tick := cfg.Tick; tick != nil { diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 097ab06243..18766491e9 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -25,6 +25,7 @@ import ( gocontext "context" "errors" "fmt" + "io" "math" goruntime "runtime" "sort" @@ -45,6 +46,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" + "github.com/m3db/m3/src/dbnode/storage/limits/permits" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -54,6 +56,7 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + "github.com/m3db/m3/src/m3ninx/x" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" @@ -61,7 +64,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xopentracing "github.com/m3db/m3/src/x/opentracing" xresource "github.com/m3db/m3/src/x/resource" - xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" "github.com/m3db/bitset" @@ -89,9 +91,7 @@ const ( defaultFlushDocsBatchSize = 8192 ) -var ( - allQuery = idx.NewAllQuery() -) +var allQuery = idx.NewAllQuery() // nolint: maligned type nsIndex struct { @@ -122,9 +122,8 @@ type nsIndex struct { resultsPool index.QueryResultsPool aggregateResultsPool index.AggregateResultsPool - // NB(r): Use a pooled goroutine worker once pooled goroutine workers - // support timeouts for query workers pool. - queryWorkersPool xsync.WorkerPool + permitsManager permits.Manager + maxWorkerTime time.Duration // queriesWg tracks outstanding queries to ensure // we wait for all queries to complete before actually closing @@ -210,18 +209,37 @@ type newNamespaceIndexOpts struct { type execBlockQueryFn func( ctx context.Context, block index.Block, - query index.Query, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, logFields []opentracinglog.Field, ) -// asyncQueryExecState tracks the async execution errors and results for a query. +// newBlockIterFn returns a new ResultIterator for the query. +type newBlockIterFn func( + ctx context.Context, + block index.Block, + query index.Query, + results index.BaseResults, +) (index.ResultIterator, error) + +// asyncQueryExecState tracks the async execution errors for a query. type asyncQueryExecState struct { - sync.Mutex - multiErr xerrors.MultiError - exhaustive bool + sync.RWMutex + multiErr xerrors.MultiError +} + +func (s *asyncQueryExecState) hasErr() bool { + s.RLock() + defer s.RUnlock() + return s.multiErr.NumErrors() > 0 +} + +func (s *asyncQueryExecState) addErr(err error) { + s.Lock() + s.multiErr = s.multiErr.Add(err) + s.Unlock() } // newNamespaceIndex returns a new namespaceIndex for the provided namespace. 
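Editor's note: asyncQueryExecState now tracks only errors, guarded by an RWMutex so the scheduling loop can cheaply poll for failures while workers record them. A simplified, self-contained sketch of that pattern (the error slice here stands in for xerrors.MultiError and is not the real type):

```go
// Sketch of the addErr/hasErr usage: workers append errors under a write lock,
// the scheduler checks hasErr under a read lock to stop handing out more work.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type execState struct {
	sync.RWMutex
	errs []error // stand-in for xerrors.MultiError
}

func (s *execState) addErr(err error) {
	s.Lock()
	s.errs = append(s.errs, err)
	s.Unlock()
}

func (s *execState) hasErr() bool {
	s.RLock()
	defer s.RUnlock()
	return len(s.errs) > 0
}

func main() {
	state := &execState{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			if i == 2 { // simulate one failing block query
				state.addErr(errors.New("block query failed"))
			}
		}()
	}
	wg.Wait()
	fmt.Println(state.hasErr(), errors.Join(state.errs...))
}
```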
@@ -346,11 +364,12 @@ func newNamespaceIndexWithOptions( resultsPool: indexOpts.QueryResultsPool(), aggregateResultsPool: indexOpts.AggregateResultsPool(), - queryWorkersPool: newIndexOpts.opts.QueryIDsWorkerPool(), - metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts), + permitsManager: newIndexOpts.opts.PermitsOptions().IndexQueryPermitsManager(), + metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts), doNotIndexWithFields: doNotIndexWithFields, shardSet: shardSet, + maxWorkerTime: indexOpts.MaxWorkerTime(), } // Assign shard set upfront. @@ -1345,7 +1364,8 @@ func (i *nsIndex) Query( FilterID: i.shardsFilterID(), }) ctx.RegisterFinalizer(results) - exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn, logFields, i.metrics.queryMetrics) + exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn, i.newBlockQueryIterFn, + logFields, i.metrics.queryMetrics) if err != nil { sp.LogFields(opentracinglog.Error(err)) return index.QueryResult{}, err @@ -1387,7 +1407,8 @@ func (i *nsIndex) WideQuery( defer results.Finalize() queryOpts := opts.ToQueryOptions() - _, err := i.query(ctx, query, results, queryOpts, i.execBlockWideQueryFn, logFields, i.metrics.wideQueryMetrics) + _, err := i.query(ctx, query, results, queryOpts, i.execBlockWideQueryFn, i.newBlockQueryIterFn, logFields, + i.metrics.wideQueryMetrics) if err != nil { sp.LogFields(opentracinglog.Error(err)) return err @@ -1443,7 +1464,9 @@ func (i *nsIndex) AggregateQuery( } aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe() results.Reset(id, aopts) - exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields, i.metrics.aggQueryMetrics) + exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, + i.newBlockAggregatorIterFn, + logFields, i.metrics.aggQueryMetrics) if err != nil { return index.AggregateQueryResult{}, err } @@ -1459,6 +1482,7 @@ func (i *nsIndex) query( results index.BaseResults, opts index.QueryOptions, execBlockFn execBlockQueryFn, + newBlockIterFn newBlockIterFn, logFields []opentracinglog.Field, queryMetrics queryMetrics, //nolint: gocritic ) (bool, error) { @@ -1466,7 +1490,8 @@ func (i *nsIndex) query( sp.LogFields(logFields...) defer sp.Finish() - exhaustive, err := i.queryWithSpan(ctx, query, results, opts, execBlockFn, sp, logFields, queryMetrics) + exhaustive, err := i.queryWithSpan(ctx, query, results, opts, execBlockFn, newBlockIterFn, sp, logFields, + queryMetrics) if err != nil { sp.LogFields(opentracinglog.Error(err)) @@ -1511,12 +1536,23 @@ func (i *nsIndex) query( return exhaustive, nil } +// blockIter is a composite type to hold various state about a block while iterating over the results. +type blockIter struct { + iter index.ResultIterator + iterCloser io.Closer + block index.Block + waitTime time.Duration + searchTime time.Duration + processingTime time.Duration +} + func (i *nsIndex) queryWithSpan( ctx context.Context, query index.Query, results index.BaseResults, opts index.QueryOptions, execBlockFn execBlockQueryFn, + newBlockIterFn newBlockIterFn, span opentracing.Span, logFields []opentracinglog.Field, queryMetrics queryMetrics, //nolint: gocritic @@ -1555,52 +1591,110 @@ func (i *nsIndex) queryWithSpan( var ( // State contains concurrent mutable state for async execution below. 
- state = asyncQueryExecState{ - exhaustive: true, - } - wg sync.WaitGroup - totalWaitTime time.Duration + state = &asyncQueryExecState{} + wg sync.WaitGroup ) + permits, err := i.permitsManager.NewPermits(ctx) + if err != nil { + return false, err + } + blockIters := make([]*blockIter, 0, len(blocks)) for _, block := range blocks { - // Capture block for async query execution below. - block := block - - // We're looping through all the blocks that we need to query and kicking - // off parallel queries which are bounded by the queryWorkersPool's maximum - // concurrency. This means that it's possible at this point that we've - // completed querying one or more blocks and already exhausted the maximum - // number of results that we're allowed to return. If thats the case, there - // is no value in kicking off more parallel queries, so we break out of - // the loop. - seriesCount := results.Size() - docsCount := results.TotalDocsCount() - alreadyExceededLimit := opts.SeriesLimitExceeded(seriesCount) || opts.DocsLimitExceeded(docsCount) - if alreadyExceededLimit { - state.Lock() - state.exhaustive = false - state.Unlock() - // Break out if already not exhaustive. - break + iter, err := newBlockIterFn(ctx, block, query, results) + if err != nil { + return false, err } - - // Calculate time spent waiting for a worker - wg.Add(1) - scheduleResult := i.queryWorkersPool.GoWithContext(ctx, func() { - startProcessing := time.Now() - execBlockFn(ctx, block, query, opts, &state, results, logFields) - i.metrics.queryMetrics.blockProcessingTime.RecordDuration(time.Since(startProcessing)) - wg.Done() + blockIters = append(blockIters, &blockIter{ + iter: iter, + iterCloser: x.NewSafeCloser(iter), + block: block, }) - totalWaitTime += scheduleResult.WaitTime - if !scheduleResult.Available { - state.Lock() - state.multiErr = state.multiErr.Add(gocontext.Canceled) - state.Unlock() - // Did not launch task, need to ensure don't wait for it - wg.Done() + } + + defer func() { + for _, iter := range blockIters { + // safe to call Close multiple times, so it's fine to eagerly close in the loop below and here. + _ = iter.iterCloser.Close() + } + }() + + // queryCanceled returns true if the query has been canceled and the current iteration should terminate. + queryCanceled := func() bool { + return opts.LimitsExceeded(results.Size(), results.TotalDocsCount()) || state.hasErr() + } + // waitForPermit waits for a permit. returns true if the permit was acquired and the wait time. + waitForPermit := func() (bool, time.Duration) { + // make sure the query hasn't been canceled before waiting for a permit. + if queryCanceled() { + return false, 0 + } + startWait := time.Now() + err := permits.Acquire(ctx) + waitTime := time.Since(startWait) + if err != nil { + state.addErr(err) + return false, waitTime + } + // make sure the query hasn't been canceled while waiting for a permit. + if queryCanceled() { + permits.Release(0) + return false, waitTime + } + return true, waitTime + } + + // We're looping through all the blocks that we need to query and kicking + // off parallel queries which are bounded by the permits maximum + // concurrency. It's possible at this point that we've completed querying one or more blocks and already exhausted + // the maximum number of results that we're allowed to return. If thats the case, there is no value in kicking off + // more parallel queries, so we break out of the loop. + for _, blockIter := range blockIters { + // Capture for async query execution below. 
+ blockIter := blockIter + + // acquire a permit before kicking off the goroutine to process the iterator. this limits the number of + // concurrent goroutines to # of permits + large queries that needed multiple iterations to finish. + acq, waitTime := waitForPermit() + blockIter.waitTime += waitTime + if !acq { break } + + wg.Add(1) + // kick off a go routine to process the entire iterator. + go func() { + defer wg.Done() + first := true + for !blockIter.iter.Done() { + // if this is not the first iteration of the iterator, need to acquire another permit. + if !first { + acq, waitTime := waitForPermit() + blockIter.waitTime += waitTime + if !acq { + break + } + } + first = false + startProcessing := time.Now() + execBlockFn(ctx, blockIter.block, blockIter.iter, opts, state, results, logFields) + processingTime := time.Since(startProcessing) + queryMetrics.blockProcessingTime.RecordDuration(processingTime) + blockIter.processingTime += processingTime + permits.Release(int64(processingTime)) + } + if first { + // this should never happen since a new iter cannot be Done, but just to be safe. + permits.Release(0) + } + blockIter.searchTime += blockIter.iter.SearchDuration() + + // close the iterator since it's no longer needed. it's safe to call Close multiple times, here and in the + // defer when the function returns. + if err := blockIter.iterCloser.Close(); err != nil { + state.addErr(err) + } + }() } // wait for all workers to finish. if the caller cancels the call, the workers will be interrupted and eventually @@ -1609,11 +1703,9 @@ func (i *nsIndex) queryWithSpan( i.metrics.loadedDocsPerQuery.RecordValue(float64(results.TotalDocsCount())) - state.Lock() - // Take reference to vars to return while locked. - exhaustive := state.exhaustive + exhaustive := opts.Exhaustive(results.Size(), results.TotalDocsCount()) + // ok to read state without lock since all parallel queries are done. 
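Editor's note: the loop above is the core of the change — each block query holds an index permit only for one time-boxed slice of work, then releases it and re-queues, so a handful of large queries cannot monopolize the pool. A toy sketch of that yielding behavior, using a buffered channel as a stand-in for the real permits manager and a local maxWorkerTime in place of cfg.Index.MaxWorkerTime:

```go
// Toy illustration of deadline-bounded, permit-gated work slices; the permit pool
// and work counter are assumptions for the sketch, not the real implementation.
package main

import (
	"fmt"
	"time"
)

type permitPool chan struct{}

func (p permitPool) Acquire() { p <- struct{}{} }
func (p permitPool) Release() { <-p }

// processUntil drains work in small steps until the deadline passes or the work
// runs out, returning whether the work is done.
func processUntil(deadline time.Time, work *int) bool {
	for time.Now().Before(deadline) && *work > 0 {
		*work--
		time.Sleep(time.Millisecond) // simulate per-batch processing cost
	}
	return *work == 0
}

func main() {
	const maxWorkerTime = 5 * time.Millisecond
	permits := make(permitPool, 2) // e.g. maxQueryIDsConcurrency: 2

	work := 20 // units of work for one large query
	slices := 0
	for {
		permits.Acquire()
		done := processUntil(time.Now().Add(maxWorkerTime), &work)
		permits.Release()
		slices++
		if done {
			break
		}
		// Between slices, other queued queries get a chance to grab the permit.
	}
	fmt.Printf("finished in %d slices\n", slices)
}
```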
multiErr := state.multiErr - state.Unlock() err = multiErr.FinalError() if err != nil && !multiErr.Contains(gocontext.DeadlineExceeded) && !multiErr.Contains(gocontext.Canceled) { @@ -1624,18 +1716,40 @@ func (i *nsIndex) queryWithSpan( // update timing metrics even if the query was canceled due to a timeout queryRuntime := time.Since(start) + var ( + totalWaitTime time.Duration + totalProcessingTime time.Duration + totalSearchTime time.Duration + ) + + for _, blockIter := range blockIters { + totalWaitTime += blockIter.waitTime + totalProcessingTime += blockIter.processingTime + totalSearchTime += blockIter.searchTime + } + queryMetrics.queryTotalTime.ByDocs.Record(results.TotalDocsCount(), queryRuntime) queryMetrics.queryWaitTime.ByDocs.Record(results.TotalDocsCount(), totalWaitTime) - queryMetrics.queryProcessingTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Processing) - queryMetrics.querySearchTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Search) + queryMetrics.queryProcessingTime.ByDocs.Record(results.TotalDocsCount(), totalProcessingTime) + queryMetrics.querySearchTime.ByDocs.Record(results.TotalDocsCount(), totalSearchTime) return exhaustive, err } -func (i *nsIndex) execBlockQueryFn( +func (i *nsIndex) newBlockQueryIterFn( ctx context.Context, block index.Block, query index.Query, + _ index.BaseResults, +) (index.ResultIterator, error) { + return block.QueryIter(ctx, query) +} + +//nolint: dupl +func (i *nsIndex) execBlockQueryFn( + ctx context.Context, + block index.Block, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, @@ -1652,14 +1766,16 @@ func (i *nsIndex) execBlockQueryFn( docResults, ok := results.(index.DocumentResults) if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during query", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() + state.addErr(fmt.Errorf("unknown results type [%T] received during query", results)) + return + } + queryIter, ok := iter.(index.QueryIterator) + if !ok { // should never happen + state.addErr(fmt.Errorf("unknown results type [%T] received during query", iter)) return } - blockExhaustive, err := block.Query(ctx, query, opts, docResults, logFields) + err := block.QueryWithIter(ctx, opts, queryIter, docResults, time.Now().Add(i.maxWorkerTime), logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1668,20 +1784,16 @@ func (i *nsIndex) execBlockQueryFn( err = nil } - state.Lock() - defer state.Unlock() - if err != nil { sp.LogFields(opentracinglog.Error(err)) - state.multiErr = state.multiErr.Add(err) + state.addErr(err) } - state.exhaustive = state.exhaustive && blockExhaustive } func (i *nsIndex) execBlockWideQueryFn( ctx context.Context, block index.Block, - query index.Query, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, @@ -1698,14 +1810,16 @@ func (i *nsIndex) execBlockWideQueryFn( docResults, ok := results.(index.DocumentResults) if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during wide query", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() + state.addErr(fmt.Errorf("unknown results type [%T] received during wide query", results)) + return + } + queryIter, ok := 
iter.(index.QueryIterator) + if !ok { // should never happen + state.addErr(fmt.Errorf("unknown results type [%T] received during query", iter)) return } - _, err := block.Query(ctx, query, opts, docResults, logFields) + err := block.QueryWithIter(ctx, opts, queryIter, docResults, time.Now().Add(i.maxWorkerTime), logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1718,22 +1832,29 @@ func (i *nsIndex) execBlockWideQueryFn( err = nil } - state.Lock() - defer state.Unlock() - if err != nil { sp.LogFields(opentracinglog.Error(err)) - state.multiErr = state.multiErr.Add(err) + state.addErr(err) } +} - // NB: wide queries are always exhaustive. - state.exhaustive = true +func (i *nsIndex) newBlockAggregatorIterFn( + ctx context.Context, + block index.Block, + _ index.Query, + results index.BaseResults, +) (index.ResultIterator, error) { + aggResults, ok := results.(index.AggregateResults) + if !ok { // should never happen + return nil, fmt.Errorf("unknown results type [%T] received during aggregation", results) + } + return block.AggregateIter(ctx, aggResults.AggregateResultsOptions()) } func (i *nsIndex) execBlockAggregateQueryFn( ctx context.Context, block index.Block, - _ index.Query, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, @@ -1750,14 +1871,16 @@ func (i *nsIndex) execBlockAggregateQueryFn( aggResults, ok := results.(index.AggregateResults) if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during aggregation", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() + state.addErr(fmt.Errorf("unknown results type [%T] received during aggregation", results)) + return + } + aggIter, ok := iter.(index.AggregateIterator) + if !ok { // should never happen + state.addErr(fmt.Errorf("unknown results type [%T] received during query", iter)) return } - blockExhaustive, err := block.Aggregate(ctx, opts, aggResults, logFields) + err := block.AggregateWithIter(ctx, aggIter, opts, aggResults, time.Now().Add(i.maxWorkerTime), logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1766,13 +1889,10 @@ func (i *nsIndex) execBlockAggregateQueryFn( err = nil } - state.Lock() - defer state.Unlock() if err != nil { sp.LogFields(opentracinglog.Error(err)) - state.multiErr = state.multiErr.Add(err) + state.addErr(err) } - state.exhaustive = state.exhaustive && blockExhaustive } func (i *nsIndex) overriddenOptsForQueryWithRLock( diff --git a/src/dbnode/storage/index/aggregate_iter.go b/src/dbnode/storage/index/aggregate_iter.go index f4cbbe1d95..1d136bc6d5 100644 --- a/src/dbnode/storage/index/aggregate_iter.go +++ b/src/dbnode/storage/index/aggregate_iter.go @@ -41,11 +41,12 @@ type aggregateIter struct { searchDuration time.Duration // mutable state - idx int - err error - done bool - currField, currTerm []byte - nextField, nextTerm []byte + idx int + err error + done bool + currField, currTerm []byte + nextField, nextTerm []byte + docsCount, seriesCount int } func (it *aggregateIter) Next(ctx context.Context) bool { @@ -144,3 +145,15 @@ func (it *aggregateIter) Close() error { func (it *aggregateIter) SearchDuration() time.Duration { return it.searchDuration } + +func (it 
*aggregateIter) AddSeries(count int) { + it.seriesCount += count +} + +func (it *aggregateIter) AddDocs(count int) { + it.docsCount += count +} + +func (it *aggregateIter) Counts() (series, docs int) { + return it.seriesCount, it.docsCount +} diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index ac3cd7c194..05d6c5ac43 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -23,7 +23,6 @@ package index import ( "math" "sync" - "time" "github.com/uber-go/tally" @@ -49,7 +48,6 @@ type aggregatedResults struct { pool AggregateResultsPool valuesPool AggregateValuesPool encodedDocReader docs.EncodedDocumentReader - resultDuration ResultDurations iOpts instrument.Options } @@ -145,24 +143,6 @@ func NewAggregateResults( } } -func (r *aggregatedResults) TotalDuration() ResultDurations { - r.RLock() - defer r.RUnlock() - return r.resultDuration -} - -func (r *aggregatedResults) AddBlockProcessingDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddProcessing(duration) -} - -func (r *aggregatedResults) AddBlockSearchDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddSearch(duration) -} - func (r *aggregatedResults) EnforceLimits() bool { return true } func (r *aggregatedResults) Reset( @@ -197,8 +177,6 @@ func (r *aggregatedResults) Reset( r.totalDocsCount = 0 r.size = 0 - r.resultDuration = ResultDurations{} - // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. r.Unlock() diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 10696dd333..0fd12bc1e1 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -25,7 +25,6 @@ import ( "errors" "fmt" "io" - "math" "sync" "time" @@ -40,7 +39,6 @@ import ( "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/m3ninx/search/executor" - "github.com/m3db/m3/src/m3ninx/x" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" @@ -48,7 +46,6 @@ import ( xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" - "github.com/opentracing/opentracing-go" opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "go.uber.org/zap" @@ -410,64 +407,11 @@ func (b *block) segmentReadersWithRLock() ([]segment.Reader, error) { return readers, nil } -// Query acquires a read lock on the block so that the segments -// are guaranteed to not be freed/released while accumulating results. -// This allows references to the mmap'd segment data to be accumulated -// and then copied into the results before this method returns (it is not -// safe to return docs directly from the segments from this method, the -// results datastructure is used to copy it every time documents are added -// to the results datastructure). -func (b *block) Query( - ctx context.Context, - query Query, - opts QueryOptions, - results DocumentResults, - logFields []opentracinglog.Field, -) (bool, error) { - ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) - sp.LogFields(logFields...) 
- defer sp.Finish() - - start := time.Now() - exhaustive, err := b.queryWithSpan(ctx, query, opts, results) - if err != nil { - sp.LogFields(opentracinglog.Error(err)) - } - results.AddBlockProcessingDuration(time.Since(start)) - return exhaustive, err -} - -func (b *block) queryWithSpan( - ctx context.Context, - query Query, - opts QueryOptions, - results DocumentResults, -) (bool, error) { - iter, err := b.QueryIter(ctx, query) - if err != nil { - return false, err - } - - iterCloser := x.NewSafeCloser(iter) - defer func() { - _ = iterCloser.Close() - b.metrics.queryDocsMatched.RecordValue(float64(results.TotalDocsCount())) - b.metrics.querySeriesMatched.RecordValue(float64(results.Size())) - }() - - if err := b.QueryWithIter(ctx, opts, iter, results, math.MaxInt64); err != nil { - return false, err - } - - if err := iterCloser.Close(); err != nil { - return false, err - } - results.AddBlockSearchDuration(iter.SearchDuration()) - - return opts.exhaustive(results.Size(), results.TotalDocsCount()), nil -} - -func (b *block) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterator, error) { +// QueryIter acquires a read lock on the block to get the set of segments for the returned iterator. However, the +// segments are searched and results are processed lazily in the returned iterator. The segments are finalized when +// the ctx is finalized to ensure the mmaps are not freed until the ctx closes. This allows the returned results to +// reference data in the mmap without copying. +func (b *block) QueryIter(ctx context.Context, query Query) (QueryIterator, error) { b.RLock() defer b.RUnlock() @@ -480,7 +424,7 @@ func (b *block) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterato } // FOLLOWUP(prateek): push down QueryOptions to restrict results - iter, err := exec.Execute(ctx, query.Query.SearchQuery()) + docIter, err := exec.Execute(ctx, query.Query.SearchQuery()) if err != nil { b.closeAsync(exec) return nil, err @@ -493,15 +437,40 @@ func (b *block) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterato b.closeAsync(exec) })) - return iter, nil + return NewQueryIter(docIter), nil } +// nolint: dupl func (b *block) QueryWithIter( ctx context.Context, opts QueryOptions, - docIter doc.Iterator, + iter QueryIterator, + results DocumentResults, + deadline time.Time, + logFields []opentracinglog.Field, +) error { + ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) + sp.LogFields(logFields...) + defer sp.Finish() + + err := b.queryWithSpan(ctx, opts, iter, results, deadline) + if err != nil { + sp.LogFields(opentracinglog.Error(err)) + } + if iter.Done() { + docs, series := iter.Counts() + b.metrics.queryDocsMatched.RecordValue(float64(docs)) + b.metrics.querySeriesMatched.RecordValue(float64(series)) + } + return err +} + +func (b *block) queryWithSpan( + ctx context.Context, + opts QueryOptions, + iter QueryIterator, results DocumentResults, - limit int, + deadline time.Time, ) error { var ( err error @@ -511,7 +480,6 @@ func (b *block) QueryWithIter( docsPool = b.opts.DocumentArrayPool() batch = docsPool.Get() batchSize = cap(batch) - count int ) if batchSize == 0 { batchSize = defaultQueryDocsBatchSize @@ -520,8 +488,7 @@ func (b *block) QueryWithIter( // Register local data structures that need closing. 
defer docsPool.Put(batch) - for count < limit && docIter.Next() { - count++ + for time.Now().Before(deadline) && iter.Next(ctx) { if opts.LimitsExceeded(size, docsCount) { break } @@ -539,7 +506,7 @@ func (b *block) QueryWithIter( } } - batch = append(batch, docIter.Current()) + batch = append(batch, iter.Current()) if len(batch) < batchSize { continue } @@ -549,7 +516,7 @@ func (b *block) QueryWithIter( return err } } - if err := docIter.Err(); err != nil { + if err := iter.Err(); err != nil { return err } @@ -561,6 +528,9 @@ func (b *block) QueryWithIter( } } + iter.AddSeries(size) + iter.AddDocs(docsCount) + return nil } @@ -604,57 +574,10 @@ func (b *block) addQueryResults( return batch, size, docsCount, err } -// Aggregate acquires a read lock on the block so that the segments -// are guaranteed to not be freed/released while accumulating results. -// NB: Aggregate is an optimization of the general aggregate Query approach -// for the case when we can skip going to raw documents, and instead rely on -// pre-aggregated results via the FST underlying the index. -func (b *block) Aggregate( - ctx context.Context, - opts QueryOptions, - results AggregateResults, - logFields []opentracinglog.Field, -) (bool, error) { - ctx, sp := ctx.StartTraceSpan(tracepoint.BlockAggregate) - sp.LogFields(logFields...) - defer sp.Finish() - - start := time.Now() - exhaustive, err := b.aggregateWithSpan(ctx, opts, results, sp) - if err != nil { - sp.LogFields(opentracinglog.Error(err)) - } - results.AddBlockProcessingDuration(time.Since(start)) - - return exhaustive, err -} - -func (b *block) aggregateWithSpan( - ctx context.Context, - opts QueryOptions, - results AggregateResults, - sp opentracing.Span, -) (bool, error) { - iter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) - if err != nil { - return false, err - } - - defer func() { - _ = iter.Close() - b.metrics.aggregateDocsMatched.RecordValue(float64(results.TotalDocsCount())) - b.metrics.aggregateSeriesMatched.RecordValue(float64(results.Size())) - }() - - if err := b.AggregateWithIter(ctx, iter, opts, results, math.MaxInt64); err != nil { - return false, err - } - - results.AddBlockSearchDuration(iter.SearchDuration()) - - return opts.exhaustive(results.Size(), results.TotalDocsCount()), nil -} - +// AggIter acquires a read lock on the block to get the set of segments for the returned iterator. However, the +// segments are searched and results are processed lazily in the returned iterator. The segments are finalized when +// the ctx is finalized to ensure the mmaps are not freed until the ctx closes. This allows the returned results to +// reference data in the mmap without copying. func (b *block) AggregateIter(ctx context.Context, aggOpts AggregateResultsOptions) (AggregateIterator, error) { b.RLock() defer b.RUnlock() @@ -711,14 +634,40 @@ func (b *block) AggregateIter(ctx context.Context, aggOpts AggregateResultsOptio }, nil } +// nolint: dupl func (b *block) AggregateWithIter( ctx context.Context, iter AggregateIterator, opts QueryOptions, results AggregateResults, - limit int) error { + deadline time.Time, + logFields []opentracinglog.Field, +) error { + ctx, sp := ctx.StartTraceSpan(tracepoint.BlockAggregate) + sp.LogFields(logFields...) 
+ defer sp.Finish() + + err := b.aggregateWithSpan(ctx, iter, opts, results, deadline) + if err != nil { + sp.LogFields(opentracinglog.Error(err)) + } + if iter.Done() { + docs, series := iter.Counts() + b.metrics.aggregateDocsMatched.RecordValue(float64(docs)) + b.metrics.aggregateSeriesMatched.RecordValue(float64(series)) + } + + return err +} + +func (b *block) aggregateWithSpan( + ctx context.Context, + iter AggregateIterator, + opts QueryOptions, + results AggregateResults, + deadline time.Time, +) error { var ( - count int err error source = opts.Source size = results.Size() @@ -747,8 +696,7 @@ func (b *block) AggregateWithIter( maxBatch = opts.DocsLimit } - for count < limit && iter.Next(ctx) { - count++ + for time.Now().Before(deadline) && iter.Next(ctx) { if opts.LimitsExceeded(size, docsCount) { break } @@ -832,6 +780,9 @@ func (b *block) AggregateWithIter( } } + iter.AddSeries(size) + iter.AddDocs(docsCount) + return nil } diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 0c912eb930..4e7c5d3c23 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -120,23 +120,29 @@ func TestPostingsListCacheDoesNotAffectBlockQueryResults(t *testing.T) { } uncachedResults := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := uncachedBlock.Query(context.NewBackground(), indexQuery, - queryOpts, uncachedResults, emptyLogFields) + ctx := context.NewBackground() + queryIter, err := uncachedBlock.QueryIter(ctx, indexQuery) if err != nil { - return false, fmt.Errorf("error querying uncached block: %v", err) + return false, err } - if !exhaustive { - return false, errors.New("querying uncached block was not exhaustive") + require.NoError(t, err) + for !queryIter.Done() { + err = uncachedBlock.QueryWithIter(ctx, + queryOpts, queryIter, uncachedResults, time.Now().Add(time.Millisecond * 10), emptyLogFields) + if err != nil { + return false, fmt.Errorf("error querying uncached block: %v", err) + } } cachedResults := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err = cachedBlock.Query(context.NewBackground(), indexQuery, - queryOpts, cachedResults, emptyLogFields) - if err != nil { - return false, fmt.Errorf("error querying cached block: %v", err) - } - if !exhaustive { - return false, errors.New("querying cached block was not exhaustive") + ctx = context.NewBackground() + queryIter, err = cachedBlock.QueryIter(ctx, indexQuery) + for !queryIter.Done() { + err = cachedBlock.QueryWithIter(ctx, queryOpts, queryIter, cachedResults, + time.Now().Add(time.Millisecond * 10), emptyLogFields) + if err != nil { + return false, fmt.Errorf("error querying cached block: %v", err) + } } uncachedMap := uncachedResults.Map() @@ -362,17 +368,23 @@ func TestAggregateDocLimits(t *testing.T) { ctx := context.NewBackground() defer ctx.BlockingClose() - exhaustive, err := b.Aggregate( - ctx, - QueryOptions{}, - results, - emptyLogFields) - + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) if err != nil { return false, err } + for !aggIter.Done() { + err = b.AggregateWithIter( + ctx, + aggIter, + QueryOptions{}, + results, + time.Now().Add(time.Millisecond * 10), + emptyLogFields) - require.True(t, exhaustive, errors.New("not exhaustive")) + if err != nil { + return false, err + } + } verifyResults(t, results, testSegment.segmentMap) snap := scope.Snapshot() tallytest.AssertCounterValue(t, testSegment.exCount, snap, diff --git 
a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 8d55cb1429..eb6489956c 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -373,9 +373,7 @@ func TestBlockQueryAfterClose(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) require.NoError(t, b.Close()) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Error(t, err) } @@ -401,7 +399,7 @@ func TestBlockQueryWithCancelledQuery(t *testing.T) { gomock.InOrder( exec.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Close().Return(nil), + dIter.EXPECT().Done().Return(false), exec.EXPECT().Close().Return(nil), ) @@ -416,7 +414,9 @@ func TestBlockQueryWithCancelledQuery(t *testing.T) { results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - _, err = b.Query(ctx, defaultQuery, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) require.Equal(t, stdlibctx.Canceled, err) } @@ -435,9 +435,7 @@ func TestBlockQueryExecutorError(t *testing.T) { return nil, fmt.Errorf("random-err") } - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Error(t, err) } @@ -459,9 +457,7 @@ func TestBlockQuerySegmentReaderError(t *testing.T) { randErr := fmt.Errorf("random-err") seg.EXPECT().Reader().Return(nil, randErr) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Equal(t, randErr, err) } @@ -500,9 +496,7 @@ func TestBlockQueryAddResultsSegmentsError(t *testing.T) { randErr := fmt.Errorf("random-err") seg3.EXPECT().Reader().Return(nil, randErr) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Equal(t, randErr, err) } @@ -529,8 +523,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) { exec.EXPECT().Close(), ) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Error(t, err) } @@ -559,15 +552,17 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(fmt.Errorf("randomerr")), - dIter.EXPECT().Close(), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close(), ) ctx := context.NewBackground() - _, err = b.Query(ctx, - defaultQuery, QueryOptions{}, - NewQueryResults(nil, QueryResultsOptions{}, testOpts), emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + + err = b.QueryWithIter(ctx, 
QueryOptions{}, queryIter, + NewQueryResults(nil, QueryResultsOptions{}, testOpts), time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) // NB(r): Make sure to call finalizers blockingly (to finish @@ -600,8 +595,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) limit := 1 @@ -610,9 +604,11 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) d, ok := results.Map().Get(testDoc1().ID) @@ -628,44 +624,6 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { ctx.BlockingClose() } -func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - testMD := newTestNSMetadata(t) - start := time.Now().Truncate(time.Hour) - blk, err := NewBlock(start, testMD, BlockOptions{}, - namespace.NewRuntimeOptionsManager("foo"), testOpts) - require.NoError(t, err) - - b, ok := blk.(*block) - require.True(t, ok) - - exec := search.NewMockExecutor(ctrl) - b.newExecutorWithRLockFn = func() (search.Executor, error) { - return exec, nil - } - - dIter := doc.NewMockQueryDocIterator(ctrl) - gomock.InOrder( - exec.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(dIter, nil), - dIter.EXPECT().Next().Return(false), - dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(fmt.Errorf("random-err")), - exec.EXPECT().Close().Return(nil), - ) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - ctx := context.NewBackground() - - _, err = b.Query(ctx, defaultQuery, QueryOptions{}, results, emptyLogFields) - require.Error(t, err) - - // NB(r): Make sure to call finalizers blockingly (to finish - // the expected close calls) - ctx.BlockingClose() -} - func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -691,8 +649,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) limit := 1 @@ -700,9 +657,11 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) @@ -744,8 +703,7 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { 
dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) limit := 2 @@ -754,9 +712,11 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) rMap := results.Map() require.Equal(t, 1, rMap.Len()) @@ -797,8 +757,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) docsLimit := 1 @@ -806,9 +765,11 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{DocsLimit: docsLimit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{DocsLimit: docsLimit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) d, ok := results.Map().Get(testDoc1().ID) @@ -848,8 +809,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(false), exec.EXPECT().Close().Return(nil), ) docsLimit := 2 @@ -858,9 +818,11 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{DocsLimit: docsLimit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{DocsLimit: docsLimit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) rMap := results.Map() require.Equal(t, 1, rMap.Len()) @@ -908,16 +870,17 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { exec.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - 
require.False(t, exhaustive) rMap := results.Map() require.Equal(t, 1, rMap.Len()) @@ -966,16 +929,17 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc2())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) rMap := results.Map() require.Equal(t, 2, rMap.Len()) @@ -1443,9 +1407,10 @@ func TestBlockE2EInsertQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, Query{q}, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) require.Equal(t, 2, results.Size()) rMap := results.Map() @@ -1522,10 +1487,12 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) { limit := 1 results := NewQueryResults(nil, QueryResultsOptions{SizeLimit: limit}, testOpts) - exhaustive, err := b.Query(context.NewBackground(), Query{q}, QueryOptions{SeriesLimit: limit}, results, + ctx := context.NewBackground() + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Size()) rMap := results.Map() @@ -1613,9 +1580,10 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, Query{q}, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) require.Equal(t, 2, results.Size()) rMap := results.Map() @@ -1697,9 +1665,10 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, Query{q}, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) require.Equal(t, 2, results.Size()) rMap := results.Map() @@ -1845,7 +1814,7 @@ func TestBlockAggregateAfterClose(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) require.NoError(t, b.Close()) - _, err = b.Aggregate(context.NewBackground(), QueryOptions{}, &aggregatedResults{}, emptyLogFields) 
+ _, err = b.AggregateIter(context.NewBackground(), AggregateResultsOptions{}) require.Error(t, err) } @@ -1885,16 +1854,19 @@ func TestBlockAggregateIterationErr(t *testing.T) { iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")), iter.EXPECT().Next().Return(false), iter.EXPECT().Err().Return(fmt.Errorf("unknown error")), - iter.EXPECT().Close().Return(nil), ) ctx := context.NewBackground() defer ctx.BlockingClose() - _, err = b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: 3}, results, + time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) } @@ -1963,13 +1935,16 @@ func TestBlockAggregate(t *testing.T) { iter.EXPECT().Err().Return(nil) iter.EXPECT().Close().Return(nil) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: seriesLimit}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "f1": {"t1", "t2", "t3"}, @@ -2048,17 +2023,19 @@ func TestBlockAggregateWithAggregateLimits(t *testing.T) { curr := []byte(fmt.Sprint(i)) iter.EXPECT().Current().Return([]byte("f1"), curr) } - iter.EXPECT().Close().Return(nil) iter.EXPECT().SearchDuration().Return(time.Second) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: seriesLimit}, results, + time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) assert.True(t, strings.Contains(err.Error(), "query aborted due to limit")) - require.False(t, exhaustive) sp.Finish() spans := mtr.FinishedSpans() @@ -2130,15 +2107,17 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { iter.EXPECT().Current().Return([]byte("f2"), []byte("t2")), iter.EXPECT().Next().Return(true), iter.EXPECT().Current().Return([]byte("f3"), []byte("f3")), - iter.EXPECT().Close().Return(nil), ) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: 1}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "f1": {}, @@ -2224,13 +2203,16 @@ func TestBlockE2EInsertAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: 1000}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "bar": {"baz", "qux"}, "some": {"more", "other"}, @@ -2241,13 +2223,16 @@ func TestBlockE2EInsertAggregate(t *testing.T) { Type: AggregateTagNamesAndValues, FieldFilter: AggregateFieldFilter{[]byte("bar")}, }, testOpts) - exhaustive, err = b.Aggregate( + aggIter, err = b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, - 
QueryOptions{SeriesLimit: 10}, + aggIter, + QueryOptions{SeriesLimit: 1000}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "bar": {"baz", "qux"}, }, results) @@ -2257,13 +2242,16 @@ func TestBlockE2EInsertAggregate(t *testing.T) { Type: AggregateTagNamesAndValues, FieldFilter: AggregateFieldFilter{[]byte("random")}, }, testOpts) - exhaustive, err = b.Aggregate( + aggIter, err = b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, - QueryOptions{SeriesLimit: 100}, + aggIter, + QueryOptions{SeriesLimit: 1000}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{}, results) sp.Finish() @@ -2556,13 +2544,16 @@ func TestBlockAggregateBatching(t *testing.T) { ctx := context.NewBackground() defer ctx.BlockingClose() - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) snap := scope.Snapshot() tallytest.AssertCounterValue(t, tt.expectedDocsMatched, snap, diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index c503f85053..fa10affc80 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -112,44 +112,6 @@ func (mr *MockBaseResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockBaseResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockBaseResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockBaseResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockBaseResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockBaseResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockBaseResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockBaseResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockBaseResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockBaseResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockBaseResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method 
func (m *MockBaseResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -241,44 +203,6 @@ func (mr *MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockDocumentResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockDocumentResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockDocumentResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockDocumentResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockDocumentResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockDocumentResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockDocumentResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockDocumentResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -386,44 +310,6 @@ func (mr *MockQueryResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockQueryResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockQueryResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockQueryResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockQueryResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockQueryResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockQueryResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockQueryResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockQueryResults) AddBlockSearchDuration(duration 
time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockQueryResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockQueryResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockQueryResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -618,44 +504,6 @@ func (mr *MockAggregateResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockAggregateResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockAggregateResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockAggregateResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockAggregateResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockAggregateResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockAggregateResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockAggregateResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockAggregateResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockAggregateResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockAggregateResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockAggregateResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -1081,40 +929,25 @@ func (mr *MockBlockMockRecorder) WriteBatch(inserts interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*MockBlock)(nil).WriteBatch), inserts) } -// Query mocks base method -func (m *MockBlock) Query(ctx context.Context, query Query, opts QueryOptions, results DocumentResults, logFields []log.Field) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Query", ctx, query, opts, results, logFields) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Query indicates an expected call of Query -func (mr *MockBlockMockRecorder) Query(ctx, query, opts, results, logFields interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockBlock)(nil).Query), ctx, query, opts, results, logFields) -} - // QueryWithIter mocks base method 
-func (m *MockBlock) QueryWithIter(ctx context.Context, opts QueryOptions, docIter doc.Iterator, results DocumentResults, limit int) error { +func (m *MockBlock) QueryWithIter(ctx context.Context, opts QueryOptions, iter QueryIterator, results DocumentResults, deadline time.Time, logFields []log.Field) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "QueryWithIter", ctx, opts, docIter, results, limit) + ret := m.ctrl.Call(m, "QueryWithIter", ctx, opts, iter, results, deadline, logFields) ret0, _ := ret[0].(error) return ret0 } // QueryWithIter indicates an expected call of QueryWithIter -func (mr *MockBlockMockRecorder) QueryWithIter(ctx, opts, docIter, results, limit interface{}) *gomock.Call { +func (mr *MockBlockMockRecorder) QueryWithIter(ctx, opts, iter, results, deadline, logFields interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryWithIter", reflect.TypeOf((*MockBlock)(nil).QueryWithIter), ctx, opts, docIter, results, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryWithIter", reflect.TypeOf((*MockBlock)(nil).QueryWithIter), ctx, opts, iter, results, deadline, logFields) } // QueryIter mocks base method -func (m *MockBlock) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterator, error) { +func (m *MockBlock) QueryIter(ctx context.Context, query Query) (QueryIterator, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueryIter", ctx, query) - ret0, _ := ret[0].(doc.QueryDocIterator) + ret0, _ := ret[0].(QueryIterator) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -1125,33 +958,18 @@ func (mr *MockBlockMockRecorder) QueryIter(ctx, query interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIter", reflect.TypeOf((*MockBlock)(nil).QueryIter), ctx, query) } -// Aggregate mocks base method -func (m *MockBlock) Aggregate(ctx context.Context, opts QueryOptions, results AggregateResults, logFields []log.Field) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Aggregate", ctx, opts, results, logFields) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Aggregate indicates an expected call of Aggregate -func (mr *MockBlockMockRecorder) Aggregate(ctx, opts, results, logFields interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Aggregate", reflect.TypeOf((*MockBlock)(nil).Aggregate), ctx, opts, results, logFields) -} - // AggregateWithIter mocks base method -func (m *MockBlock) AggregateWithIter(ctx context.Context, iter AggregateIterator, opts QueryOptions, results AggregateResults, limit int) error { +func (m *MockBlock) AggregateWithIter(ctx context.Context, iter AggregateIterator, opts QueryOptions, results AggregateResults, deadline time.Time, logFields []log.Field) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateWithIter", ctx, iter, opts, results, limit) + ret := m.ctrl.Call(m, "AggregateWithIter", ctx, iter, opts, results, deadline, logFields) ret0, _ := ret[0].(error) return ret0 } // AggregateWithIter indicates an expected call of AggregateWithIter -func (mr *MockBlockMockRecorder) AggregateWithIter(ctx, iter, opts, results, limit interface{}) *gomock.Call { +func (mr *MockBlockMockRecorder) AggregateWithIter(ctx, iter, opts, results, deadline, logFields interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateWithIter", reflect.TypeOf((*MockBlock)(nil).AggregateWithIter), ctx, iter, 
opts, results, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateWithIter", reflect.TypeOf((*MockBlock)(nil).AggregateWithIter), ctx, iter, opts, results, deadline, logFields) } // AggregateIter mocks base method @@ -1384,6 +1202,152 @@ func (mr *MockBlockStatsReporterMockRecorder) ReportIndexingStats(stats interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportIndexingStats", reflect.TypeOf((*MockBlockStatsReporter)(nil).ReportIndexingStats), stats) } +// MockQueryIterator is a mock of QueryIterator interface +type MockQueryIterator struct { + ctrl *gomock.Controller + recorder *MockQueryIteratorMockRecorder +} + +// MockQueryIteratorMockRecorder is the mock recorder for MockQueryIterator +type MockQueryIteratorMockRecorder struct { + mock *MockQueryIterator +} + +// NewMockQueryIterator creates a new mock instance +func NewMockQueryIterator(ctrl *gomock.Controller) *MockQueryIterator { + mock := &MockQueryIterator{ctrl: ctrl} + mock.recorder = &MockQueryIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockQueryIterator) EXPECT() *MockQueryIteratorMockRecorder { + return m.recorder +} + +// Done mocks base method +func (m *MockQueryIterator) Done() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Done") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Done indicates an expected call of Done +func (mr *MockQueryIteratorMockRecorder) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockQueryIterator)(nil).Done)) +} + +// Next mocks base method +func (m *MockQueryIterator) Next(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockQueryIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockQueryIterator)(nil).Next), ctx) +} + +// Err mocks base method +func (m *MockQueryIterator) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockQueryIteratorMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockQueryIterator)(nil).Err)) +} + +// SearchDuration mocks base method +func (m *MockQueryIterator) SearchDuration() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchDuration") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// SearchDuration indicates an expected call of SearchDuration +func (mr *MockQueryIteratorMockRecorder) SearchDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockQueryIterator)(nil).SearchDuration)) +} + +// Close mocks base method +func (m *MockQueryIterator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockQueryIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockQueryIterator)(nil).Close)) +} + +// AddSeries mocks base method +func (m *MockQueryIterator) AddSeries(count int) { + 
m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSeries", count) +} + +// AddSeries indicates an expected call of AddSeries +func (mr *MockQueryIteratorMockRecorder) AddSeries(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSeries", reflect.TypeOf((*MockQueryIterator)(nil).AddSeries), count) +} + +// AddDocs mocks base method +func (m *MockQueryIterator) AddDocs(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDocs", count) +} + +// AddDocs indicates an expected call of AddDocs +func (mr *MockQueryIteratorMockRecorder) AddDocs(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocs", reflect.TypeOf((*MockQueryIterator)(nil).AddDocs), count) +} + +// Counts mocks base method +func (m *MockQueryIterator) Counts() (int, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Counts") + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Counts indicates an expected call of Counts +func (mr *MockQueryIteratorMockRecorder) Counts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Counts", reflect.TypeOf((*MockQueryIterator)(nil).Counts)) +} + +// Current mocks base method +func (m *MockQueryIterator) Current() doc.Document { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Current") + ret0, _ := ret[0].(doc.Document) + return ret0 +} + +// Current indicates an expected call of Current +func (mr *MockQueryIteratorMockRecorder) Current() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockQueryIterator)(nil).Current)) +} + // MockAggregateIterator is a mock of AggregateIterator interface type MockAggregateIterator struct { ctrl *gomock.Controller @@ -1407,32 +1371,32 @@ func (m *MockAggregateIterator) EXPECT() *MockAggregateIteratorMockRecorder { return m.recorder } -// Next mocks base method -func (m *MockAggregateIterator) Next(ctx context.Context) bool { +// Done mocks base method +func (m *MockAggregateIterator) Done() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Next", ctx) + ret := m.ctrl.Call(m, "Done") ret0, _ := ret[0].(bool) return ret0 } -// Next indicates an expected call of Next -func (mr *MockAggregateIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { +// Done indicates an expected call of Done +func (mr *MockAggregateIteratorMockRecorder) Done() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockAggregateIterator)(nil).Next), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockAggregateIterator)(nil).Done)) } -// Done mocks base method -func (m *MockAggregateIterator) Done() bool { +// Next mocks base method +func (m *MockAggregateIterator) Next(ctx context.Context) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Done") + ret := m.ctrl.Call(m, "Next", ctx) ret0, _ := ret[0].(bool) return ret0 } -// Done indicates an expected call of Done -func (mr *MockAggregateIteratorMockRecorder) Done() *gomock.Call { +// Next indicates an expected call of Next +func (mr *MockAggregateIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockAggregateIterator)(nil).Done)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockAggregateIterator)(nil).Next), 
ctx) } // Err mocks base method @@ -1449,6 +1413,73 @@ func (mr *MockAggregateIteratorMockRecorder) Err() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockAggregateIterator)(nil).Err)) } +// SearchDuration mocks base method +func (m *MockAggregateIterator) SearchDuration() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchDuration") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// SearchDuration indicates an expected call of SearchDuration +func (mr *MockAggregateIteratorMockRecorder) SearchDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockAggregateIterator)(nil).SearchDuration)) +} + +// Close mocks base method +func (m *MockAggregateIterator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockAggregateIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAggregateIterator)(nil).Close)) +} + +// AddSeries mocks base method +func (m *MockAggregateIterator) AddSeries(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSeries", count) +} + +// AddSeries indicates an expected call of AddSeries +func (mr *MockAggregateIteratorMockRecorder) AddSeries(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSeries", reflect.TypeOf((*MockAggregateIterator)(nil).AddSeries), count) +} + +// AddDocs mocks base method +func (m *MockAggregateIterator) AddDocs(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDocs", count) +} + +// AddDocs indicates an expected call of AddDocs +func (mr *MockAggregateIteratorMockRecorder) AddDocs(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocs", reflect.TypeOf((*MockAggregateIterator)(nil).AddDocs), count) +} + +// Counts mocks base method +func (m *MockAggregateIterator) Counts() (int, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Counts") + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Counts indicates an expected call of Counts +func (mr *MockAggregateIteratorMockRecorder) Counts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Counts", reflect.TypeOf((*MockAggregateIterator)(nil).Counts)) +} + // Current mocks base method func (m *MockAggregateIterator) Current() ([]byte, []byte) { m.ctrl.T.Helper() @@ -1464,22 +1495,87 @@ func (mr *MockAggregateIteratorMockRecorder) Current() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockAggregateIterator)(nil).Current)) } -// Close mocks base method -func (m *MockAggregateIterator) Close() error { +// fieldsAndTermsIteratorOpts mocks base method +func (m *MockAggregateIterator) fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") + ret := m.ctrl.Call(m, "fieldsAndTermsIteratorOpts") + ret0, _ := ret[0].(fieldsAndTermsIteratorOpts) + return ret0 +} + +// fieldsAndTermsIteratorOpts indicates an expected call of fieldsAndTermsIteratorOpts +func (mr *MockAggregateIteratorMockRecorder) fieldsAndTermsIteratorOpts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "fieldsAndTermsIteratorOpts", reflect.TypeOf((*MockAggregateIterator)(nil).fieldsAndTermsIteratorOpts)) +} + +// MockResultIterator is a mock of ResultIterator interface +type MockResultIterator struct { + ctrl *gomock.Controller + recorder *MockResultIteratorMockRecorder +} + +// MockResultIteratorMockRecorder is the mock recorder for MockResultIterator +type MockResultIteratorMockRecorder struct { + mock *MockResultIterator +} + +// NewMockResultIterator creates a new mock instance +func NewMockResultIterator(ctrl *gomock.Controller) *MockResultIterator { + mock := &MockResultIterator{ctrl: ctrl} + mock.recorder = &MockResultIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockResultIterator) EXPECT() *MockResultIteratorMockRecorder { + return m.recorder +} + +// Done mocks base method +func (m *MockResultIterator) Done() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Done") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Done indicates an expected call of Done +func (mr *MockResultIteratorMockRecorder) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockResultIterator)(nil).Done)) +} + +// Next mocks base method +func (m *MockResultIterator) Next(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockResultIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockResultIterator)(nil).Next), ctx) +} + +// Err mocks base method +func (m *MockResultIterator) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") ret0, _ := ret[0].(error) return ret0 } -// Close indicates an expected call of Close -func (mr *MockAggregateIteratorMockRecorder) Close() *gomock.Call { +// Err indicates an expected call of Err +func (mr *MockResultIteratorMockRecorder) Err() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAggregateIterator)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockResultIterator)(nil).Err)) } // SearchDuration mocks base method -func (m *MockAggregateIterator) SearchDuration() time.Duration { +func (m *MockResultIterator) SearchDuration() time.Duration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SearchDuration") ret0, _ := ret[0].(time.Duration) @@ -1487,23 +1583,62 @@ func (m *MockAggregateIterator) SearchDuration() time.Duration { } // SearchDuration indicates an expected call of SearchDuration -func (mr *MockAggregateIteratorMockRecorder) SearchDuration() *gomock.Call { +func (mr *MockResultIteratorMockRecorder) SearchDuration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockAggregateIterator)(nil).SearchDuration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockResultIterator)(nil).SearchDuration)) } -// fieldsAndTermsIteratorOpts mocks base method -func (m *MockAggregateIterator) fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts { +// Close mocks base method +func (m *MockResultIterator) Close() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, 
"fieldsAndTermsIteratorOpts") - ret0, _ := ret[0].(fieldsAndTermsIteratorOpts) + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) return ret0 } -// fieldsAndTermsIteratorOpts indicates an expected call of fieldsAndTermsIteratorOpts -func (mr *MockAggregateIteratorMockRecorder) fieldsAndTermsIteratorOpts() *gomock.Call { +// Close indicates an expected call of Close +func (mr *MockResultIteratorMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "fieldsAndTermsIteratorOpts", reflect.TypeOf((*MockAggregateIterator)(nil).fieldsAndTermsIteratorOpts)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockResultIterator)(nil).Close)) +} + +// AddSeries mocks base method +func (m *MockResultIterator) AddSeries(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSeries", count) +} + +// AddSeries indicates an expected call of AddSeries +func (mr *MockResultIteratorMockRecorder) AddSeries(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSeries", reflect.TypeOf((*MockResultIterator)(nil).AddSeries), count) +} + +// AddDocs mocks base method +func (m *MockResultIterator) AddDocs(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDocs", count) +} + +// AddDocs indicates an expected call of AddDocs +func (mr *MockResultIteratorMockRecorder) AddDocs(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocs", reflect.TypeOf((*MockResultIterator)(nil).AddDocs), count) +} + +// Counts mocks base method +func (m *MockResultIterator) Counts() (int, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Counts") + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Counts indicates an expected call of Counts +func (mr *MockResultIteratorMockRecorder) Counts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Counts", reflect.TypeOf((*MockResultIterator)(nil).Counts)) } // MockfieldsAndTermsIterator is a mock of fieldsAndTermsIterator interface @@ -2252,3 +2387,31 @@ func (mr *MockOptionsMockRecorder) QueryLimits() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryLimits", reflect.TypeOf((*MockOptions)(nil).QueryLimits)) } + +// MaxWorkerTime mocks base method +func (m *MockOptions) MaxWorkerTime() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxWorkerTime") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// MaxWorkerTime indicates an expected call of MaxWorkerTime +func (mr *MockOptionsMockRecorder) MaxWorkerTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxWorkerTime", reflect.TypeOf((*MockOptions)(nil).MaxWorkerTime)) +} + +// SetMaxWorkerTime mocks base method +func (m *MockOptions) SetMaxWorkerTime(value time.Duration) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetMaxWorkerTime", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetMaxWorkerTime indicates an expected call of SetMaxWorkerTime +func (mr *MockOptionsMockRecorder) SetMaxWorkerTime(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxWorkerTime", reflect.TypeOf((*MockOptions)(nil).SetMaxWorkerTime), value) +} diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 
055ab95cac..3e4328a473 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -22,6 +22,7 @@ package index import ( "errors" + "time" "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/limits" @@ -81,6 +82,8 @@ var ( defaultForegroundCompactionOpts compaction.PlannerOptions defaultBackgroundCompactionOpts compaction.PlannerOptions + // defaultMaxWorkerTime sets the default time a query can hold an index worker. + defaultMaxWorkerTime = time.Second ) func init() { @@ -135,6 +138,7 @@ type opts struct { readThroughSegmentOptions ReadThroughSegmentOptions mmapReporter mmap.Reporter queryLimits limits.QueryLimits + maxWorkerTime time.Duration } var undefinedUUIDFn = func() ([]byte, error) { return nil, errIDGenerationDisabled } @@ -195,6 +199,7 @@ func NewOptions() Options { foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts, backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts, queryLimits: limits.NoOpQueryLimits(), + maxWorkerTime: defaultMaxWorkerTime, } resultsPool.Init(func() QueryResults { return NewQueryResults(nil, QueryResultsOptions{}, opts) @@ -460,3 +465,13 @@ func (o *opts) SetQueryLimits(value limits.QueryLimits) Options { func (o *opts) QueryLimits() limits.QueryLimits { return o.queryLimits } + +func (o *opts) MaxWorkerTime() time.Duration { + return o.maxWorkerTime +} + +func (o *opts) SetMaxWorkerTime(value time.Duration) Options { + opts := *o + opts.maxWorkerTime = value + return &opts +} diff --git a/src/dbnode/storage/index/query_iter.go b/src/dbnode/storage/index/query_iter.go new file mode 100644 index 0000000000..97e3d8f703 --- /dev/null +++ b/src/dbnode/storage/index/query_iter.go @@ -0,0 +1,81 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package index + +import ( + "time" + + "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/x/context" +) + +type queryIter struct { + // immutable state + docIter doc.QueryDocIterator + + // mutable state + seriesCount, docCount int +} + +var _ QueryIterator = &queryIter{} + +// NewQueryIter wraps the provided QueryDocIterator as a QueryIterator. +func NewQueryIter(docIter doc.QueryDocIterator) QueryIterator { + return &queryIter{ + docIter: docIter, + } +} + +func (q *queryIter) Done() bool { + return q.docIter.Done() +} + +func (q *queryIter) Next(_ context.Context) bool { + return q.docIter.Next() +} + +func (q *queryIter) Err() error { + return q.docIter.Err() +} + +func (q *queryIter) SearchDuration() time.Duration { + return q.docIter.SearchDuration() +} + +func (q *queryIter) Close() error { + return q.docIter.Close() +} + +func (q *queryIter) AddSeries(count int) { + q.seriesCount += count +} + +func (q *queryIter) AddDocs(count int) { + q.docCount += count +} + +func (q *queryIter) Counts() (series, docs int) { + return q.seriesCount, q.docCount +} + +func (q *queryIter) Current() doc.Document { + return q.docIter.Current() +} diff --git a/src/dbnode/storage/index/query_options.go b/src/dbnode/storage/index/query_options.go index ff7ffb5b4a..eccbc08c1d 100644 --- a/src/dbnode/storage/index/query_options.go +++ b/src/dbnode/storage/index/query_options.go @@ -43,7 +43,8 @@ func (o QueryOptions) LimitsExceeded(seriesCount, docsCount int) bool { return o.SeriesLimitExceeded(seriesCount) || o.DocsLimitExceeded(docsCount) } -func (o QueryOptions) exhaustive(seriesCount, docsCount int) bool { +// Exhaustive returns true if the provided counts did not exceed the query limits. +func (o QueryOptions) Exhaustive(seriesCount, docsCount int) bool { return !o.SeriesLimitExceeded(seriesCount) && !o.DocsLimitExceeded(docsCount) } diff --git a/src/dbnode/storage/index/query_options_test.go b/src/dbnode/storage/index/query_options_test.go index 01add7e973..4706f297b7 100644 --- a/src/dbnode/storage/index/query_options_test.go +++ b/src/dbnode/storage/index/query_options_test.go @@ -45,9 +45,9 @@ func TestQueryOptions(t *testing.T) { assert.True(t, opts.LimitsExceeded(20, 9)) assert.False(t, opts.LimitsExceeded(19, 9)) - assert.False(t, opts.exhaustive(19, 10)) - assert.False(t, opts.exhaustive(20, 9)) - assert.True(t, opts.exhaustive(19, 9)) + assert.False(t, opts.Exhaustive(19, 10)) + assert.False(t, opts.Exhaustive(20, 9)) + assert.True(t, opts.Exhaustive(19, 9)) } func TestInvalidWideQueryOptions(t *testing.T) { diff --git a/src/dbnode/storage/index/results.go b/src/dbnode/storage/index/results.go index 385fa92bda..58a30a0f6c 100644 --- a/src/dbnode/storage/index/results.go +++ b/src/dbnode/storage/index/results.go @@ -23,7 +23,6 @@ package index import ( "errors" "sync" - "time" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" @@ -52,8 +51,7 @@ type results struct { idPool ident.Pool bytesPool pool.CheckedBytesPool - pool QueryResultsPool - resultDuration ResultDurations + pool QueryResultsPool } // NewQueryResults returns a new query results object.
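For readers following the refactor: the exhaustive bool that Block.Query used to return is replaced by counts carried on the new QueryIterator plus the now-exported QueryOptions.Exhaustive. A minimal sketch of how those pieces compose, assuming the interfaces added in this patch; the helper name and wiring below are illustrative only, the real consumption loop lives in the block/index changes elsewhere in this diff:

    package example

    import (
        "github.com/m3db/m3/src/dbnode/storage/index"
        "github.com/m3db/m3/src/m3ninx/doc"
        "github.com/m3db/m3/src/x/context"
    )

    // drainAndCheckExhaustive is a made-up helper, not code from this patch.
    // It only illustrates the contract of queryIter and QueryOptions.Exhaustive.
    func drainAndCheckExhaustive(
        ctx context.Context,
        docIter doc.QueryDocIterator,
        opts index.QueryOptions,
    ) (bool, error) {
        iter := index.NewQueryIter(docIter) // wrap the search results as a QueryIterator
        defer iter.Close()

        for !iter.Done() && iter.Next(ctx) {
            _ = iter.Current() // a real caller adds this document to its DocumentResults
            iter.AddDocs(1)    // progress is recorded on the iterator so it survives yielding
            // AddSeries would be called with the number of series actually added to results.
            if series, docs := iter.Counts(); opts.LimitsExceeded(series, docs) {
                break
            }
        }
        if err := iter.Err(); err != nil {
            return false, err
        }
        series, docs := iter.Counts()
        // Exhaustive (exported by this patch) replaces the bool previously returned by Block.Query.
        return opts.Exhaustive(series, docs), nil
    }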
@@ -73,24 +71,6 @@ func NewQueryResults( } } -func (r *results) TotalDuration() ResultDurations { - r.RLock() - defer r.RUnlock() - return r.resultDuration -} - -func (r *results) AddBlockProcessingDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddProcessing(duration) -} - -func (r *results) AddBlockSearchDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddSearch(duration) -} - func (r *results) EnforceLimits() bool { return true } func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { @@ -110,8 +90,6 @@ func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { r.resultsMap.Reset() r.totalDocsCount = 0 - r.resultDuration = ResultDurations{} - r.opts = opts r.Unlock() diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 56ef408915..5e30c4f72b 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -154,15 +154,6 @@ type BaseResults interface { // TotalDocsCount returns the total number of documents observed. TotalDocsCount() int - // TotalDuration is the total ResultDurations for the query. - TotalDuration() ResultDurations - - // AddBlockProcessingDuration adds the processing duration for a single block to the TotalDuration. - AddBlockProcessingDuration(duration time.Duration) - - // AddBlockSearchDuration adds the search duration for a single block to the TotalDuration. - AddBlockSearchDuration(duration time.Duration) - // EnforceLimits returns whether this should enforce and increment limits. EnforceLimits() bool @@ -410,36 +401,18 @@ type Block interface { // WriteBatch writes a batch of provided entries. WriteBatch(inserts *WriteBatch) (WriteBatchResult, error) - // Query resolves the given query into known IDs. - Query( - ctx context.Context, - query Query, - opts QueryOptions, - results DocumentResults, - logFields []opentracinglog.Field, - ) (bool, error) - // QueryWithIter processes n docs from the iterator into known IDs. QueryWithIter( ctx context.Context, opts QueryOptions, - docIter doc.Iterator, + iter QueryIterator, results DocumentResults, - limit int, + deadline time.Time, + logFields []opentracinglog.Field, ) error - // QueryIter returns a new QueryDocIterator for the query. - QueryIter(ctx context.Context, query Query) (doc.QueryDocIterator, error) - - // Aggregate aggregates known tag names/values. - // NB(prateek): different from aggregating by means of Query, as we can - // avoid going to documents, relying purely on the indexed FSTs. - Aggregate( - ctx context.Context, - opts QueryOptions, - results AggregateResults, - logFields []opentracinglog.Field, - ) (bool, error) + // QueryIter returns a new QueryIterator for the query. + QueryIter(ctx context.Context, query Query) (QueryIterator, error) // AggregateWithIter aggregates N known tag names/values from the iterator. AggregateWithIter( @@ -447,7 +420,8 @@ type Block interface { iter AggregateIterator, opts QueryOptions, results AggregateResults, - limit int, + deadline time.Time, + logFields []opentracinglog.Field, ) error // AggregateIter returns a new AggregatorIterator. @@ -922,31 +896,49 @@ func (e WriteBatchEntry) Result() WriteBatchEntryResult { return *e.result } +// QueryIterator iterates through the documents for a block. +type QueryIterator interface { + ResultIterator + + // Current returns the current document.
+ Current() doc.Document +} + // AggregateIterator iterates through the (field,term)s for a block. type AggregateIterator interface { + ResultIterator + + // Current returns the current (field, term). + Current() (field, term []byte) + + fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts +} + +// ResultIterator is a common interface for query and aggregate result iterators. +type ResultIterator interface { + // Done returns true if there are no more elements in the iterator. Allows checking if the query should acquire + // a permit, which might block, before calling Next(). + Done() bool + // Next processes the next (field,term) available with Current. Returns true if there are more to process. // Callers need to check Err after this returns false to check if an error occurred while iterating. Next(ctx context.Context) bool - // Done returns true if the iterator is exhausted. This non-standard iterating method allows any index query to - // check if there is more work to be done before waiting for a worker from the pool. - // If this method returns true, Next is guaranteed to return false. However, on the first iteration this will always - // return false and Next may return false for an empty iterator. - Done() bool - // Err returns an non-nil error if an error occurred calling Next. Err() error - // Current returns the current (field, term). + // SearchDuration is how long it took to search the FSTs for the results returned by the iterator. + SearchDuration() time.Duration - // Close the iterator and underlying resources. + // Close the iterator. Close() error - // SearchDuration is how long it took to search the segments in the block. - SearchDuration() time.Duration + AddSeries(count int) - fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts + AddDocs(count int) + + // Counts returns the number of series and documents processed by the iterator. + Counts() (series, docs int) } // fieldsAndTermsIterator iterates over all known fields and terms for a segment. @@ -1105,4 +1097,10 @@ type Options interface { // QueryLimits returns the current query limits. QueryLimits() limits.QueryLimits + + // MaxWorkerTime returns the max time a query can hold an index worker. + MaxWorkerTime() time.Duration + + // SetMaxWorkerTime sets MaxWorkerTime. + SetMaxWorkerTime(value time.Duration) Options } diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index f209184034..cb5ad85312 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "sync" - "time" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" @@ -62,8 +61,7 @@ type wideResults struct { // document is discovered whose shard exceeds the last shard this results // is responsible for, using the fact that incoming documents are sorted by // shard then by ID. - pastLastShard bool - resultDuration ResultDurations + pastLastShard bool } // NewWideQueryResults returns a new wide query results object.
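The ResultIterator contract above (check Done before acquiring, carry counts on the iterator) together with Options.MaxWorkerTime and the permits added later in this diff is what lets a long query yield its index worker periodically instead of holding it for the whole query. A rough outline of that pattern, under the assumption that the caller re-acquires a permit per batch; processWithYield and its signature are illustrative, not code from this patch, and the index/block wiring that actually does this is not shown in this section:

    package example

    import (
        "time"

        "github.com/m3db/m3/src/dbnode/storage/index"
        "github.com/m3db/m3/src/dbnode/storage/limits/permits"
        "github.com/m3db/m3/src/x/context"
    )

    // processWithYield is a hypothetical outline of the yield-and-reacquire loop.
    func processWithYield(
        ctx context.Context,
        iter index.ResultIterator,
        perms permits.Permits,
        maxWorkerTime time.Duration,
    ) error {
        for !iter.Done() {
            // Done() is checked before acquiring so an exhausted iterator never
            // blocks waiting for a permit.
            if err := perms.Acquire(ctx); err != nil {
                return err
            }
            start := time.Now()
            hasMore := true
            for time.Since(start) < maxWorkerTime {
                if !iter.Next(ctx) {
                    hasMore = false
                    break
                }
                // The concrete QueryIterator/AggregateIterator exposes Current()
                // here and records progress via AddSeries/AddDocs.
            }
            // Release takes the quota consumed; the fixed permits manager ignores it.
            perms.Release(0)
            if !hasMore {
                break
            }
        }
        return iter.Err()
    }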
@@ -94,24 +92,6 @@ func NewWideQueryResults( return results } -func (r *wideResults) TotalDuration() ResultDurations { - r.RLock() - defer r.RUnlock() - return r.resultDuration -} - -func (r *wideResults) AddBlockProcessingDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddProcessing(duration) -} - -func (r *wideResults) AddBlockSearchDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddSearch(duration) -} - func (r *wideResults) EnforceLimits() bool { // NB: wide results should not enforce limits, as they may span an entire // block in a memory constrained batch-wise fashion. diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 31ffdc82d1..71f5fb8284 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -45,6 +45,7 @@ import ( "github.com/golang/mock/gomock" opentracing "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/require" ) @@ -642,7 +643,12 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + mockIter0 := index.NewMockQueryIterator(ctrl) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) + result, err := idx.Query(ctx, q, qOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -653,8 +659,17 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { EndExclusive: t2.Add(time.Minute), RequireExhaustive: test.requireExhaustive, } - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) + + mockIter1 := index.NewMockQueryIterator(ctrl) + b1.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter1, nil) + mockIter1.EXPECT().Done().Return(true) + mockIter1.EXPECT().SearchDuration().Return(time.Minute) + mockIter1.EXPECT().Close().Return(nil) + result, err = idx.Query(ctx, q, qOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -664,8 +679,32 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { StartInclusive: t0, EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, + SeriesLimit: 1, } - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter0, nil) + b0.EXPECT().QueryWithIter(gomock.Any(), qOpts, mockIter0, gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn(func( + ctx context.Context, + opts index.QueryOptions, + iter index.QueryIterator, + r index.QueryResults, + deadline time.Time, + logFields []opentracinglog.Field, + ) error { + _, _, err = r.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("A")}), + doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("B")}), + }) + require.NoError(t, err) + return nil + }) + gomock.InOrder( + mockIter0.EXPECT().Done().Return(false), + mockIter0.EXPECT().Done().Return(true), + mockIter0.EXPECT().SearchDuration().Return(time.Minute), + mockIter0.EXPECT().Close().Return(nil), + ) + result, err = idx.Query(ctx, q, qOpts) if test.requireExhaustive { require.Error(t, err) @@ -677,7 +716,7 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { sp.Finish() spans := mtr.FinishedSpans() - require.Len(t, spans, 15) + require.Len(t, spans, 8) }) } } @@ -775,15 +814,6 @@ func TestLimits(t *testing.T) { requireExhaustive: false, expectedErr: "", }, - { - name: "no limits", - seriesLimit: 0, - docsLimit: 0, - requireExhaustive: true, - expectedErr: "query exceeded limit: require_exhaustive=true, " + - "series_limit=0, series_matched=1, docs_limit=0, docs_matched=2", - expectedQueryLimitExceededError: true, - }, { name: "series limit only", seriesLimit: 1, @@ -829,12 +859,22 @@ func TestLimits(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()). + mockIter := index.NewMockQueryIterator(ctrl) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter, nil) + gomock.InOrder( + mockIter.EXPECT().Done().Return(false), + mockIter.EXPECT().Done().Return(true), + mockIter.EXPECT().SearchDuration().Return(time.Minute), + mockIter.EXPECT().Close().Return(err), + ) + + b0.EXPECT().QueryWithIter(gomock.Any(), qOpts, mockIter, gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func(ctx context.Context, - query interface{}, opts interface{}, + iter interface{}, results index.DocumentResults, - logFields interface{}) (bool, error) { + deadline interface{}, + logFields interface{}) error { _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2. // Byte array represents ID encoded as bytes. 
@@ -844,11 +884,16 @@ func TestLimits(t *testing.T) { doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("A")}), }) require.NoError(t, err) - return false, nil + return nil }) result, err := idx.Query(ctx, q, qOpts) - require.False(t, result.Exhaustive) + if test.seriesLimit == 0 && test.docsLimit == 0 { + require.True(t, result.Exhaustive) + } else { + require.False(t, result.Exhaustive) + } + if test.requireExhaustive { require.Error(t, err) require.Equal(t, test.expectedErr, err.Error()) @@ -951,9 +996,13 @@ func TestNamespaceIndexBlockQueryReleasingContext(t *testing.T) { StartInclusive: t0, EndExclusive: now.Add(time.Minute), } + mockIter := index.NewMockQueryIterator(ctrl) gomock.InOrder( mockPool.EXPECT().Get().Return(stubResult), - b0.EXPECT().Query(ctx, q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil), + b0.EXPECT().QueryIter(ctx, q).Return(mockIter, nil), + mockIter.EXPECT().Done().Return(true), + mockIter.EXPECT().SearchDuration().Return(time.Minute), + mockIter.EXPECT().Close().Return(nil), mockPool.EXPECT().Put(stubResult), ) _, err = idx.Query(ctx, q, qOpts) @@ -1062,7 +1111,11 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { } aggOpts := index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + mockIter0 := index.NewMockAggregateIterator(ctrl) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) result, err := idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1074,8 +1127,16 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { RequireExhaustive: test.requireExhaustive, } aggOpts = index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) + + mockIter1 := index.NewMockAggregateIterator(ctrl) + b1.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter1, nil) + mockIter1.EXPECT().Done().Return(true) + mockIter1.EXPECT().SearchDuration().Return(time.Minute) + mockIter1.EXPECT().Close().Return(nil) result, err = idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1085,8 +1146,35 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { StartInclusive: t0, EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, + DocsLimit: 1, } - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + //nolint: dupl + b0.EXPECT(). + AggregateWithIter(gomock.Any(), mockIter0, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn(func( + ctx context.Context, + iter index.AggregateIterator, + opts index.QueryOptions, + results index.AggregateResults, + deadline time.Time, + logFields []opentracinglog.Field, + ) error { + _, _ = results.AddFields([]index.AggregateResultsEntry{{ + Field: ident.StringID("A"), + Terms: []ident.ID{ident.StringID("foo")}, + }, { + Field: ident.StringID("B"), + Terms: []ident.ID{ident.StringID("bar")}, + }}) + return nil + }) + gomock.InOrder( + mockIter0.EXPECT().Done().Return(false), + mockIter0.EXPECT().Done().Return(true), + mockIter0.EXPECT().SearchDuration().Return(time.Minute), + mockIter0.EXPECT().Close().Return(nil), + ) aggOpts = index.AggregationOptions{QueryOptions: qOpts} result, err = idx.AggregateQuery(ctx, q, aggOpts) if test.requireExhaustive { @@ -1099,7 +1187,7 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { sp.Finish() spans := mtr.FinishedSpans() - require.Len(t, spans, 15) + require.Len(t, spans, 8) }) } } @@ -1201,9 +1289,13 @@ func TestNamespaceIndexBlockAggregateQueryReleasingContext(t *testing.T) { } aggOpts := index.AggregationOptions{QueryOptions: qOpts} + mockIter := index.NewMockAggregateIterator(ctrl) gomock.InOrder( mockPool.EXPECT().Get().Return(stubResult), - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil), + b0.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter, nil), + mockIter.EXPECT().Done().Return(true), + mockIter.EXPECT().SearchDuration().Return(time.Minute), + mockIter.EXPECT().Close().Return(nil), mockPool.EXPECT().Put(stubResult), ) _, err = idx.AggregateQuery(ctx, q, aggOpts) @@ -1307,7 +1399,11 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { q := index.Query{ Query: query, } - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + mockIter0 := index.NewMockAggregateIterator(ctrl) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Second) + mockIter0.EXPECT().Close().Return(nil) + b0.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter0, nil) result, err := idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1319,8 +1415,17 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { RequireExhaustive: test.requireExhaustive, } aggOpts = index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Second) + mockIter0.EXPECT().Close().Return(nil) + b0.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter0, nil) + + mockIter1 := index.NewMockAggregateIterator(ctrl) + mockIter1.EXPECT().Done().Return(true) + mockIter1.EXPECT().SearchDuration().Return(time.Second) + mockIter1.EXPECT().Close().Return(nil) + b1.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter1, nil) result, err = idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1330,8 +1435,35 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { StartInclusive: t0, EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, + DocsLimit: 1, } - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + //nolint: dupl + b0.EXPECT(). 
+ AggregateWithIter(gomock.Any(), mockIter0, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + ctx context.Context, + iter index.AggregateIterator, + opts index.QueryOptions, + results index.AggregateResults, + deadline time.Time, + logFields []opentracinglog.Field, + ) error { + _, _ = results.AddFields([]index.AggregateResultsEntry{{ + Field: ident.StringID("A"), + Terms: []ident.ID{ident.StringID("foo")}, + }, { + Field: ident.StringID("B"), + Terms: []ident.ID{ident.StringID("bar")}, + }}) + return nil + }) + gomock.InOrder( + mockIter0.EXPECT().Done().Return(false), + mockIter0.EXPECT().Done().Return(true), + mockIter0.EXPECT().SearchDuration().Return(time.Minute), + mockIter0.EXPECT().Close().Return(nil), + ) aggOpts = index.AggregationOptions{QueryOptions: qOpts} result, err = idx.AggregateQuery(ctx, q, aggOpts) if test.requireExhaustive { diff --git a/src/dbnode/storage/index_query_concurrent_test.go b/src/dbnode/storage/index_query_concurrent_test.go index 4443693e51..fb5f24127e 100644 --- a/src/dbnode/storage/index_query_concurrent_test.go +++ b/src/dbnode/storage/index_query_concurrent_test.go @@ -34,12 +34,12 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/convert" + "github.com/m3db/m3/src/dbnode/storage/limits/permits" testutil "github.com/m3db/m3/src/dbnode/test" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" - xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" "github.com/fortytw2/leaktest" @@ -123,8 +123,7 @@ func testNamespaceIndexHighConcurrentQueries( nsIdx := test.index.(*nsIndex) nsIdx.state.Lock() // Make the query pool really high to improve concurrency likelihood - nsIdx.queryWorkersPool = xsync.NewWorkerPool(1000) - nsIdx.queryWorkersPool.Init() + nsIdx.permitsManager = permits.NewFixedPermitsManager(1000) currNow := min nowLock := &sync.Mutex{} @@ -223,32 +222,39 @@ func testNamespaceIndexHighConcurrentQueries( EndTime(). DoAndReturn(func() time.Time { return block.EndTime() }). AnyTimes() + mockBlock.EXPECT().QueryIter(gomock.Any(), gomock.Any()).DoAndReturn(func( + ctx context.Context, query index.Query) (index.QueryIterator, error) { + return block.QueryIter(ctx, query) + }, + ).AnyTimes() if opts.blockErrors { mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + QueryWithIter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( _ context.Context, - _ index.Query, _ index.QueryOptions, + _ index.QueryIterator, _ index.QueryResults, + _ time.Time, _ []opentracinglog.Field, - ) (bool, error) { - return false, errors.New("some-error") + ) error { + return errors.New("some-error") }). AnyTimes() } else { mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + QueryWithIter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( ctx context.Context, - q index.Query, opts index.QueryOptions, + iter index.QueryIterator, r index.QueryResults, + deadline time.Time, logFields []opentracinglog.Field, - ) (bool, error) { + ) error { time.Sleep(timeoutValue + time.Second) - return block.Query(ctx, q, opts, r, logFields) + return block.QueryWithIter(ctx, opts, iter, r, deadline, logFields) }). 
AnyTimes() } diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go index b551869bec..07b7014f8a 100644 --- a/src/dbnode/storage/index_test.go +++ b/src/dbnode/storage/index_test.go @@ -400,22 +400,29 @@ func TestNamespaceIndexQueryTimeout(t *testing.T) { ctx := context.NewWithGoContext(stdCtx) defer ctx.Close() + mockIter := index.NewMockQueryIterator(ctrl) + mockIter.EXPECT().Done().Return(false).Times(2) + mockIter.EXPECT().SearchDuration().Return(time.Minute * 1) + mockIter.EXPECT().Close().Return(nil) + mockBlock := index.NewMockBlock(ctrl) mockBlock.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() blockTime := now.Add(-1 * test.indexBlockSize) mockBlock.EXPECT().StartTime().Return(blockTime).AnyTimes() mockBlock.EXPECT().EndTime().Return(blockTime.Add(test.indexBlockSize)).AnyTimes() + mockBlock.EXPECT().QueryIter(gomock.Any(), gomock.Any()).Return(mockIter, nil) mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + QueryWithIter(gomock.Any(), gomock.Any(), mockIter, gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( ctx context.Context, - q index.Query, opts index.QueryOptions, + iter index.QueryIterator, r index.QueryResults, + deadline time.Time, logFields []opentracinglog.Field, - ) (bool, error) { + ) error { <-ctx.GoContext().Done() - return false, ctx.GoContext().Err() + return ctx.GoContext().Err() }) mockBlock.EXPECT().Close().Return(nil) idx.state.blocksByTime[xtime.ToUnixNano(blockTime)] = mockBlock diff --git a/src/dbnode/storage/limits/permits/fixed_permits.go b/src/dbnode/storage/limits/permits/fixed_permits.go new file mode 100644 index 0000000000..13db162038 --- /dev/null +++ b/src/dbnode/storage/limits/permits/fixed_permits.go @@ -0,0 +1,91 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package permits + +import ( + "github.com/m3db/m3/src/x/context" +) + +type fixedPermits struct { + permits chan struct{} +} + +type fixedPermitsManager struct { + fp fixedPermits +} + +var ( + _ Permits = &fixedPermits{} + _ Manager = &fixedPermitsManager{} +) + +// NewFixedPermitsManager returns a permits manager that uses a fixed size of permits. 
+func NewFixedPermitsManager(size int) Manager { + fp := fixedPermits{permits: make(chan struct{}, size)} + for i := 0; i < size; i++ { + fp.permits <- struct{}{} + } + return &fixedPermitsManager{fp} +} + +func (f *fixedPermitsManager) NewPermits(_ context.Context) (Permits, error) { + return &f.fp, nil +} + +func (f *fixedPermits) Acquire(ctx context.Context) error { + // don't acquire a permit if ctx is already done. + select { + case <-ctx.GoContext().Done(): + return ctx.GoContext().Err() + default: + } + + select { + case <-ctx.GoContext().Done(): + return ctx.GoContext().Err() + case <-f.permits: + return nil + } +} + +func (f *fixedPermits) TryAcquire(ctx context.Context) (bool, error) { + // don't acquire a permit if ctx is already done. + select { + case <-ctx.GoContext().Done(): + return false, ctx.GoContext().Err() + default: + } + + select { + case <-f.permits: + return true, nil + default: + return false, nil + } +} + +func (f *fixedPermits) Release(_ int64) { + select { + case f.permits <- struct{}{}: + default: + panic("more permits released than acquired") + } +} diff --git a/src/dbnode/storage/limits/permits/fixed_permits_test.go b/src/dbnode/storage/limits/permits/fixed_permits_test.go new file mode 100644 index 0000000000..7b6f0dd80e --- /dev/null +++ b/src/dbnode/storage/limits/permits/fixed_permits_test.go @@ -0,0 +1,69 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
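Taken together, fixed_permits.go implements a counting semaphore over a buffered channel: the constructor pre-fills `size` tokens, `Acquire` checks for context cancellation both before and while waiting on a token, `TryAcquire` never blocks, and `Release` panics if a token is returned that was never taken. For reference, a minimal usage sketch against only the API added in this patch — the `main` wrapper, the permit count of 2, and the panics are illustrative, not part of the change:

package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/storage/limits/permits"
	"github.com/m3db/m3/src/x/context"
)

func main() {
	// Hand out at most 2 concurrent permits (illustrative size only).
	manager := permits.NewFixedPermitsManager(2)

	ctx := context.NewBackground()
	p, err := manager.NewPermits(ctx)
	if err != nil {
		panic(err)
	}

	// Blocks until a permit is free or the context is cancelled.
	if err := p.Acquire(ctx); err != nil {
		panic(err)
	}
	defer p.Release(0) // no quota reported in this sketch

	// TryAcquire never blocks; with one of two permits held it still succeeds.
	ok, err := p.TryAcquire(ctx)
	fmt.Println(ok, err) // true <nil>
	if ok {
		p.Release(0)
	}
}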
+ +package permits + +import ( + stdctx "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/x/context" +) + +func TestFixedPermits(t *testing.T) { + ctx := context.NewBackground() + fp, err := NewFixedPermitsManager(3).NewPermits(ctx) + require.NoError(t, err) + require.NoError(t, fp.Acquire(ctx)) + require.NoError(t, fp.Acquire(ctx)) + require.NoError(t, fp.Acquire(ctx)) + + acq, err := fp.TryAcquire(ctx) + require.NoError(t, err) + require.False(t, acq) + + fp.Release(0) + require.NoError(t, fp.Acquire(ctx)) +} + +func TestFixedPermitsTimeouts(t *testing.T) { + ctx := context.NewBackground() + fp, err := NewFixedPermitsManager(1).NewPermits(ctx) + require.NoError(t, err) + require.NoError(t, fp.Acquire(ctx)) + + acq, err := fp.TryAcquire(ctx) + require.NoError(t, err) + require.False(t, acq) + + stdCtx, cancel := stdctx.WithCancel(stdctx.Background()) + cancel() + ctx = context.NewWithGoContext(stdCtx) + + fp.Release(0) + + err = fp.Acquire(ctx) + require.Error(t, err) + + _, err = fp.TryAcquire(ctx) + require.Error(t, err) +} diff --git a/src/dbnode/storage/limits/permits/lookback_limit_permit.go b/src/dbnode/storage/limits/permits/lookback_limit_permit.go index 38e4284c38..b495e37596 100644 --- a/src/dbnode/storage/limits/permits/lookback_limit_permit.go +++ b/src/dbnode/storage/limits/permits/lookback_limit_permit.go @@ -99,7 +99,7 @@ func (p *LookbackLimitPermit) TryAcquire(context.Context) (bool, error) { } // Release is a no-op in this implementation. -func (p *LookbackLimitPermit) Release() { +func (p *LookbackLimitPermit) Release(_ int64) { } func sourceFromContext(ctx context.Context) []byte { diff --git a/src/dbnode/storage/limits/permits/noop_permit.go b/src/dbnode/storage/limits/permits/noop_permit.go index f9f9e4833f..6afb174b4a 100644 --- a/src/dbnode/storage/limits/permits/noop_permit.go +++ b/src/dbnode/storage/limits/permits/noop_permit.go @@ -51,5 +51,5 @@ func (p noOpPermits) TryAcquire(context.Context) (bool, error) { return true, nil } -func (p noOpPermits) Release() { +func (p noOpPermits) Release(_ int64) { } diff --git a/src/dbnode/storage/limits/permits/options.go b/src/dbnode/storage/limits/permits/options.go index fe0bedcd28..7e74cb1899 100644 --- a/src/dbnode/storage/limits/permits/options.go +++ b/src/dbnode/storage/limits/permits/options.go @@ -20,14 +20,22 @@ package permits +import ( + "math" + "runtime" +) + type options struct { seriesReadManager Manager + indexQueryManager Manager } // NewOptions return a new set of default permit managers. func NewOptions() Options { return &options{ seriesReadManager: NewNoOpPermitsManager(), + // Default to using half of the available cores for querying IDs + indexQueryManager: NewFixedPermitsManager(int(math.Ceil(float64(runtime.NumCPU()) / 2))), } } @@ -42,3 +50,13 @@ func (o *options) SetSeriesReadPermitsManager(value Manager) Options { func (o *options) SeriesReadPermitsManager() Manager { return o.seriesReadManager } + +func (o *options) IndexQueryPermitsManager() Manager { + return o.indexQueryManager +} + +func (o *options) SetIndexQueryPermitsManager(value Manager) Options { + opts := *o + opts.indexQueryManager = value + return &opts +} diff --git a/src/dbnode/storage/limits/permits/types.go b/src/dbnode/storage/limits/permits/types.go index 504e0a29f7..61c69248b8 100644 --- a/src/dbnode/storage/limits/permits/types.go +++ b/src/dbnode/storage/limits/permits/types.go @@ -25,6 +25,10 @@ import "github.com/m3db/m3/src/x/context" // Options is the permit options. 
type Options interface { + // IndexQueryPermitsManager returns the index query permits manager. + IndexQueryPermitsManager() Manager + // SetIndexQueryPermitsManager sets the index query permits manager. + SetIndexQueryPermitsManager(manager Manager) Options // SeriesReadPermitsManager returns the series read permits manager. SeriesReadPermitsManager() Manager // SetSeriesReadPermitsManager sets the series read permits manager. @@ -46,7 +50,8 @@ type Permits interface { // true if an resource was acquired. TryAcquire(ctx context.Context) (bool, error) - // Release gives back one acquired permit from the specific permits instance. + // Release gives back one acquired permit from the specific permits instance. The user can pass an optional quota + // indicating how much of quota was used while holding the permit. // Cannot release more permits than have been acquired. - Release() + Release(quota int64) } diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index ec56c22bda..7e61b283a1 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -23,8 +23,6 @@ package storage import ( "errors" "fmt" - "math" - "runtime" "time" "github.com/m3db/m3/src/dbnode/client" @@ -52,7 +50,6 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - xsync "github.com/m3db/m3/src/x/sync" ) const ( @@ -159,7 +156,6 @@ type options struct { identifierPool ident.Pool fetchBlockMetadataResultsPool block.FetchBlockMetadataResultsPool fetchBlocksMetadataResultsPool block.FetchBlocksMetadataResultsPool - queryIDsWorkerPool xsync.WorkerPool writeBatchPool *writes.WriteBatchPool bufferBucketPool *series.BufferBucketPool bufferBucketVersionsPool *series.BufferBucketVersionsPool @@ -197,10 +193,6 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { bytesPool.Init() seriesOpts := series.NewOptions() - // Default to using half of the available cores for querying IDs - queryIDsWorkerPool := xsync.NewWorkerPool(int(math.Ceil(float64(runtime.NumCPU()) / 2))) - queryIDsWorkerPool.Init() - writeBatchPool := writes.NewWriteBatchPool(poolOpts, nil, nil) writeBatchPool.Init() @@ -245,7 +237,6 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { }), fetchBlockMetadataResultsPool: block.NewFetchBlockMetadataResultsPool(poolOpts, 0), fetchBlocksMetadataResultsPool: block.NewFetchBlocksMetadataResultsPool(poolOpts, 0), - queryIDsWorkerPool: queryIDsWorkerPool, writeBatchPool: writeBatchPool, bufferBucketVersionsPool: series.NewBufferBucketVersionsPool(poolOpts), bufferBucketPool: series.NewBufferBucketPool(poolOpts), @@ -703,16 +694,6 @@ func (o *options) FetchBlocksMetadataResultsPool() block.FetchBlocksMetadataResu return o.fetchBlocksMetadataResultsPool } -func (o *options) SetQueryIDsWorkerPool(value xsync.WorkerPool) Options { - opts := *o - opts.queryIDsWorkerPool = value - return &opts -} - -func (o *options) QueryIDsWorkerPool() xsync.WorkerPool { - return o.queryIDsWorkerPool -} - func (o *options) SetWriteBatchPool(value *writes.WriteBatchPool) Options { opts := *o opts.writeBatchPool = value diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 18d227f3aa..74378deb2f 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -56,7 +56,6 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - sync0 "github.com/m3db/m3/src/x/sync" time0 "github.com/m3db/m3/src/x/time" 
"github.com/golang/mock/gomock" @@ -4609,34 +4608,6 @@ func (mr *MockOptionsMockRecorder) FetchBlocksMetadataResultsPool() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBlocksMetadataResultsPool", reflect.TypeOf((*MockOptions)(nil).FetchBlocksMetadataResultsPool)) } -// SetQueryIDsWorkerPool mocks base method -func (m *MockOptions) SetQueryIDsWorkerPool(value sync0.WorkerPool) Options { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetQueryIDsWorkerPool", value) - ret0, _ := ret[0].(Options) - return ret0 -} - -// SetQueryIDsWorkerPool indicates an expected call of SetQueryIDsWorkerPool -func (mr *MockOptionsMockRecorder) SetQueryIDsWorkerPool(value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetQueryIDsWorkerPool", reflect.TypeOf((*MockOptions)(nil).SetQueryIDsWorkerPool), value) -} - -// QueryIDsWorkerPool mocks base method -func (m *MockOptions) QueryIDsWorkerPool() sync0.WorkerPool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "QueryIDsWorkerPool") - ret0, _ := ret[0].(sync0.WorkerPool) - return ret0 -} - -// QueryIDsWorkerPool indicates an expected call of QueryIDsWorkerPool -func (mr *MockOptionsMockRecorder) QueryIDsWorkerPool() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIDsWorkerPool", reflect.TypeOf((*MockOptions)(nil).QueryIDsWorkerPool)) -} - // SetWriteBatchPool mocks base method func (m *MockOptions) SetWriteBatchPool(value *writes.WriteBatchPool) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 0e5d313003..e1b27f1616 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -54,7 +54,6 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" ) @@ -1217,12 +1216,6 @@ type Options interface { // FetchBlocksMetadataResultsPool returns the fetchBlocksMetadataResultsPool. FetchBlocksMetadataResultsPool() block.FetchBlocksMetadataResultsPool - // SetQueryIDsWorkerPool sets the QueryIDs worker pool. - SetQueryIDsWorkerPool(value xsync.WorkerPool) Options - - // QueryIDsWorkerPool returns the QueryIDs worker pool. - QueryIDsWorkerPool() xsync.WorkerPool - // SetWriteBatchPool sets the WriteBatch pool. SetWriteBatchPool(value *writes.WriteBatchPool) Options