Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Re-add aggregator doc limit update #3137

Merged
merged 12 commits into from
Feb 9, 2021
47 changes: 44 additions & 3 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ function test_query_limits_applied {
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]'

# Test the default docs limit applied when directly querying
# Test the docs limit applied when directly querying
# coordinator (docs limit set by header)
echo "Test query docs limit with coordinator limit header"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
Expand Down Expand Up @@ -391,6 +391,46 @@ function test_series {
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

function test_label_query_limits_applied {
  # Verify that coordinator per-query limit headers (series limit, docs
  # limit, require-exhaustive) are enforced on the label-values endpoint
  # (/api/v1/label/__name__/values).

  # Require-exhaustive is a no-op while limits are not hit: set both the
  # series and the docs limit far above the data size and expect a 200.
  # NB: fixed a copy-paste bug — the original sent M3-Limit-Max-Series
  # twice; the second header was intended to be M3-Limit-Max-Docs.
  echo "Test label limits with require-exhaustive headers true (below limit therefore no error)"
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Docs: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]'

  # The header takes precedence over the configured default series limit;
  # the configured default requires exhaustive, so hitting the limit errors.
  echo "Test label series limit with coordinator limit header (default requires exhaustive so error)"
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'

  # With require-exhaustive explicitly false, hitting the series limit
  # truncates the result set instead of returning an error.
  echo "Test label series limit with require-exhaustive headers false"
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'

  echo "Test label series limit with require-exhaustive headers true (above limit therefore error)"
  # The require-exhaustive failure is surfaced in the JSON error body...
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
  # ...and is reported as a client error (4xx), not a server error.
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'

  # Same three checks for the docs limit header.
  echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)"
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'

  echo "Test label docs limit with require-exhaustive headers false"
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'

  echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)"
  # The require-exhaustive failure is surfaced in the JSON error body...
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
  # ...and is reported as a client error (4xx), not a server error.
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'
}

function test_labels {
TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \
TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \
Expand Down Expand Up @@ -449,8 +489,9 @@ test_query_restrict_metrics_type
test_prometheus_query_native_timeout
test_query_restrict_tags
test_prometheus_remote_write_map_tags
test_series
test_labels
test_series
test_label_query_limits_applied
test_labels

echo "Running function correctness tests"
test_correctness
Expand Down
36 changes: 29 additions & 7 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,9 +1404,10 @@ func (i *nsIndex) AggregateQuery(
query index.Query,
opts index.AggregationOptions,
) (index.AggregateQueryResult, error) {
id := i.nsMetadata.ID()
logFields := []opentracinglog.Field{
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", i.nsMetadata.ID().String()),
opentracinglog.String("namespace", id.String()),
opentracinglog.Int("seriesLimit", opts.SeriesLimit),
opentracinglog.Int("docsLimit", opts.DocsLimit),
xopentracing.Time("queryStart", opts.StartInclusive),
Expand All @@ -1417,12 +1418,15 @@ func (i *nsIndex) AggregateQuery(
sp.LogFields(logFields...)
defer sp.Finish()

metrics := index.NewAggregateUsageMetrics(id, i.opts.InstrumentOptions())
// Get results and set the filters, namespace ID and size limit.
results := i.aggregateResultsPool.Get()
aopts := index.AggregateResultsOptions{
SizeLimit: opts.SeriesLimit,
FieldFilter: opts.FieldFilter,
Type: opts.Type,
SizeLimit: opts.SeriesLimit,
DocsLimit: opts.DocsLimit,
FieldFilter: opts.FieldFilter,
Type: opts.Type,
AggregateUsageMetrics: metrics,
}
ctx.RegisterFinalizer(results)
// use appropriate fn to query underlying blocks.
Expand All @@ -1441,7 +1445,7 @@ func (i *nsIndex) AggregateQuery(
}
}
aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
results.Reset(i.nsMetadata.ID(), aopts)
results.Reset(id, aopts)
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields)
if err != nil {
return index.AggregateQueryResult{}, err
Expand Down Expand Up @@ -1706,7 +1710,16 @@ func (i *nsIndex) execBlockQueryFn(
sp.LogFields(logFields...)
defer sp.Finish()

blockExhaustive, err := block.Query(ctx, cancellable, query, opts, results, logFields)
docResults, ok := results.(index.DocumentResults)
if !ok { // should never happen
state.Lock()
err := fmt.Errorf("unknown results type [%T] received during query", results)
state.multiErr = state.multiErr.Add(err)
state.Unlock()
return
}

blockExhaustive, err := block.Query(ctx, cancellable, query, opts, docResults, logFields)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
Expand Down Expand Up @@ -1744,7 +1757,16 @@ func (i *nsIndex) execBlockWideQueryFn(
sp.LogFields(logFields...)
defer sp.Finish()

_, err := block.Query(ctx, cancellable, query, opts, results, logFields)
docResults, ok := results.(index.DocumentResults)
if !ok { // should never happen
state.Lock()
err := fmt.Errorf("unknown results type [%T] received during wide query", results)
state.multiErr = state.multiErr.Add(err)
state.Unlock()
return
}

_, err := block.Query(ctx, cancellable, query, opts, docResults, logFields)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
Expand Down
Loading