Skip to content

Commit

Permalink
[query][coordinator] Add time range limit with M3-Limit-Max-Range to …
Browse files Browse the repository at this point in the history
…restrict time range of a query (#3538)
  • Loading branch information
robskillington authored Jun 8, 2021
1 parent 9fbb3ed commit 6f70579
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
networks:
- backend
image: "m3dbnode_integration:${REVISION}"
volumes:
- "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
coordinator01:
expose:
- "7201"
Expand Down
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/prometheus/m3dbnode.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
coordinator: {}
db: {}
36 changes: 36 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,42 @@ function test_query_limits_applied {
echo "Test query returned-series limit - below limit"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Returned-SeriesMetadata: 2" "0.0.0.0:7201/api/v1/label/metadata_test_label/values?match[]=metadata_test_series" | jq -r ".data | length") -eq 2 ]]'

# Test time range limits with query APIs.
query_url="0.0.0.0:7201/api/v1/query_range"
echo "Test query time range limit with coordinator defaults"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)" | jq -r ".data.result | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: false" "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)" | jq -r ".data.result | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)" | jq ."error" | grep "$QUERY_LIMIT_MESSAGE") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${query_url}?query=database_write_tagged_success&step=15&start=0&end=$(date +%s)") = "400" ]]'

# Test time range limits with metadata APIs.
meta_query_url="0.0.0.0:7201/api/v1/label/metadata_test_label/values"
echo "Test query time range limit with coordinator defaults"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)" | jq -r ".data | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: false" "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)" | jq -r ".data | length") -gt 0 ]]'

echo "Test query time range limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)" | jq ."error" | grep "$QUERY_LIMIT_MESSAGE") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Range: 4h" -H "M3-Limit-Require-Exhaustive: true" "${meta_query_url}?match[]=metadata_test_series&start=0&end=$(date +%s)") = "400" ]]'
}

