Skip to content

Commit

Permalink
Cherry pick with updates: Restrict the time a query can hold an index…
Browse files Browse the repository at this point in the history
… worker (#3269)
  • Loading branch information
robskillington committed Jul 17, 2021
1 parent afb27df commit 25c97e2
Show file tree
Hide file tree
Showing 47 changed files with 4,305 additions and 2,034 deletions.
146 changes: 122 additions & 24 deletions src/cluster/generated/proto/kvpb/kv.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions src/cluster/generated/proto/kvpb/kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ syntax = "proto3";
package kvpb;

message KeyValueUpdate {
string key = 1;
string key = 1;
string value = 2;
bool commit = 3;
bool commit = 3;
}

message KeyValueUpdateResult {
Expand All @@ -34,13 +34,15 @@ message KeyValueUpdateResult {
}

message QueryLimits {
QueryLimit maxRecentlyQueriedSeriesBlocks = 1;
QueryLimit maxRecentlyQueriedSeriesBlocks = 1;
QueryLimit maxRecentlyQueriedSeriesDiskBytesRead = 2;
QueryLimit maxRecentlyQueriedSeriesDiskRead = 3;
QueryLimit maxRecentlyQueriedSeriesDiskRead = 3;
QueryLimit maxRecentlyQueriedMetadataRead = 4;
}

message QueryLimit {
int64 limit = 1;
int64 limit = 1;
int64 lookbackSeconds = 2;
bool forceExceeded = 3;
bool forceExceeded = 3;
bool forceWaited = 4;
}
7 changes: 7 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,13 @@ type IndexConfiguration struct {
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

// MaxWorkerTime is the maximum time a query can hold an index worker at once. If a query does not finish in this
// time it yields the worker and must wait again for another worker to resume. The number of workers available to
// all queries is defined by MaxQueryIDsConcurrency.
// Capping the maximum time per worker ensures a few large queries don't hold all the concurrent workers and lock
// out many small queries from running.
MaxWorkerTime time.Duration `yaml:"maxWorkerTime"`

// RegexpDFALimit is the limit on the max number of states used by a
// regexp deterministic finite automaton. Default is 10,000 states.
RegexpDFALimit *int `yaml:"regexpDFALimit"`
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
maxWorkerTime: 0s
regexpDFALimit: null
regexpFSALimit: null
forwardIndexProbability: 0
Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/network/server/tchannelthrift/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
package tchannelthrift

import (
stdctx "context"
"time"

"github.com/m3db/m3/src/x/context"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
xnetcontext "golang.org/x/net/context"
)

const (
Expand All @@ -39,18 +39,19 @@ const (
func RegisterServer(channel *tchannel.Channel, service thrift.TChanServer, contextPool context.Pool) {
server := thrift.NewServer(channel)
server.Register(service, thrift.OptPostResponse(postResponseFn))
server.SetContextFn(func(ctx xnetcontext.Context, method string, headers map[string]string) thrift.Context {
server.SetContextFn(func(ctx stdctx.Context, method string, headers map[string]string) thrift.Context {
xCtx := contextPool.Get()
xCtx.SetGoContext(ctx)
ctxWithValue := xnetcontext.WithValue(ctx, contextKey, xCtx)
ctxWithValue := stdctx.WithValue(ctx, contextKey, xCtx) //nolint: staticcheck
return thrift.WithHeaders(ctxWithValue, headers)
})
}

// NewContext returns a new thrift context and cancel func with embedded M3DB context
func NewContext(timeout time.Duration) (thrift.Context, xnetcontext.CancelFunc) {
func NewContext(timeout time.Duration) (thrift.Context, stdctx.CancelFunc) {
tctx, cancel := thrift.NewContext(timeout)
ctxWithValue := xnetcontext.WithValue(tctx, contextKey, context.NewContext())
xCtx := context.NewWithGoContext(tctx)
ctxWithValue := stdctx.WithValue(tctx, contextKey, xCtx) //nolint: staticcheck
return thrift.WithHeaders(ctxWithValue, nil), cancel
}

Expand All @@ -59,7 +60,7 @@ func Context(ctx thrift.Context) context.Context {
return ctx.Value(contextKey).(context.Context)
}

func postResponseFn(ctx xnetcontext.Context, method string, response apachethrift.TStruct) {
func postResponseFn(ctx stdctx.Context, method string, response apachethrift.TStruct) {
value := ctx.Value(contextKey)
inner := value.(context.Context)
inner.Close()
Expand Down
Loading

0 comments on commit 25c97e2

Please sign in to comment.