Revert "Remove disk series read limit (#3174)" (#3193)
* Revert "Remove disk series read limit (#3174)"

This reverts commit 267d654.

* Remove unnecessary blocksReadLimit pointer on idResult
wesleyk authored Feb 8, 2021
1 parent fa20e1d commit 027b66b
Showing 12 changed files with 202 additions and 29 deletions.
28 changes: 27 additions & 1 deletion site/content/operational_guide/resource_limits.md
@@ -53,6 +53,15 @@ per second safely with your deployment and you want to use the default lookback
of `15s` then you would multiply 10,000 by 15 to get 150,000 as a max value with
a 15s lookback.
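
For instance, that worked calculation maps onto the configuration shape documented below; a minimal sketch (values illustrative):

```yaml
limits:
  maxRecentlyQueriedSeriesBlocks:
    value: 150000
    lookback: 15s
```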

The third limit, `maxRecentlyQueriedSeriesDiskRead`, caps the number of series IDs matched
by incoming queries. It was originally kept distinct from `maxRecentlyQueriedSeriesBlocks`,
which also limits the memory cost of the matched series, because allocations used to occur
even for series known not to be present on disk for a given shard. That inefficiency has
since been resolved (https://github.com/m3db/m3/pull/3103), so this limit should now track
memory cost linearly relative to `maxRecentlyQueriedSeriesBlocks`. Prefer
`maxRecentlyQueriedSeriesBlocks` over `maxRecentlyQueriedSeriesDiskRead`, since both should
cap resources similarly.

### Annotated configuration

```yaml
@@ -82,6 +91,18 @@ limits:
# and read until the lookback period resets.
lookback: 15s

# If set, will enforce a maximum on the number of series read from disk.
# This limit can be used to stop queries that match an extremely high
# volume of series before the underlying series data is read from disk.
maxRecentlyQueriedSeriesDiskRead:
# Value sets the maximum number of series read from disk.
value: 0
# Lookback sets the time window that this limit is enforced over; every
# lookback period the global count is reset to zero, and when the limit
# is reached it will reject any further series from being matched
# and read until the lookback period resets.
lookback: 15s

# If set then will limit the number of parallel write batch requests to the
# database and return errors if hit.
maxOutstandingWriteRequests: 0
@@ -112,6 +133,11 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{
"lookbackSeconds":15,
"forceExceeded":false
},
"maxRecentlyQueriedSeriesDiskRead": {
"limit":0,
"lookbackSeconds":15,
"forceExceeded":false
}
},
"commit":true
}'
@@ -190,4 +216,4 @@ limits:
The following headers can also be used to override configured limits on a per request basis (to allow for different limits dependent on caller):


{{% fileinclude file="headers_optional_read_limits.md" %}}
{{% fileinclude file="headers_optional_read_limits.md" %}}
102 changes: 79 additions & 23 deletions src/cluster/generated/proto/kvpb/kv.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/cluster/generated/proto/kvpb/kv.proto
@@ -36,7 +36,7 @@ message KeyValueUpdateResult {
message QueryLimits {
QueryLimit maxRecentlyQueriedSeriesBlocks = 1;
QueryLimit maxRecentlyQueriedSeriesDiskBytesRead = 2;
reserved 3;
QueryLimit maxRecentlyQueriedSeriesDiskRead = 3;
}

message QueryLimit {
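
With field 3 restored, dynamic overrides can carry the disk series read limit again. As a rough sketch, the JSON payload POSTed to `/api/v1/kvstore` earlier corresponds to a `kvpb.QueryLimits` message like the following; the Go field names (`Limit`, `LookbackSeconds`, `ForceExceeded`) are assumed from standard protobuf generation of the messages above:

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/cluster/generated/proto/kvpb"
)

func main() {
	// Mirrors the JSON payload shown earlier; a limit value of 0 leaves
	// the limit effectively disabled.
	update := &kvpb.QueryLimits{
		MaxRecentlyQueriedSeriesDiskRead: &kvpb.QueryLimit{
			Limit:           0,
			LookbackSeconds: 15,
			ForceExceeded:   false,
		},
	}
	fmt.Println(update.String())
}
```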
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -730,6 +730,7 @@ func TestConfiguration(t *testing.T) {
meta_event_reporting_enabled: false
limits:
maxRecentlyQueriedSeriesDiskBytesRead: null
maxRecentlyQueriedSeriesDiskRead: null
maxRecentlyQueriedSeriesBlocks: null
maxOutstandingWriteRequests: 0
maxOutstandingReadRequests: 0
6 changes: 6 additions & 0 deletions src/cmd/services/m3dbnode/config/limits.go
@@ -29,6 +29,12 @@ type LimitsConfiguration struct {
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"`

// MaxRecentlyQueriedSeriesDiskRead sets the upper limit on the number of
// time series read from disk within a given lookback period. Queries which
// are issued while this max is surpassed encounter an error.
// This counts time series, which is distinct from the number of bytes
// controlled by MaxRecentlyQueriedSeriesDiskBytesRead.
MaxRecentlyQueriedSeriesDiskRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskRead"`

// MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks
// count within a given lookback period. Queries which are issued while this
// max is surpassed encounter an error.
6 changes: 6 additions & 0 deletions src/dbnode/network/server/tchannelthrift/node/service.go
@@ -846,6 +846,7 @@ func (s *service) fetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedReque
fetchStart: startTime,
dataReadMetrics: s.metrics.queryTimingDataRead,
totalMetrics: s.metrics.queryTimingFetchTagged,
blocksReadLimit: s.queryLimits.DiskSeriesReadLimit(),
}), nil
}

@@ -903,6 +904,7 @@ type fetchTaggedResultsIterOpts struct {
totalDocsCount int
dataReadMetrics index.QueryMetrics
totalMetrics index.QueryMetrics
blocksReadLimit limits.LookbackLimit
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
@@ -964,6 +966,10 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
for blockIter.Next(ctx) {
curr := blockIter.Current()
blockReaders = append(blockReaders, curr)
if err := i.blocksReadLimit.Inc(len(blockReaders), nil); err != nil {
i.err = err
return false
}
}
if blockIter.Err() != nil {
i.err = blockIter.Err()
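
The iterator change above is the enforcement point: each block reader appended during `Next` increments the restored lookback limit, and the query errors once the windowed count exceeds the configured maximum. A minimal sketch of the surrounding wiring, using only APIs visible in this diff (`limits.NewOptions`, `SetDiskSeriesReadLimitOpts`, `NewQueryLimits`, `DiskSeriesReadLimit().Inc`); the limit of 1000 and the helper functions are illustrative:

```go
package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage/limits"
	"github.com/m3db/m3/src/x/instrument"
)

// newDiskSeriesReadLimits is a sketch: it builds QueryLimits with the
// disk series read limit enabled, alongside default docs and bytes-read
// limits.
func newDiskSeriesReadLimits() (limits.QueryLimits, error) {
	opts := limits.NewOptions().
		SetInstrumentOptions(instrument.NewOptions()).
		SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()).
		SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()).
		SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{
			Limit:    1000, // illustrative
			Lookback: 15 * time.Second,
		})
	return limits.NewQueryLimits(opts)
}

// enforce shows the per-read call the iterator makes: Inc adds to the
// running count for the current lookback window and returns an error
// once the count exceeds the configured limit.
func enforce(queryLimits limits.QueryLimits) error {
	return queryLimits.DiskSeriesReadLimit().Inc(1, nil)
}
```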
11 changes: 10 additions & 1 deletion src/dbnode/network/server/tchannelthrift/node/service_test.go
@@ -1558,6 +1558,11 @@ func TestServiceFetchTagged(t *testing.T) {
{
name: "happy path",
},
{
name: "block read limit",
blocksReadLimit: 1,
fetchErrMsg: "query aborted due to limit",
},
}

for _, tc := range testCases {
@@ -1572,7 +1577,11 @@ func TestServiceFetchTagged(t *testing.T) {
queryLimits, err := limits.NewQueryLimits(limits.NewOptions().
SetInstrumentOptions(testTChannelThriftOptions.InstrumentOptions()).
SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()).
SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()))
SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()).
SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{
Limit: tc.blocksReadLimit,
Lookback: time.Second * 1,
}))
require.NoError(t, err)
testTChannelThriftOptions = testTChannelThriftOptions.SetQueryLimits(queryLimits)

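
The new test case drives the limit via `blocksReadLimit: 1` and asserts on `fetchErrMsg`. The assertion presumably follows the usual table-driven pattern, roughly as below (the actual test body is outside this diff; `tc` and `err` are stand-ins):

```go
// Sketch: error-message assertion implied by the fetchErrMsg field.
if tc.fetchErrMsg != "" {
	require.Error(t, err)
	require.Contains(t, err.Error(), tc.fetchErrMsg)
} else {
	require.NoError(t, err)
}
```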
14 changes: 14 additions & 0 deletions src/dbnode/server/server.go
@@ -451,6 +451,7 @@ func Run(runOpts RunOptions) {
// Setup query stats tracking.
docsLimit := limits.DefaultLookbackLimitOptions()
bytesReadLimit := limits.DefaultLookbackLimitOptions()
diskSeriesReadLimit := limits.DefaultLookbackLimitOptions()
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil {
docsLimit.Limit = limitConfig.Value
docsLimit.Lookback = limitConfig.Lookback
@@ -459,9 +460,14 @@ func Run(runOpts RunOptions) {
bytesReadLimit.Limit = limitConfig.Value
bytesReadLimit.Lookback = limitConfig.Lookback
}
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskRead; limitConfig != nil {
diskSeriesReadLimit.Limit = limitConfig.Value
diskSeriesReadLimit.Lookback = limitConfig.Lookback
}
limitOpts := limits.NewOptions().
SetDocsLimitOpts(docsLimit).
SetBytesReadLimitOpts(bytesReadLimit).
SetDiskSeriesReadLimitOpts(diskSeriesReadLimit).
SetInstrumentOptions(iOpts)
if builder := opts.SourceLoggerBuilder(); builder != nil {
limitOpts = limitOpts.SetSourceLoggerBuilder(builder)
@@ -1208,12 +1214,16 @@ func updateQueryLimits(logger *zap.Logger,
// Default to the config-based limits if unset in dynamic limits.
// Otherwise, use the dynamic limit.
docsLimitOpts = configOpts.DocsLimitOpts()
diskSeriesReadLimitOpts = configOpts.DiskSeriesReadLimitOpts()
bytesReadLimitOpts = configOpts.BytesReadLimitOpts()
)
if dynamicOpts != nil {
if dynamicOpts.MaxRecentlyQueriedSeriesBlocks != nil {
docsLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesBlocks)
}
if dynamicOpts.MaxRecentlyQueriedSeriesDiskRead != nil {
diskSeriesReadLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesDiskRead)
}
if dynamicOpts.MaxRecentlyQueriedSeriesDiskBytesRead != nil {
bytesReadLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesDiskBytesRead)
}
@@ -1223,6 +1233,10 @@ func updateQueryLimits(logger *zap.Logger,
logger.Error("error updating docs limit", zap.Error(err))
}

if err := updateQueryLimit(queryLimits.DiskSeriesReadLimit(), diskSeriesReadLimitOpts); err != nil {
logger.Error("error updating series read limit", zap.Error(err))
}

if err := updateQueryLimit(queryLimits.BytesReadLimit(), bytesReadLimitOpts); err != nil {
logger.Error("error updating bytes read limit", zap.Error(err))
}
14 changes: 14 additions & 0 deletions src/dbnode/storage/limits/query_limits.go
@@ -41,6 +41,7 @@ const (
type queryLimits struct {
docsLimit *lookbackLimit
bytesReadLimit *lookbackLimit
seriesDiskReadLimit *lookbackLimit
}

type lookbackLimit struct {
@@ -89,17 +90,21 @@ func NewQueryLimits(options Options) (QueryLimits, error) {
iOpts = options.InstrumentOptions()
docsLimitOpts = options.DocsLimitOpts()
bytesReadLimitOpts = options.BytesReadLimitOpts()
diskSeriesReadLimitOpts = options.DiskSeriesReadLimitOpts()
sourceLoggerBuilder = options.SourceLoggerBuilder()

docsLimit = newLookbackLimit(
iOpts, docsLimitOpts, "docs-matched", sourceLoggerBuilder)
bytesReadLimit = newLookbackLimit(
iOpts, bytesReadLimitOpts, "disk-bytes-read", sourceLoggerBuilder)
seriesDiskReadLimit = newLookbackLimit(
iOpts, diskSeriesReadLimitOpts, "disk-series-read", sourceLoggerBuilder)
)

return &queryLimits{
docsLimit: docsLimit,
bytesReadLimit: bytesReadLimit,
seriesDiskReadLimit: seriesDiskReadLimit,
}, nil
}

@@ -152,24 +157,33 @@ func (q *queryLimits) BytesReadLimit() LookbackLimit {
return q.bytesReadLimit
}

func (q *queryLimits) DiskSeriesReadLimit() LookbackLimit {
return q.seriesDiskReadLimit
}

func (q *queryLimits) Start() {
// Lock on explicit start to avoid any collision with asynchronous updating
// which will call stop/start if the lookback has changed.
q.docsLimit.startWithLock()
q.seriesDiskReadLimit.startWithLock()
q.bytesReadLimit.startWithLock()
}

func (q *queryLimits) Stop() {
// Lock on explicit stop to avoid any collision with asynchronous updating
// which will call stop/start if the lookback has changed.
q.docsLimit.stopWithLock()
q.seriesDiskReadLimit.stopWithLock()
q.bytesReadLimit.stopWithLock()
}

func (q *queryLimits) AnyExceeded() error {
if err := q.docsLimit.exceeded(); err != nil {
return err
}
if err := q.seriesDiskReadLimit.exceeded(); err != nil {
return err
}
return q.bytesReadLimit.exceeded()
}

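
With the disk series read limit re-registered in `Start`, `Stop`, and `AnyExceeded`, a caller can gate work on all three lookback limits at once; a small sketch, assuming a `queryLimits` built as in the earlier example:

```go
// AnyExceeded returns the first error among the docs, disk-series-read,
// and bytes-read limits; nil means no lookback limit is currently tripped.
if err := queryLimits.AnyExceeded(); err != nil {
	return err // reject the query before doing any disk reads
}
```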