Skip to content

Commit

Permalink
[dbnode] Add ability to set aggregate limits (#3216)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Feb 17, 2021
1 parent e31d6dc commit 1778b9f
Show file tree
Hide file tree
Showing 20 changed files with 475 additions and 106 deletions.
87 changes: 87 additions & 0 deletions scripts/docker-integration-tests/prometheus/metadata-limits.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env bash

set -ex
M3_PATH=${M3_PATH:-$GOPATH/src/github.com/m3db/m3}
source "$M3_PATH"/scripts/docker-integration-tests/common.sh

COMPOSE_FILE="$M3_PATH"/scripts/docker-integration-tests/query_fanout/docker-compose.yml
HEADER_FILE=headers.out

function write_agg_metrics {
NUM=$1
echo "Writing $NUM metrics to [0.0.0.0:9003]"
set +x
for (( i=0; i<$NUM; i++ ))
do
curl -s -X POST 0.0.0.0:9003/writetagged -d '{
"namespace": "unagg",
"id": "{__name__=\"'$METRIC_NAME'\",'$METRIC_NAME'=\"'$i'\"}",
"tags": [
{
"name": "__name__",
"value": "'$METRIC_NAME'"
},
{
"name": "'$METRIC_NAME'",
"value": "'$i'"
}
],
"datapoint": {
"timestamp":'"$NOW"',
"value": '$i'
}
}' &> /dev/null
done

set -x
}

function test_correct_label_values {
RESULT=$(curl "http://0.0.0.0:7201/api/v1/label/${METRIC_NAME}/values" )
COUNT=$(echo $RESULT | jq .data[] | wc -l)
test $COUNT = 60
}

function test_failing_label_values {
RESULT=$(curl "http://0.0.0.0:7201/api/v1/label/${METRIC_NAME}/values" )
STATUS=$(echo $RESULT | jq .status)
test $STATUS = '"error"'
}

function test_query_succeeds {
RESULT=$(curl "http://0.0.0.0:7201/api/v1/query?query=sum($METRIC_NAME)&start=$NOW")
STATUS=$(echo $RESULT | jq .status)
test $STATUS = '"success"'
}

function test_global_aggregate_limits {
export NOW=$(date +"%s")
export METRIC_NAME="aggregate_limits_$NOW"

write_agg_metrics 60
curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{
"key": "m3db.query.limits",
"value":{
"maxRecentlyQueriedMetadataRead": {
"limit":150,
"lookbackSeconds":5,
"forceExceeded":false
}
},
"commit":true
}'

# Make sure any existing limit has expired before continuing.
ATTEMPTS=5 retry_with_backoff test_correct_label_values
ATTEMPTS=5 retry_with_backoff test_correct_label_values
ATTEMPTS=5 TIMEOUT=1 retry_with_backoff test_failing_label_values
# Make sure that a query is unaffected by the the metadata limits.
ATTEMPTS=2 retry_with_backoff test_query_succeeds
# Make sure the limit expires within 10 seconds and the query succeeds again.
ATTEMPTS=10 retry_with_backoff test_correct_label_values
curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{
"key": "m3db.query.limits",
"value":{},
"commit":true
}'
}
12 changes: 9 additions & 3 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

set -xe

source "$M3_PATH"/scripts/docker-integration-tests/common.sh
source "$M3_PATH"/scripts/docker-integration-tests/prometheus/test-correctness.sh
M3_PATH=${M3_PATH:-$GOPATH/src/github.com/m3db/m3}
TESTDIR="$M3_PATH"/scripts/docker-integration-tests/
source "$TESTDIR"/common.sh
source "$TESTDIR"/prometheus/test-correctness.sh
source "$TESTDIR"/prometheus/metadata-limits.sh
REVISION=$(git rev-parse HEAD)
COMPOSE_FILE="$M3_PATH"/scripts/docker-integration-tests/prometheus/docker-compose.yml
COMPOSE_FILE="$TESTDIR"/prometheus/docker-compose.yml
# quay.io/m3db/prometheus_remote_client_golang @ v0.4.3
PROMREMOTECLI_IMAGE=quay.io/m3db/prometheus_remote_client_golang@sha256:fc56df819bff9a5a087484804acf3a584dd4a78c68900c31a28896ed66ca7e7b
JQ_IMAGE=realguess/jq:1.4@sha256:300c5d9fb1d74154248d155ce182e207cf6630acccbaadd0168e18b15bfaa786
Expand Down Expand Up @@ -496,4 +499,7 @@ test_labels
echo "Running function correctness tests"
test_correctness

echo "Running aggregate limit tests"
test_global_aggregate_limits

TEST_SUCCESS=true
16 changes: 16 additions & 0 deletions site/content/operational_guide/resource_limits.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ limits:
# and read until the lookback period resets.
lookback: 15s

