Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict the number of results processed per index worker #3269

Merged
merged 26 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9abfd17
Restrict the number of results processed per index worker
ryanhall07 Feb 23, 2021
b4b543e
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 23, 2021
7c11e7d
mock gen
ryanhall07 Feb 23, 2021
c8e2e40
use channels for fixed permits
ryanhall07 Feb 23, 2021
2fa777b
use TryAcquire to limit # of go routines
ryanhall07 Feb 24, 2021
ae102f5
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 24, 2021
4581f11
Just use Acquire to get permits
ryanhall07 Feb 24, 2021
2107c97
review comments
ryanhall07 Feb 25, 2021
a22b186
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
13cb785
fix tests
ryanhall07 Feb 25, 2021
d5b6680
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
2d24354
fix config test
ryanhall07 Feb 25, 2021
ae5ac98
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
ae900ca
Limit block iters by time
ryanhall07 Feb 25, 2021
1f9ebbd
fix tests
ryanhall07 Feb 25, 2021
b9cf04c
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
92c93e3
remove ctx from Release
ryanhall07 Feb 26, 2021
ad52f6f
Merge branch 'rhall-worker-pool-iter' of github.com:m3db/m3 into rhal…
ryanhall07 Feb 26, 2021
a3c33af
permit quota is int64
ryanhall07 Feb 26, 2021
53c5a48
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 26, 2021
dd0678a
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 27, 2021
1a563c3
default max worker time to 1s
ryanhall07 Feb 27, 2021
c08b57a
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 28, 2021
b7ab449
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Mar 1, 2021
5b9b982
Remove unused MaxResultsPerWorkerConfiguration
ryanhall07 Mar 1, 2021
1337a6d
Merge branch 'rhall-worker-pool-iter' of github.com:m3db/m3 into rhal…
ryanhall07 Mar 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,14 @@ type IndexConfiguration struct {
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

// MaxResultsPerPermit is the maximum index results a query can process after obtaining a permit. If a query needs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we split this up into different values for 1) regular queries 2) aggregate queries?

Aggregate queries when scoped with match[]=... are relatively more expensive per iteration since they have to do postings list interception with each aggregate term which is progressed to, as per:

fti.current.term, fti.current.postings = fti.termIter.Current()
if fti.restrictByPostings == nil {
// No restrictions.
return true, nil
}
bitmap, ok := roaring.BitmapFromPostingsList(fti.current.postings)
if !ok {
return false, errUnpackBitmapFromPostingsList
}
// Check term is part of at least some of the documents we're
// restricted to providing results for based on intersection
// count.
// Note: IntersectionCount is significantly faster than intersecting and
// counting results and also does not allocate.
if n := fti.restrictByPostings.IntersectionCount(bitmap); n > 0 {
// Matches, this is next result.
return true, nil
}
(term progression)
and
field, pl := fieldIter.Current()
if !fti.opts.allow(field) {
continue
}
if fti.restrictByPostings == nil {
// No restrictions.
fti.current.field = field
return true
}
bitmap, ok := roaring.BitmapFromPostingsList(pl)
if !ok {
fti.err = errUnpackBitmapFromPostingsList
return false
}
// Check field is part of at least some of the documents we're
// restricted to providing results for based on intersection
// count.
// Note: IntersectionCount is significantly faster than intersecting and
// counting results and also does not allocate.
if n := fti.restrictByPostings.IntersectionCount(bitmap); n < 1 {
// No match, not next result.
continue
}
// Matches, this is next result.
fti.current.field = field
return true
(field progression)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we could do this in a followup? Might be too much to add to scope of this PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's pretty easy to do now, just 2 different config values.

// to process more results it yields the permit and must wait again for another permit to resume. The number of
// permits available to all queries is defined by MaxQueryIDsConcurrency.
// Capping the maximum results per permit ensures a few large queries don't hold all the concurrent permits and lock
// out many small queries from running. This should be set higher than the max results returned by the vast majority
// of queries, so most queries only need to obtain a single permit.
MaxResultsPerPermit int `yaml:"maxResultsPerPermit"`
ryanhall07 marked this conversation as resolved.
Show resolved Hide resolved

// RegexpDFALimit is the limit on the max number of states used by a
// regexp deterministic finite automaton. Default is 10,000 states.
RegexpDFALimit *int `yaml:"regexpDFALimit"`
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
maxResultsPerPermit: 0
regexpDFALimit: null
regexpFSALimit: null
forwardIndexProbability: 0
Expand Down
26 changes: 14 additions & 12 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ import (
xos "github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/m3dbx/vellum/levenshtein"
"github.com/m3dbx/vellum/levenshtein2"
"github.com/m3dbx/vellum/regexp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -371,14 +370,6 @@ func Run(runOpts RunOptions) {

opentracing.SetGlobalTracer(tracer)

if cfg.Index.MaxQueryIDsConcurrency != 0 {
queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency)
queryIDsWorkerPool.Init()
opts = opts.SetQueryIDsWorkerPool(queryIDsWorkerPool)
} else {
logger.Warn("max index query IDs concurrency was not set, falling back to default value")
}

// Set global index options.
if n := cfg.Index.RegexpDFALimitOrDefault(); n > 0 {
regexp.SetStateLimit(n)
Expand Down Expand Up @@ -481,8 +472,14 @@ func Run(runOpts RunOptions) {
seriesReadPermits.Start()
defer seriesReadPermits.Stop()

opts = opts.SetPermitsOptions(opts.PermitsOptions().
SetSeriesReadPermitsManager(seriesReadPermits))
permitOpts := opts.PermitsOptions().SetSeriesReadPermitsManager(seriesReadPermits)
if cfg.Index.MaxQueryIDsConcurrency != 0 {
permitOpts.SetIndexQueryPermitsManager(permits.NewFixedPermitsManager(cfg.Index.MaxQueryIDsConcurrency))
ryanhall07 marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.Warn("max index query IDs concurrency was not set, falling back to default value")
}

opts = opts.SetPermitsOptions(permitOpts)

// Setup postings list cache.
var (
Expand Down Expand Up @@ -524,6 +521,11 @@ func Run(runOpts RunOptions) {
}).
SetMmapReporter(mmapReporter).
SetQueryLimits(queryLimits)

if cfg.Index.MaxResultsPerPermit > 0 {
indexOpts.SetResultsPerPermit(cfg.Index.MaxResultsPerPermit)
ryanhall07 marked this conversation as resolved.
Show resolved Hide resolved
}

opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
Expand Down
Loading