Skip to content

Commit

Permalink
Restrict the number of results processed per index worker
Browse files Browse the repository at this point in the history
A new MaxResultsPerPermit option is introduced to cap how many index
results an index worker can process at a time. If the max is exceeded, the
index worker must yield the permit back and acquire it again
(potentially waiting) to continue processing the results.

This cap ensures large queries don't dominate the finite number of index
workers allowed to run concurrently and lock out smaller queries. The
idea is users would want to set the max large enough so the vast
majority of typical queries can finish with only a single permit
acquisition.
  • Loading branch information
ryanhall07 committed Feb 23, 2021
1 parent 9e3ecd6 commit 9abfd17
Show file tree
Hide file tree
Showing 25 changed files with 789 additions and 745 deletions.
8 changes: 8 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,14 @@ type IndexConfiguration struct {
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

// MaxResultsPerPermit is the maximum index results a query can process after obtaining a permit. If a query needs
// to process more results it yields the permit and must wait again for another permit to resume. The number of
// permits available to all queries is defined by MaxQueryIDsConcurrency.
// Capping the maximum results per permit ensures a few large queries don't hold all the concurrent permits and lock
// out many small queries from running. This should be set higher than the max results returned by the vast majority
// of queries, so most queries only need to obtain a single permit.
MaxResultsPerPermit int `yaml:"maxResultsPerPermit"`

// RegexpDFALimit is the limit on the max number of states used by a
// regexp deterministic finite automaton. Default is 10,000 states.
RegexpDFALimit *int `yaml:"regexpDFALimit"`
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
maxResultsPerPermit: 0
regexpDFALimit: null
regexpFSALimit: null
forwardIndexProbability: 0
Expand Down
26 changes: 14 additions & 12 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ import (
xos "github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/m3dbx/vellum/levenshtein"
"github.com/m3dbx/vellum/levenshtein2"
"github.com/m3dbx/vellum/regexp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -371,14 +370,6 @@ func Run(runOpts RunOptions) {

opentracing.SetGlobalTracer(tracer)

if cfg.Index.MaxQueryIDsConcurrency != 0 {
queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency)
queryIDsWorkerPool.Init()
opts = opts.SetQueryIDsWorkerPool(queryIDsWorkerPool)
} else {
logger.Warn("max index query IDs concurrency was not set, falling back to default value")
}

// Set global index options.
if n := cfg.Index.RegexpDFALimitOrDefault(); n > 0 {
regexp.SetStateLimit(n)
Expand Down Expand Up @@ -481,8 +472,14 @@ func Run(runOpts RunOptions) {
seriesReadPermits.Start()
defer seriesReadPermits.Stop()

opts = opts.SetPermitsOptions(opts.PermitsOptions().
SetSeriesReadPermitsManager(seriesReadPermits))
permitOpts := opts.PermitsOptions().SetSeriesReadPermitsManager(seriesReadPermits)
if cfg.Index.MaxQueryIDsConcurrency != 0 {
permitOpts.SetIndexQueryPermitsManager(permits.NewFixedPermitsManager(int64(cfg.Index.MaxQueryIDsConcurrency)))
} else {
logger.Warn("max index query IDs concurrency was not set, falling back to default value")
}

opts = opts.SetPermitsOptions(permitOpts)

// Setup postings list cache.
var (
Expand Down Expand Up @@ -524,6 +521,11 @@ func Run(runOpts RunOptions) {
}).
SetMmapReporter(mmapReporter).
SetQueryLimits(queryLimits)

if cfg.Index.MaxResultsPerPermit > 0 {
indexOpts.SetResultsPerPermit(cfg.Index.MaxResultsPerPermit)
}

opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
Expand Down
Loading

0 comments on commit 9abfd17

Please sign in to comment.