[dbnode] Re-add aggregator doc limit update (#3137)
arnikola authored Feb 9, 2021
1 parent ff99f05 commit b44bc36
Showing 10 changed files with 913 additions and 700 deletions.
47 changes: 44 additions & 3 deletions scripts/docker-integration-tests/prometheus/test.sh
@@ -233,7 +233,7 @@ function test_query_limits_applied
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]'

-# Test the default docs limit applied when directly querying
+# Test the docs limit applied when directly querying
# coordinator (docs limit set by header)
echo "Test query docs limit with coordinator limit header"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
@@ -391,6 +391,46 @@ function test_series
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

function test_label_query_limits_applied {
# Test that require exhaustive does nothing if limits are not hit
echo "Test label limits with require-exhaustive headers true (below limit therefore no error)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]'

# The header takes precedence over the configured default series limit.
echo "Test label series limit with coordinator limit header (default requires exhaustive so error)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'

echo "Test label series limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'

echo "Test label series limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'

echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'

echo "Test label docs limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'

echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'
}
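
The assertions above drive everything through curl; the same header-based limit behavior can be exercised from a small Go client. A minimal sketch, assuming a coordinator reachable at 0.0.0.0:7201 as in these tests; checkLabelValuesLimit is a hypothetical helper, not part of the M3 codebase:

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    // checkLabelValuesLimit mirrors the curl assertions above: it queries the
    // label values endpoint with the M3 limit headers and returns the HTTP status.
    func checkLabelValuesLimit(maxDocs string, requireExhaustive bool) (int, error) {
        req, err := http.NewRequest(http.MethodGet,
            "http://0.0.0.0:7201/api/v1/label/__name__/values", nil)
        if err != nil {
            return 0, err
        }
        req.Header.Set("M3-Limit-Max-Docs", maxDocs)
        req.Header.Set("M3-Limit-Require-Exhaustive", fmt.Sprintf("%t", requireExhaustive))

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return 0, err
        }
        defer resp.Body.Close()
        io.Copy(io.Discard, resp.Body) // drain the body so the connection is reusable
        return resp.StatusCode, nil
    }

    func main() {
        // Mirrors the require-exhaustive-true case above: a hit docs limit
        // should surface as a 400 rather than a partial result.
        code, err := checkLabelValuesLimit("1", true)
        if err != nil {
            panic(err)
        }
        fmt.Println("status:", code) // the test above expects 400 here
    }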

function test_labels {
TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \
TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \
@@ -449,8 +489,9 @@ test_query_restrict_metrics_type
test_prometheus_query_native_timeout
test_query_restrict_tags
test_prometheus_remote_write_map_tags
test_series
+test_label_query_limits_applied
test_labels

echo "Running function correctness tests"
test_correctness
36 changes: 29 additions & 7 deletions src/dbnode/storage/index.go
@@ -1404,9 +1404,10 @@ func (i *nsIndex) AggregateQuery(
query index.Query,
opts index.AggregationOptions,
) (index.AggregateQueryResult, error) {
id := i.nsMetadata.ID()
logFields := []opentracinglog.Field{
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", i.nsMetadata.ID().String()),
opentracinglog.String("namespace", id.String()),
opentracinglog.Int("seriesLimit", opts.SeriesLimit),
opentracinglog.Int("docsLimit", opts.DocsLimit),
xopentracing.Time("queryStart", opts.StartInclusive),
@@ -1417,12 +1418,15 @@
sp.LogFields(logFields...)
defer sp.Finish()

metrics := index.NewAggregateUsageMetrics(id, i.opts.InstrumentOptions())
// Get results and set the filters, namespace ID and size limit.
results := i.aggregateResultsPool.Get()
aopts := index.AggregateResultsOptions{
-SizeLimit:   opts.SeriesLimit,
-FieldFilter: opts.FieldFilter,
-Type:        opts.Type,
+SizeLimit:             opts.SeriesLimit,
+DocsLimit:             opts.DocsLimit,
+FieldFilter:           opts.FieldFilter,
+Type:                  opts.Type,
+AggregateUsageMetrics: metrics,
}
ctx.RegisterFinalizer(results)
// use appropriate fn to query underlying blocks.
@@ -1441,7 +1445,7 @@
}
}
aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
-results.Reset(i.nsMetadata.ID(), aopts)
+results.Reset(id, aopts)
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields)
if err != nil {
return index.AggregateQueryResult{}, err
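
Taken together with the header-driven tests above, the new wiring lets one aggregate query carry both bounds: SizeLimit caps what the result set holds, while DocsLimit caps how many index documents are read to build it. A minimal caller sketch, written as if it lived alongside this code in the storage package; it assumes QueryOptions is the embedded field suggested by opts.QueryOptions above and that it exposes the StartInclusive, EndExclusive, SeriesLimit and DocsLimit fields referenced in this diff:

    // aggregateWithLimits is an illustrative caller, not part of this diff.
    func aggregateWithLimits(
        ctx context.Context,
        idx *nsIndex,
        query index.Query,
        start, end time.Time,
    ) (index.AggregateQueryResult, error) {
        aggOpts := index.AggregationOptions{
            QueryOptions: index.QueryOptions{
                StartInclusive: start,
                EndExclusive:   end,
                SeriesLimit:    10000, // bounds entries kept in the result
                DocsLimit:      10000, // bounds index documents read to build it
            },
        }
        return idx.AggregateQuery(ctx, query, aggOpts)
    }

When the coordinator forwards M3-Limit-Require-Exhaustive (as in the tests above), hitting either limit surfaces as an error instead of a truncated result.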
@@ -1706,7 +1710,16 @@ func (i *nsIndex) execBlockQueryFn(
sp.LogFields(logFields...)
defer sp.Finish()

-blockExhaustive, err := block.Query(ctx, cancellable, query, opts, results, logFields)
+docResults, ok := results.(index.DocumentResults)
+if !ok { // should never happen
+	state.Lock()
+	err := fmt.Errorf("unknown results type [%T] received during query", results)
+	state.multiErr = state.multiErr.Add(err)
+	state.Unlock()
+	return
+}
+
+blockExhaustive, err := block.Query(ctx, cancellable, query, opts, docResults, logFields)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
@@ -1744,7 +1757,16 @@ func (i *nsIndex) execBlockWideQueryFn(
sp.LogFields(logFields...)
defer sp.Finish()

-_, err := block.Query(ctx, cancellable, query, opts, results, logFields)
+docResults, ok := results.(index.DocumentResults)
+if !ok { // should never happen
+	state.Lock()
+	err := fmt.Errorf("unknown results type [%T] received during wide query", results)
+	state.multiErr = state.multiErr.Add(err)
+	state.Unlock()
+	return
+}
+
+_, err := block.Query(ctx, cancellable, query, opts, docResults, logFields)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
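
Both execBlockQueryFn and execBlockWideQueryFn now guard the same way: narrow the results interface to index.DocumentResults before querying, and on a failed assertion record the error into the shared multi-error under the state lock rather than panicking inside a query goroutine. A self-contained sketch of that pattern with stand-in names (documentResults, queryState and execBlock are hypothetical; a plain error slice stands in for the xerrors.MultiError used in M3):

    package main

    import (
        "fmt"
        "sync"
    )

    // documentResults stands in for index.DocumentResults; the method is a
    // placeholder, not M3's real interface.
    type documentResults interface {
        AddDocuments(count int) error
    }

    type queryState struct {
        sync.Mutex
        multiErr []error // stand-in for the multi-error accumulated in M3
    }

    // execBlock shows the guard: narrow the interface first, and record a
    // failed assertion under the lock instead of panicking mid-fanout.
    func execBlock(state *queryState, results interface{}) {
        docResults, ok := results.(documentResults)
        if !ok { // should never happen
            state.Lock()
            state.multiErr = append(state.multiErr,
                fmt.Errorf("unknown results type [%T] received during query", results))
            state.Unlock()
            return
        }
        _ = docResults // proceed with the narrowed interface, e.g. block.Query(...)
    }

    func main() {
        state := &queryState{}
        execBlock(state, "not a documentResults") // exercises the guard path
        fmt.Println(state.multiErr)
    }

Recording the error and returning keeps one bad block from tearing down the whole fan-out; the per-block failure is merged into the query's multi-error instead.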
