Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict the number of results processed per index worker #3269

Merged
merged 26 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9abfd17
Restrict the number of results processed per index worker
ryanhall07 Feb 23, 2021
b4b543e
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 23, 2021
7c11e7d
mock gen
ryanhall07 Feb 23, 2021
c8e2e40
use channels for fixed permits
ryanhall07 Feb 23, 2021
2fa777b
use TryAcquire to limit # of go routines
ryanhall07 Feb 24, 2021
ae102f5
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 24, 2021
4581f11
Just use Acquire to get permits
ryanhall07 Feb 24, 2021
2107c97
review comments
ryanhall07 Feb 25, 2021
a22b186
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
13cb785
fix tests
ryanhall07 Feb 25, 2021
d5b6680
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
2d24354
fix config test
ryanhall07 Feb 25, 2021
ae5ac98
Merge remote-tracking branch 'origin/master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
ae900ca
Limit block iters by time
ryanhall07 Feb 25, 2021
1f9ebbd
fix tests
ryanhall07 Feb 25, 2021
b9cf04c
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 25, 2021
92c93e3
remove ctx from Release
ryanhall07 Feb 26, 2021
ad52f6f
Merge branch 'rhall-worker-pool-iter' of github.com:m3db/m3 into rhal…
ryanhall07 Feb 26, 2021
a3c33af
permit quota is int64
ryanhall07 Feb 26, 2021
53c5a48
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 26, 2021
dd0678a
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 27, 2021
1a563c3
default max worker time to 1s
ryanhall07 Feb 27, 2021
c08b57a
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Feb 28, 2021
b7ab449
Merge branch 'master' into rhall-worker-pool-iter
ryanhall07 Mar 1, 2021
5b9b982
Remove unused MaxResultsPerWorkerConfiguration
ryanhall07 Mar 1, 2021
1337a6d
Merge branch 'rhall-worker-pool-iter' of github.com:m3db/m3 into rhal…
ryanhall07 Mar 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ type IndexConfiguration struct {
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

// MaxWorkerTime is the maximum time a query can hold an index worker at once. If a query does not finish in this
// time it yields the worker and must wait again for another worker to resume. The number of workers available to
// all queries is defined by MaxQueryIDsConcurrency.
// Capping the maximum time per worker ensures a few large queries don't hold all the concurrent workers and lock
// out many small queries from running.
MaxWorkerTime time.Duration `yaml:"maxWorkerTime"`

// RegexpDFALimit is the limit on the max number of states used by a
// regexp deterministic finite automaton. Default is 10,000 states.
RegexpDFALimit *int `yaml:"regexpDFALimit"`
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
maxWorkerTime: 0s
regexpDFALimit: null
regexpFSALimit: null
forwardIndexProbability: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *fakePermits) TryAcquire(_ context.Context) (bool, error) {
return true, nil
}

func (p *fakePermits) Release() {
func (p *fakePermits) Release(_ int64) {
p.released++
p.available++
}
5 changes: 3 additions & 2 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,9 +1092,10 @@ func (i *fetchTaggedResultsIter) Close(err error) {

i.seriesBlocks.RecordValue(float64(i.totalSeriesBlocks))

for n := 0; n < i.batchesAcquired; n++ {
i.blockPermits.Release()
for n := 0; n < i.batchesAcquired-1; n++ {
i.blockPermits.Release(int64(i.blocksPerBatch))
}
i.blockPermits.Release(int64(i.blocksPerBatch - i.blocksAvailable))
}

// IDResult is the FetchTagged result for a series ID.
Expand Down
26 changes: 14 additions & 12 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ import (
xos "github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/m3dbx/vellum/levenshtein"
"github.com/m3dbx/vellum/levenshtein2"
"github.com/m3dbx/vellum/regexp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -371,14 +370,6 @@ func Run(runOpts RunOptions) {

opentracing.SetGlobalTracer(tracer)

if cfg.Index.MaxQueryIDsConcurrency != 0 {
queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency)
queryIDsWorkerPool.Init()
opts = opts.SetQueryIDsWorkerPool(queryIDsWorkerPool)
} else {
logger.Warn("max index query IDs concurrency was not set, falling back to default value")
}

// Set global index options.
if n := cfg.Index.RegexpDFALimitOrDefault(); n > 0 {
regexp.SetStateLimit(n)
Expand Down Expand Up @@ -481,8 +472,14 @@ func Run(runOpts RunOptions) {
seriesReadPermits.Start()
defer seriesReadPermits.Stop()

opts = opts.SetPermitsOptions(opts.PermitsOptions().
SetSeriesReadPermitsManager(seriesReadPermits))
permitOptions := opts.PermitsOptions().SetSeriesReadPermitsManager(seriesReadPermits)
if cfg.Index.MaxQueryIDsConcurrency != 0 {
permitOptions = permitOptions.SetIndexQueryPermitsManager(
permits.NewFixedPermitsManager(cfg.Index.MaxQueryIDsConcurrency))
} else {
logger.Warn("max index query IDs concurrency was not set, falling back to default value")
}
opts = opts.SetPermitsOptions(permitOptions)

// Setup postings list cache.
var (
Expand Down Expand Up @@ -524,6 +521,11 @@ func Run(runOpts RunOptions) {
}).
SetMmapReporter(mmapReporter).
SetQueryLimits(queryLimits)

if cfg.Index.MaxWorkerTime > 0 {
indexOpts = indexOpts.SetMaxWorkerTime(cfg.Index.MaxWorkerTime)
}

opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
Expand Down
Loading