[query][coordinator] Add time range limit with M3-Limit-Max-Range to restrict time range of a query #3538

Merged: 14 commits merged on Jun 8, 2021
@@ -10,6 +10,8 @@ services:
networks:
- backend
image: "m3dbnode_integration:${REVISION}"
volumes:
- "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
coordinator01:
expose:
- "7201"
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/prometheus/m3dbnode.yml
@@ -0,0 +1,2 @@
coordinator: {}
db: {}
36 changes: 36 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
@@ -313,6 +313,42 @@ function test_query_limits_applied {
echo "Test query returned-series limit - below limit"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Returned-SeriesMetadata: 2" "0.0.0.0:7201/api/v1/label/metadata_test_label/values?match[]=metadata_test_series" | jq -r ".data | length") -eq 2 ]]'

# Test time range limits with query APIs.
query_url="0.0.0.0:7201/api/v1/query_range"
echo "Test query time range limit with coordinator defaults"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)" | jq -r ".data.result | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: false" "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)" | jq -r ".data.result | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)" | jq ."error" | grep "$QUERY_LIMIT_MESSAGE") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)") = "400" ]]'

# Test time range limits with metadata APIs.
meta_query_url="0.0.0.0:7201/api/v1/label/metadata_test_label/values"
echo "Test query time range limit with coordinator defaults"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)" | jq -r ".data | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: false" "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)" | jq -r ".data | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)" | jq ."error" | grep "$QUERY_LIMIT_MESSAGE") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)") = "400" ]]'
}

function test_query_limits_global_applied {
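For reference, a minimal Go sketch of the same range-limited query_range call that the test script above exercises. The endpoint, metric name, and header values are taken from the test; the client code itself is illustrative and not part of this change.

```go
// Illustrative client for a range-limited query_range request, mirroring the
// curl calls in the integration test above. Endpoint and query parameters are
// assumptions copied from test.sh.
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	url := fmt.Sprintf(
		"http://0.0.0.0:7201/api/v1/query_range?query=database_write_tagged_success&step=15&start=0&end=%d",
		time.Now().Unix())
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	// Cap the queried time range to 4h; fail (HTTP 400) rather than truncate.
	req.Header.Set("M3-Limit-Max-Range", "4h")
	req.Header.Set("M3-Limit-Require-Exhaustive", "true")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}
```

With M3-Limit-Require-Exhaustive set to false instead, the coordinator truncates the range rather than returning a 400, which is what the "require-exhaustive headers false" cases above check.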
6 changes: 6 additions & 0 deletions src/cmd/services/m3query/config/config.go
@@ -382,6 +382,11 @@ type PerQueryLimitsConfiguration struct {
// service.
MaxFetchedDocs int `yaml:"maxFetchedDocs"`

// MaxFetchedRange limits the time range of index documents matched for any given
// individual storage node per query, before returning result to query
// service.
MaxFetchedRange time.Duration `yaml:"maxFetchedRange"`

// RequireExhaustive results in an error if the query exceeds any limit.
RequireExhaustive *bool `yaml:"requireExhaustive"`
}
@@ -408,6 +413,7 @@ func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderLimitsOptions() handl
SeriesLimit: int(seriesLimit),
InstanceMultiple: l.InstanceMultiple,
DocsLimit: int(docsLimit),
RangeLimit: l.MaxFetchedRange,
RequireExhaustive: requireExhaustive,
}
}
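As a rough sketch of how the new maxFetchedRange setting would flow from configuration into the fetch-options limits. The import path is inferred from the config.go location shown above, and the concrete values are illustrative only.

```go
// Hypothetical wiring sketch (not part of this diff): a PerQueryLimitsConfiguration
// with the new maxFetchedRange field set, converted into fetch-options limits.
package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/cmd/services/m3query/config"
)

func main() {
	requireExhaustive := true
	limits := config.PerQueryLimitsConfiguration{
		MaxFetchedDocs:    10000,
		MaxFetchedRange:   4 * time.Hour, // new field; becomes RangeLimit below
		RequireExhaustive: &requireExhaustive,
	}

	opts := limits.AsFetchOptionsBuilderLimitsOptions()
	fmt.Println(opts.RangeLimit) // 4h0m0s
}
```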
@@ -89,6 +89,7 @@ type FetchOptionsBuilderLimitsOptions struct {
SeriesLimit int
InstanceMultiple float32
DocsLimit int
RangeLimit time.Duration
ReturnedSeriesLimit int
ReturnedDatapointsLimit int
ReturnedSeriesMetadataLimit int
@@ -134,6 +135,36 @@ func ParseLimit(req *http.Request, header, formValue string, defaultLimit int) (
return defaultLimit, nil
}

// ParseDurationLimit parses request limit from either header or query string.
func ParseDurationLimit(
req *http.Request,
header,
formValue string,
defaultLimit time.Duration,
) (time.Duration, error) {
if str := req.Header.Get(header); str != "" {
n, err := time.ParseDuration(str)
if err != nil {
err = fmt.Errorf(
"could not parse duration limit: input=%s, err=%w", str, err)
return 0, err
}
return n, nil
}

if str := req.FormValue(formValue); str != "" {
n, err := time.ParseDuration(str)
if err != nil {
err = fmt.Errorf(
"could not parse duration limit: input=%s, err=%w", str, err)
return 0, err
}
return n, nil
}

return defaultLimit, nil
}

// ParseInstanceMultiple parses request instance multiple from header.
func ParseInstanceMultiple(req *http.Request, defaultValue float32) (float32, error) {
if str := req.Header.Get(headers.LimitInstanceMultipleHeader); str != "" {
@@ -244,6 +275,14 @@ func (b fetchOptionsBuilder) newFetchOptions(

fetchOpts.DocsLimit = docsLimit

rangeLimit, err := ParseDurationLimit(req, headers.LimitMaxRangeHeader,
"rangeLimit", b.opts.Limits.RangeLimit)
if err != nil {
return nil, nil, err
}

fetchOpts.RangeLimit = rangeLimit

returnedSeriesLimit, err := ParseLimit(req, headers.LimitMaxReturnedSeriesHeader,
"returnedSeriesLimit", b.opts.Limits.ReturnedSeriesLimit)
if err != nil {
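A hedged usage sketch for the ParseDurationLimit helper added above. The handleroptions and headers import paths are not shown in this page and are inferred from the M3 layout, so they may differ; the default value and form value are arbitrary.

```go
// Usage sketch for ParseDurationLimit: the M3-Limit-Max-Range header takes
// precedence over the "rangeLimit" form value; if neither is present, the
// supplied default is returned. Import paths here are assumptions.
package main

import (
	"fmt"
	"net/http/httptest"
	"time"

	"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
	"github.com/m3db/m3/src/x/headers"
)

func main() {
	req := httptest.NewRequest("GET", "/api/v1/query_range?rangeLimit=2h", nil)
	req.Header.Set(headers.LimitMaxRangeHeader, "4h")

	limit, err := handleroptions.ParseDurationLimit(
		req, headers.LimitMaxRangeHeader, "rangeLimit", 24*time.Hour)
	fmt.Println(limit, err) // 4h0m0s <nil>
}
```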
@@ -53,10 +53,12 @@ func TestFetchOptionsBuilder(t *testing.T) {
tests := []struct {
name string
defaultLimit int
defaultRangeLimit time.Duration
defaultRestrictByTag *storage.RestrictByTag
headers map[string]string
query string
expectedLimit int
expectedRangeLimit time.Duration
expectedRestrict *storage.RestrictQueryOptions
expectedLookback *expectedLookback
expectedErr bool
@@ -76,13 +78,36 @@ func TestFetchOptionsBuilder(t *testing.T) {
expectedLimit: 4242,
},
{
name: "bad header",
name: "bad limit header",
defaultLimit: 42,
headers: map[string]string{
headers.LimitMaxSeriesHeader: "not_a_number",
},
expectedErr: true,
},
{
name: "default range limit with no headers",
defaultRangeLimit: 42 * time.Hour,
headers: map[string]string{},
expectedRangeLimit: 42 * time.Hour,
},
{
name: "range limit with header",
defaultRangeLimit: 42 * time.Hour,
headers: map[string]string{
headers.LimitMaxRangeHeader: "84h",
},
expectedRangeLimit: 84 * time.Hour,
},
{
name: "bad range limit header",
defaultRangeLimit: 42 * time.Hour,
headers: map[string]string{
// Not a parseable time range string.
headers.LimitMaxRangeHeader: "4242",
},
expectedErr: true,
},
{
name: "unaggregated metrics type",
headers: map[string]string{
2 changes: 1 addition & 1 deletion src/query/graphite/native/aggregation_functions.go
@@ -279,7 +279,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis

// aggregate takes a list of series and returns a new series containing the
// value aggregated across the series at each datapoint using the specified function.
// This function can be used with aggregation functionsL average (or avg), avg_zero,
// This function can be used with aggregation functions average (or avg), avg_zero,
Collaborator: Nice, how did you even find this heh

Collaborator (author): I think I had touched this file by accident and the linter went off; I reverted, but the fix stayed. Good old linter, hah.

// median, sum (or total), min, max, diff, stddev, count,
// range (or rangeOf), multiply & last (or current).
func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) {
61 changes: 54 additions & 7 deletions src/query/storage/index.go
@@ -22,11 +22,13 @@ package storage

import (
"fmt"
"time"

"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)
@@ -69,17 +71,56 @@ func TagsToIdentTagIterator(tags models.Tags) ident.TagIterator {
}

// FetchOptionsToM3Options converts a set of coordinator options to M3 options.
func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) index.QueryOptions {
func FetchOptionsToM3Options(
fetchOptions *FetchOptions,
fetchQuery *FetchQuery,
) (index.QueryOptions, error) {
start, end, err := convertStartEndWithRangeLimit(fetchQuery.Start,
fetchQuery.End, fetchOptions)
if err != nil {
return index.QueryOptions{}, err
}

return index.QueryOptions{
SeriesLimit: fetchOptions.SeriesLimit,
InstanceMultiple: fetchOptions.InstanceMultiple,
DocsLimit: fetchOptions.DocsLimit,
RequireExhaustive: fetchOptions.RequireExhaustive,
RequireNoWait: fetchOptions.RequireNoWait,
Source: fetchOptions.Source,
StartInclusive: xtime.ToUnixNano(fetchQuery.Start),
EndExclusive: xtime.ToUnixNano(fetchQuery.End),
StartInclusive: xtime.ToUnixNano(start),
EndExclusive: xtime.ToUnixNano(end),
}, nil
}

func convertStartEndWithRangeLimit(
start, end time.Time,
fetchOptions *FetchOptions,
) (time.Time, time.Time, error) {
fetchRangeLimit := fetchOptions.RangeLimit
if fetchRangeLimit <= 0 {
return start, end, nil
}

fetchRange := end.Sub(start)
if fetchRange <= fetchRangeLimit {
return start, end, nil
}

if fetchOptions.RequireExhaustive {
// Fail the query.
msg := fmt.Sprintf("query exceeded limit: require_exhaustive=%v, "+
"range_limit=%s, range_matched=%s",
fetchOptions.RequireExhaustive,
fetchRangeLimit.String(),
fetchRange.String())
err := xerrors.NewInvalidParamsError(consolidators.NewLimitError(msg))
Collaborator: Nice, should this emit a metric too, to match the other limit-exceeded errors?

return time.Time{}, time.Time{}, err
}

// Truncate the range.
start = end.Add(-1 * fetchRangeLimit)
return start, end, nil
}

func convertAggregateQueryType(completeNameOnly bool) index.AggregationType {
@@ -95,20 +136,26 @@ func convertAggregateQueryType(completeNameOnly bool) index.AggregationType {
func FetchOptionsToAggregateOptions(
fetchOptions *FetchOptions,
tagQuery *CompleteTagsQuery,
) index.AggregationOptions {
) (index.AggregationOptions, error) {
start, end, err := convertStartEndWithRangeLimit(tagQuery.Start.ToTime(),
tagQuery.End.ToTime(), fetchOptions)
if err != nil {
return index.AggregationOptions{}, err
}

return index.AggregationOptions{
QueryOptions: index.QueryOptions{
SeriesLimit: fetchOptions.SeriesLimit,
DocsLimit: fetchOptions.DocsLimit,
Source: fetchOptions.Source,
RequireExhaustive: fetchOptions.RequireExhaustive,
RequireNoWait: fetchOptions.RequireNoWait,
StartInclusive: tagQuery.Start,
EndExclusive: tagQuery.End,
StartInclusive: xtime.ToUnixNano(start),
EndExclusive: xtime.ToUnixNano(end),
},
FieldFilter: tagQuery.FilterNameTags,
Type: convertAggregateQueryType(tagQuery.CompleteNameOnly),
}
}, nil
}

// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query.
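To illustrate the behaviour that convertStartEndWithRangeLimit adds in src/query/storage/index.go, a standalone sketch follows. The clampRange helper is hypothetical and only mimics the truncate-or-error logic shown above; it is not the repository's code.

```go
// Standalone illustration of the range-limit behaviour: when the requested
// range exceeds the limit, either fail (require-exhaustive) or truncate the
// start toward the end so the range equals the limit.
package main

import (
	"errors"
	"fmt"
	"time"
)

func clampRange(start, end time.Time, limit time.Duration, requireExhaustive bool) (time.Time, time.Time, error) {
	if limit <= 0 || end.Sub(start) <= limit {
		return start, end, nil // no limit set, or already within it
	}
	if requireExhaustive {
		return time.Time{}, time.Time{}, errors.New("query exceeded limit: range limit exceeded")
	}
	// Truncate: keep only the most recent `limit` worth of the range.
	return end.Add(-limit), end, nil
}

func main() {
	end := time.Now()
	start := end.Add(-24 * time.Hour)

	s, e, _ := clampRange(start, end, 4*time.Hour, false)
	fmt.Println(e.Sub(s)) // 4h0m0s: start was moved up to end minus 4h

	_, _, err := clampRange(start, end, 4*time.Hour, true)
	fmt.Println(err) // query exceeded limit: range limit exceeded
}
```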