function test_query_limits_global_applied {
Expand Down
6 changes: 6 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,11 @@ type PerQueryLimitsConfiguration struct {
// service.
MaxFetchedDocs int `yaml:"maxFetchedDocs"`

// MaxFetchedRange limits the time range of index documents matched for any given
// individual storage node per query, before returning result to query
// service.
MaxFetchedRange time.Duration `yaml:"maxFetchedRange"`

// RequireExhaustive results in an error if the query exceeds any limit.
RequireExhaustive *bool `yaml:"requireExhaustive"`
}
Expand All @@ -408,6 +413,7 @@ func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderLimitsOptions() handl
SeriesLimit: int(seriesLimit),
InstanceMultiple: l.InstanceMultiple,
DocsLimit: int(docsLimit),
RangeLimit: l.MaxFetchedRange,
RequireExhaustive: requireExhaustive,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type FetchOptionsBuilderLimitsOptions struct {
SeriesLimit int
InstanceMultiple float32
DocsLimit int
RangeLimit time.Duration
ReturnedSeriesLimit int
ReturnedDatapointsLimit int
ReturnedSeriesMetadataLimit int
Expand Down Expand Up @@ -134,6 +135,36 @@ func ParseLimit(req *http.Request, header, formValue string, defaultLimit int) (
return defaultLimit, nil
}

// ParseDurationLimit parses request limit from either header or query string.
func ParseDurationLimit(
req *http.Request,
header,
formValue string,
defaultLimit time.Duration,
) (time.Duration, error) {
if str := req.Header.Get(header); str != "" {
n, err := time.ParseDuration(str)
if err != nil {
err = fmt.Errorf(
"could not parse duration limit: input=%s, err=%w", str, err)
return 0, err
}
return n, nil
}

if str := req.FormValue(formValue); str != "" {
n, err := time.ParseDuration(str)
if err != nil {
err = fmt.Errorf(
"could not parse duration limit: input=%s, err=%w", str, err)
return 0, err
}
return n, nil
}

return defaultLimit, nil
}

// ParseInstanceMultiple parses request instance multiple from header.
func ParseInstanceMultiple(req *http.Request, defaultValue float32) (float32, error) {
if str := req.Header.Get(headers.LimitInstanceMultipleHeader); str != "" {
Expand Down Expand Up @@ -244,6 +275,14 @@ func (b fetchOptionsBuilder) newFetchOptions(

fetchOpts.DocsLimit = docsLimit

rangeLimit, err := ParseDurationLimit(req, headers.LimitMaxRangeHeader,
"rangeLimit", b.opts.Limits.RangeLimit)
if err != nil {
return nil, nil, err
}

fetchOpts.RangeLimit = rangeLimit

returnedSeriesLimit, err := ParseLimit(req, headers.LimitMaxReturnedSeriesHeader,
"returnedSeriesLimit", b.opts.Limits.ReturnedSeriesLimit)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func TestFetchOptionsBuilder(t *testing.T) {
tests := []struct {
name string
defaultLimit int
defaultRangeLimit time.Duration
defaultRestrictByTag *storage.RestrictByTag
headers map[string]string
query string
expectedLimit int
expectedRangeLimit time.Duration
expectedRestrict *storage.RestrictQueryOptions
expectedLookback *expectedLookback
expectedErr bool
Expand All @@ -76,13 +78,36 @@ func TestFetchOptionsBuilder(t *testing.T) {
expectedLimit: 4242,
},
{
name: "bad header",
name: "bad limit header",
defaultLimit: 42,
headers: map[string]string{
headers.LimitMaxSeriesHeader: "not_a_number",
},
expectedErr: true,
},
{
name: "default range limit with no headers",
defaultRangeLimit: 42 * time.Hour,
headers: map[string]string{},
expectedRangeLimit: 42 * time.Hour,
},
{
name: "range limit with header",
defaultRangeLimit: 42 * time.Hour,
headers: map[string]string{
headers.LimitMaxRangeHeader: "84h",
},
expectedRangeLimit: 84 * time.Hour,
},
{
name: "bad range limit header",
defaultRangeLimit: 42 * time.Hour,
headers: map[string]string{
// Not a parseable time range string.
headers.LimitMaxRangeHeader: "4242",
},
expectedErr: true,
},
{
name: "unaggregated metrics type",
headers: map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis

// aggregate takes a list of series and returns a new series containing the
// value aggregated across the series at each datapoint using the specified function.
// This function can be used with aggregation functionsL average (or avg), avg_zero,
// This function can be used with aggregation functions average (or avg), avg_zero,
// median, sum (or total), min, max, diff, stddev, count,
// range (or rangeOf), multiply & last (or current).
func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) {
Expand Down
61 changes: 54 additions & 7 deletions src/query/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ package storage

import (
"fmt"
"time"

"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)
Expand Down Expand Up @@ -69,17 +71,56 @@ func TagsToIdentTagIterator(tags models.Tags) ident.TagIterator {
}

// FetchOptionsToM3Options converts a set of coordinator options to M3 options.
func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) index.QueryOptions {
func FetchOptionsToM3Options(
fetchOptions *FetchOptions,
fetchQuery *FetchQuery,
) (index.QueryOptions, error) {
start, end, err := convertStartEndWithRangeLimit(fetchQuery.Start,
fetchQuery.End, fetchOptions)
if err != nil {
return index.QueryOptions{}, err
}

return index.QueryOptions{
SeriesLimit: fetchOptions.SeriesLimit,
InstanceMultiple: fetchOptions.InstanceMultiple,
DocsLimit: fetchOptions.DocsLimit,
RequireExhaustive: fetchOptions.RequireExhaustive,
RequireNoWait: fetchOptions.RequireNoWait,
Source: fetchOptions.Source,
StartInclusive: xtime.ToUnixNano(fetchQuery.Start),
EndExclusive: xtime.ToUnixNano(fetchQuery.End),
StartInclusive: xtime.ToUnixNano(start),
EndExclusive: xtime.ToUnixNano(end),
}, nil
}

func convertStartEndWithRangeLimit(
start, end time.Time,
fetchOptions *FetchOptions,
) (time.Time, time.Time, error) {
fetchRangeLimit := fetchOptions.RangeLimit
if fetchRangeLimit <= 0 {
return start, end, nil
}

fetchRange := end.Sub(start)
if fetchRange <= fetchRangeLimit {
return start, end, nil
}

if fetchOptions.RequireExhaustive {
// Fail the query.
msg := fmt.Sprintf("query exceeded limit: require_exhaustive=%v, "+
"range_limit=%s, range_matched=%s",
fetchOptions.RequireExhaustive,
fetchRangeLimit.String(),
fetchRange.String())
err := xerrors.NewInvalidParamsError(consolidators.NewLimitError(msg))
return time.Time{}, time.Time{}, err
}

// Truncate the range.
start = end.Add(-1 * fetchRangeLimit)
return start, end, nil
}

func convertAggregateQueryType(completeNameOnly bool) index.AggregationType {
Expand All @@ -95,20 +136,26 @@ func convertAggregateQueryType(completeNameOnly bool) index.AggregationType {
func FetchOptionsToAggregateOptions(
fetchOptions *FetchOptions,
tagQuery *CompleteTagsQuery,
) index.AggregationOptions {
) (index.AggregationOptions, error) {
start, end, err := convertStartEndWithRangeLimit(tagQuery.Start.ToTime(),
tagQuery.End.ToTime(), fetchOptions)
if err != nil {
return index.AggregationOptions{}, err
}

return index.AggregationOptions{
QueryOptions: index.QueryOptions{
SeriesLimit: fetchOptions.SeriesLimit,
DocsLimit: fetchOptions.DocsLimit,
Source: fetchOptions.Source,
RequireExhaustive: fetchOptions.RequireExhaustive,
RequireNoWait: fetchOptions.RequireNoWait,
StartInclusive: tagQuery.Start,
EndExclusive: tagQuery.End,
StartInclusive: xtime.ToUnixNano(start),
EndExclusive: xtime.ToUnixNano(end),
},
FieldFilter: tagQuery.FilterNameTags,
Type: convertAggregateQueryType(tagQuery.CompleteNameOnly),
}
}, nil
}

// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query.
Expand Down
Loading

0 comments on commit 6f70579

Please sign in to comment.