# If set, will enforce a maximum number of fields and terms matched for
# metadata queries, e.g. label/<foo>/values
maxRecentlyQueriedMetadataRead:
# Value sets the maximum number of fields and terms matched.
value: 0
# Lookback sets the time window that this limit is enforced over, every
# lookback period the global count is reset to zero and when the limit
# is reached it will reject any further time series blocks being matched
# and read until the lookback period resets.
lookback: 15s

# If set then will limit the number of parallel write batch requests to the
# database and return errors if hit.
maxOutstandingWriteRequests: 0
Expand Down Expand Up @@ -137,6 +148,11 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{
"limit":0,
"lookbackSeconds":15,
"forceExceeded":false
},
"maxRecentlyQueriedMetadataRead": {
"limit":0,
"lookbackSeconds":15,
"forceExceeded":false
}
},
"commit":true
Expand Down
104 changes: 80 additions & 24 deletions src/cluster/generated/proto/kvpb/kv.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions src/cluster/generated/proto/kvpb/kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ syntax = "proto3";
package kvpb;

message KeyValueUpdate {
string key = 1;
string key = 1;
string value = 2;
bool commit = 3;
bool commit = 3;
}

message KeyValueUpdateResult {
Expand All @@ -34,13 +34,14 @@ message KeyValueUpdateResult {
}

message QueryLimits {
QueryLimit maxRecentlyQueriedSeriesBlocks = 1;
QueryLimit maxRecentlyQueriedSeriesBlocks = 1;
QueryLimit maxRecentlyQueriedSeriesDiskBytesRead = 2;
QueryLimit maxRecentlyQueriedSeriesDiskRead = 3;
QueryLimit maxRecentlyQueriedSeriesDiskRead = 3;
QueryLimit maxRecentlyQueriedMetadataRead = 4;
}

message QueryLimit {
int64 limit = 1;
int64 limit = 1;
int64 lookbackSeconds = 2;
bool forceExceeded = 3;
bool forceExceeded = 3;
}
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,7 @@ func TestConfiguration(t *testing.T) {
maxRecentlyQueriedSeriesDiskBytesRead: null
maxRecentlyQueriedSeriesDiskRead: null
maxRecentlyQueriedSeriesBlocks: null
maxRecentlyQueriedMetadata: null
maxOutstandingWriteRequests: 0
maxOutstandingReadRequests: 0
maxOutstandingRepairedBytes: 0
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/services/m3dbnode/config/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ type LimitsConfiguration struct {
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesBlocks *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"`

// MaxRecentlyQueriedMetadata sets the upper limit on metadata counts
// within a given lookback period. Metadata queries which are issued while
// this max is surpassed encounter an error.
MaxRecentlyQueriedMetadata *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedMetadata"`

// MaxOutstandingWriteRequests controls the maximum number of outstanding write requests
// that the server will allow before it begins rejecting requests. Note that this value
// is independent of the number of values that are being written (due to variable batch
Expand Down
7 changes: 7 additions & 0 deletions src/dbnode/integration/query_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestQueryLimitExceededError(t *testing.T) {
start = end.Add(-time.Hour)
query = index.Query{Query: idx.NewTermQuery([]byte("tag"), []byte("value"))}
queryOpts = index.QueryOptions{StartInclusive: start, EndExclusive: end}
aggOpts = index.AggregationOptions{QueryOptions: queryOpts}
)

session, err := testSetup.M3DBClient().DefaultSession()
Expand All @@ -75,6 +76,11 @@ func TestQueryLimitExceededError(t *testing.T) {
ns.ID(), query, queryOpts)
require.True(t, client.IsResourceExhaustedError(err),
"expected resource exhausted error, got: %v", err)

_, _, err = session.Aggregate(ContextWithDefaultTimeout(), ns.ID(), query, aggOpts)
require.True(t, client.IsResourceExhaustedError(err),
"expected aggregate resource exhausted error, got: %v", err)

}

func newTestOptionsWithIndexedNamespace(t *testing.T) (TestOptions, namespace.Metadata) {
Expand All @@ -96,6 +102,7 @@ func newTestSetupWithQueryLimits(t *testing.T, opts TestOptions) TestSetup {
limitOpts := limits.NewOptions().
SetBytesReadLimitOpts(queryLookback).
SetDocsLimitOpts(queryLookback).
SetAggregateDocsLimitOpts(queryLookback).
SetInstrumentOptions(storageOpts.InstrumentOptions())
queryLimits, err := limits.NewQueryLimits(limitOpts)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (r *blockRetriever) fetchBatch(
retrieverResources)

var limitErr error
if err := r.queryLimits.AnyExceeded(); err != nil {
if err := r.queryLimits.AnyFetchExceeded(); err != nil {
for _, req := range reqs {
req.err = err
}
Expand Down
Loading

0 comments on commit 1778b9f

Please sign in to comment.