diff --git a/scripts/docker-integration-tests/prometheus/metadata-limits.sh b/scripts/docker-integration-tests/prometheus/metadata-limits.sh new file mode 100755 index 0000000000..d13adfadc6 --- /dev/null +++ b/scripts/docker-integration-tests/prometheus/metadata-limits.sh @@ -0,0 +1,87 @@ +#!/usr/bin/env bash + +set -ex +M3_PATH=${M3_PATH:-$GOPATH/src/github.com/m3db/m3} +source "$M3_PATH"/scripts/docker-integration-tests/common.sh + +COMPOSE_FILE="$M3_PATH"/scripts/docker-integration-tests/query_fanout/docker-compose.yml +HEADER_FILE=headers.out + +function write_agg_metrics { + NUM=$1 + echo "Writing $NUM metrics to [0.0.0.0:9003]" + set +x + for (( i=0; i<$NUM; i++ )) + do + curl -s -X POST 0.0.0.0:9003/writetagged -d '{ + "namespace": "unagg", + "id": "{__name__=\"'$METRIC_NAME'\",'$METRIC_NAME'=\"'$i'\"}", + "tags": [ + { + "name": "__name__", + "value": "'$METRIC_NAME'" + }, + { + "name": "'$METRIC_NAME'", + "value": "'$i'" + } + ], + "datapoint": { + "timestamp":'"$NOW"', + "value": '$i' + } + }' &> /dev/null + done + + set -x +} + +function test_correct_label_values { + RESULT=$(curl "http://0.0.0.0:7201/api/v1/label/${METRIC_NAME}/values" ) + COUNT=$(echo $RESULT | jq .data[] | wc -l) + test $COUNT = 60 +} + +function test_failing_label_values { + RESULT=$(curl "http://0.0.0.0:7201/api/v1/label/${METRIC_NAME}/values" ) + STATUS=$(echo $RESULT | jq .status) + test $STATUS = '"error"' +} + +function test_query_succeeds { + RESULT=$(curl "http://0.0.0.0:7201/api/v1/query?query=sum($METRIC_NAME)&start=$NOW") + STATUS=$(echo $RESULT | jq .status) + test $STATUS = '"success"' +} + +function test_global_aggregate_limits { + export NOW=$(date +"%s") + export METRIC_NAME="aggregate_limits_$NOW" + + write_agg_metrics 60 + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{ + "key": "m3db.query.limits", + "value":{ + "maxRecentlyQueriedMetadataRead": { + "limit":150, + "lookbackSeconds":5, + "forceExceeded":false + } + }, + "commit":true + }' + + # Make sure any 
existing limit has expired before continuing. + ATTEMPTS=5 retry_with_backoff test_correct_label_values + ATTEMPTS=5 retry_with_backoff test_correct_label_values + ATTEMPTS=5 TIMEOUT=1 retry_with_backoff test_failing_label_values + # Make sure that a query is unaffected by the metadata limits. + ATTEMPTS=2 retry_with_backoff test_query_succeeds + # Make sure the limit expires within 10 seconds and the query succeeds again. + ATTEMPTS=10 retry_with_backoff test_correct_label_values + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{ + "key": "m3db.query.limits", + "value":{}, + "commit":true + }' +} diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index afa684feba..3806ea1c37 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -2,10 +2,13 @@ set -xe -source "$M3_PATH"/scripts/docker-integration-tests/common.sh -source "$M3_PATH"/scripts/docker-integration-tests/prometheus/test-correctness.sh +M3_PATH=${M3_PATH:-$GOPATH/src/github.com/m3db/m3} +TESTDIR="$M3_PATH"/scripts/docker-integration-tests/ +source "$TESTDIR"/common.sh +source "$TESTDIR"/prometheus/test-correctness.sh +source "$TESTDIR"/prometheus/metadata-limits.sh REVISION=$(git rev-parse HEAD) -COMPOSE_FILE="$M3_PATH"/scripts/docker-integration-tests/prometheus/docker-compose.yml +COMPOSE_FILE="$TESTDIR"/prometheus/docker-compose.yml # quay.io/m3db/prometheus_remote_client_golang @ v0.4.3 PROMREMOTECLI_IMAGE=quay.io/m3db/prometheus_remote_client_golang@sha256:fc56df819bff9a5a087484804acf3a584dd4a78c68900c31a28896ed66ca7e7b JQ_IMAGE=realguess/jq:1.4@sha256:300c5d9fb1d74154248d155ce182e207cf6630acccbaadd0168e18b15bfaa786 @@ -496,4 +499,7 @@ test_labels echo "Running function correctness tests" test_correctness +echo "Running aggregate limit tests" +test_global_aggregate_limits + TEST_SUCCESS=true diff --git a/site/content/operational_guide/resource_limits.md 
b/site/content/operational_guide/resource_limits.md index 3b6041388c..9ed8ea8e4c 100644 --- a/site/content/operational_guide/resource_limits.md +++ b/site/content/operational_guide/resource_limits.md @@ -103,6 +103,17 @@ limits: # and read until the lookback period resets. lookback: 15s + # If set, will enforce a maximum number of fields and terms matched for + # metadata queries, e.g. label//values + maxRecentlyQueriedMetadataRead: + # Value sets the maximum number of fields and terms matched. + value: 0 + # Lookback sets the time window that this limit is enforced over, every + # lookback period the global count is reset to zero and when the limit + # is reached it will reject any further time series blocks being matched + # and read until the lookback period resets. + lookback: 15s + # If set then will limit the number of parallel write batch requests to the # database and return errors if hit. maxOutstandingWriteRequests: 0 @@ -137,6 +148,11 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{ "limit":0, "lookbackSeconds":15, "forceExceeded":false + }, + "maxRecentlyQueriedMetadataRead": { + "limit":0, + "lookbackSeconds":15, + "forceExceeded":false } }, "commit":true diff --git a/src/cluster/generated/proto/kvpb/kv.pb.go b/src/cluster/generated/proto/kvpb/kv.pb.go index d3c7c43f87..0b021e655d 100644 --- a/src/cluster/generated/proto/kvpb/kv.pb.go +++ b/src/cluster/generated/proto/kvpb/kv.pb.go @@ -120,6 +120,7 @@ type QueryLimits struct { MaxRecentlyQueriedSeriesBlocks *QueryLimit `protobuf:"bytes,1,opt,name=maxRecentlyQueriedSeriesBlocks" json:"maxRecentlyQueriedSeriesBlocks,omitempty"` MaxRecentlyQueriedSeriesDiskBytesRead *QueryLimit `protobuf:"bytes,2,opt,name=maxRecentlyQueriedSeriesDiskBytesRead" json:"maxRecentlyQueriedSeriesDiskBytesRead,omitempty"` MaxRecentlyQueriedSeriesDiskRead *QueryLimit `protobuf:"bytes,3,opt,name=maxRecentlyQueriedSeriesDiskRead" json:"maxRecentlyQueriedSeriesDiskRead,omitempty"` + MaxRecentlyQueriedMetadataRead 
*QueryLimit `protobuf:"bytes,4,opt,name=maxRecentlyQueriedMetadataRead" json:"maxRecentlyQueriedMetadataRead,omitempty"` } func (m *QueryLimits) Reset() { *m = QueryLimits{} } @@ -148,6 +149,13 @@ func (m *QueryLimits) GetMaxRecentlyQueriedSeriesDiskRead() *QueryLimit { return nil } +func (m *QueryLimits) GetMaxRecentlyQueriedMetadataRead() *QueryLimit { + if m != nil { + return m.MaxRecentlyQueriedMetadataRead + } + return nil +} + type QueryLimit struct { Limit int64 `protobuf:"varint,1,opt,name=limit,proto3" json:"limit,omitempty"` LookbackSeconds int64 `protobuf:"varint,2,opt,name=lookbackSeconds,proto3" json:"lookbackSeconds,omitempty"` @@ -307,6 +315,16 @@ func (m *QueryLimits) MarshalTo(dAtA []byte) (int, error) { } i += n3 } + if m.MaxRecentlyQueriedMetadataRead != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintKv(dAtA, i, uint64(m.MaxRecentlyQueriedMetadataRead.Size())) + n4, err := m.MaxRecentlyQueriedMetadataRead.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + } return i, nil } @@ -407,6 +425,10 @@ func (m *QueryLimits) Size() (n int) { l = m.MaxRecentlyQueriedSeriesDiskRead.Size() n += 1 + l + sovKv(uint64(l)) } + if m.MaxRecentlyQueriedMetadataRead != nil { + l = m.MaxRecentlyQueriedMetadataRead.Size() + n += 1 + l + sovKv(uint64(l)) + } return n } @@ -831,6 +853,39 @@ func (m *QueryLimits) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxRecentlyQueriedMetadataRead", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MaxRecentlyQueriedMetadataRead == nil { + 
m.MaxRecentlyQueriedMetadataRead = &QueryLimit{} + } + if err := m.MaxRecentlyQueriedMetadataRead.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipKv(dAtA[iNdEx:]) @@ -1070,28 +1125,29 @@ func init() { } var fileDescriptorKv = []byte{ - // 361 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xcf, 0x6e, 0xda, 0x30, - 0x18, 0x5f, 0xc8, 0x86, 0xc6, 0x87, 0xb6, 0x45, 0x16, 0x9a, 0x38, 0x45, 0x28, 0xda, 0x24, 0x4e, - 0xb1, 0x34, 0xae, 0x3b, 0xa1, 0xed, 0x54, 0x0e, 0xad, 0x51, 0xab, 0x1e, 0x7a, 0x49, 0xec, 0x0f, - 0x1a, 0xc5, 0x89, 0x51, 0xec, 0x50, 0xf2, 0x16, 0x7d, 0x91, 0xbe, 0x47, 0x8f, 0x7d, 0x84, 0x8a, - 0xbe, 0x48, 0x65, 0x83, 0x84, 0xa8, 0xa0, 0xf4, 0x12, 0x7d, 0xbf, 0x5f, 0x7e, 0x7f, 0xe2, 0x7c, - 0x86, 0xbf, 0xf3, 0xcc, 0xdc, 0xd6, 0x69, 0xcc, 0x55, 0x41, 0x8b, 0x91, 0x48, 0x69, 0x31, 0xa2, - 0xba, 0xe2, 0x94, 0xcb, 0x5a, 0x1b, 0xac, 0xe8, 0x1c, 0x4b, 0xac, 0x12, 0x83, 0x82, 0x2e, 0x2a, - 0x65, 0x14, 0xcd, 0x97, 0x8b, 0x94, 0xe6, 0xcb, 0xd8, 0x21, 0xf2, 0xd9, 0xc2, 0xe8, 0x1c, 0xbe, - 0x9f, 0x61, 0x73, 0x95, 0xc8, 0x1a, 0x2f, 0x17, 0x22, 0x31, 0x48, 0x02, 0xf0, 0x73, 0x6c, 0xfa, - 0xde, 0xc0, 0x1b, 0x76, 0x98, 0x1d, 0x49, 0x0f, 0xbe, 0x2c, 0xad, 0xa0, 0xdf, 0x72, 0xdc, 0x06, - 0x90, 0x9f, 0xd0, 0xe6, 0xaa, 0x28, 0x32, 0xd3, 0xf7, 0x07, 0xde, 0xf0, 0x2b, 0xdb, 0xa2, 0x68, - 0x02, 0xbd, 0xfd, 0x44, 0x86, 0xba, 0x96, 0xe6, 0x40, 0x6e, 0x00, 0xbe, 0x92, 0x62, 0x9b, 0x6a, - 0x47, 0xcb, 0x94, 0x78, 0xe7, 0x02, 0x3b, 0xcc, 0x8e, 0xd1, 0x43, 0x0b, 0xba, 0x17, 0x35, 0x56, - 0xcd, 0x24, 0x2b, 0x32, 0xa3, 0xc9, 0x35, 0x84, 0x45, 0xb2, 0x62, 0xc8, 0xb1, 0x34, 0xb2, 0xb1, - 0x6f, 0x32, 0x14, 0x53, 0xfb, 0xd4, 0x63, 0xa9, 0x78, 0xae, 0x5d, 0x41, 0xf7, 0x4f, 0x10, 0xdb, - 0xe3, 0xc5, 0x3b, 0x2b, 0x3b, 0xe1, 0x23, 0x33, 0xf8, 0x7d, 0x4c, 0xf1, 0x2f, 0xd3, 0xf9, 0xb8, - 0x31, 0xa8, 0x19, 0x26, 0x9b, 0xef, 0x3d, 0x54, 0xf0, 0x31, 0x3b, 
0xb9, 0x81, 0xc1, 0x7b, 0x42, - 0x57, 0xe1, 0x1f, 0xa9, 0x38, 0xe9, 0x8c, 0x2a, 0x80, 0x9d, 0xde, 0x6e, 0x4e, 0xda, 0xc1, 0xfd, - 0x14, 0x9f, 0x6d, 0x00, 0x19, 0xc2, 0x0f, 0xa9, 0x54, 0x9e, 0x26, 0x3c, 0x9f, 0x22, 0x57, 0xa5, - 0xd0, 0xee, 0x4c, 0x3e, 0x7b, 0x4b, 0x93, 0x5f, 0xf0, 0x6d, 0xa6, 0x2a, 0x8e, 0xff, 0x57, 0x1c, - 0x51, 0xa0, 0xd8, 0xae, 0x7a, 0x9f, 0x1c, 0x07, 0x8f, 0xeb, 0xd0, 0x7b, 0x5a, 0x87, 0xde, 0xf3, - 0x3a, 0xf4, 0xee, 0x5f, 0xc2, 0x4f, 0x69, 0xdb, 0x5d, 0xb1, 0xd1, 0x6b, 0x00, 0x00, 0x00, 0xff, - 0xff, 0x4e, 0xb0, 0xcd, 0x62, 0xa2, 0x02, 0x00, 0x00, + // 375 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xcf, 0x6a, 0xe2, 0x40, + 0x18, 0xdf, 0x6c, 0x5c, 0x59, 0x47, 0x76, 0x37, 0x0c, 0xb2, 0x78, 0x0a, 0x12, 0x76, 0xc1, 0x53, + 0x06, 0xd6, 0xeb, 0x9e, 0xa4, 0x3d, 0xd5, 0x42, 0x3b, 0xd2, 0xd2, 0x43, 0x2f, 0x93, 0x99, 0x4f, + 0x1b, 0x32, 0xc9, 0x48, 0x66, 0x62, 0xcd, 0x13, 0xf4, 0xda, 0xc7, 0xea, 0xb1, 0x8f, 0x50, 0xec, + 0x8b, 0x94, 0x19, 0x05, 0x6b, 0xd1, 0xea, 0x25, 0x7c, 0xbf, 0x2f, 0xbf, 0x3f, 0xc9, 0x8f, 0x0f, + 0xfd, 0x9f, 0xa6, 0xe6, 0xae, 0x4a, 0x62, 0xae, 0x72, 0x92, 0x0f, 0x44, 0x42, 0xf2, 0x01, 0xd1, + 0x25, 0x27, 0x5c, 0x56, 0xda, 0x40, 0x49, 0xa6, 0x50, 0x40, 0xc9, 0x0c, 0x08, 0x32, 0x2b, 0x95, + 0x51, 0x24, 0x9b, 0xcf, 0x12, 0x92, 0xcd, 0x63, 0x87, 0x70, 0xc3, 0xc2, 0xe8, 0x02, 0xfd, 0x3c, + 0x83, 0xfa, 0x9a, 0xc9, 0x0a, 0xae, 0x66, 0x82, 0x19, 0xc0, 0x01, 0xf2, 0x33, 0xa8, 0xbb, 0x5e, + 0xcf, 0xeb, 0xb7, 0xa8, 0x1d, 0x71, 0x07, 0x7d, 0x9b, 0x5b, 0x42, 0xf7, 0xab, 0xdb, 0xad, 0x00, + 0xfe, 0x8d, 0x9a, 0x5c, 0xe5, 0x79, 0x6a, 0xba, 0x7e, 0xcf, 0xeb, 0x7f, 0xa7, 0x6b, 0x14, 0x8d, + 0x50, 0x67, 0xdb, 0x91, 0x82, 0xae, 0xa4, 0xd9, 0xe1, 0x1b, 0x20, 0x5f, 0x49, 0xb1, 0x76, 0xb5, + 0xa3, 0xdd, 0x14, 0x70, 0xef, 0x0c, 0x5b, 0xd4, 0x8e, 0xd1, 0x83, 0x8f, 0xda, 0x97, 0x15, 0x94, + 0xf5, 0x28, 0xcd, 0x53, 0xa3, 0xf1, 0x0d, 0x0a, 0x73, 0xb6, 0xa0, 0xc0, 0xa1, 0x30, 0xb2, 0xb6, + 
0x6f, 0x52, 0x10, 0x63, 0xfb, 0xd4, 0x43, 0xa9, 0x78, 0xa6, 0x5d, 0x40, 0xfb, 0x5f, 0x10, 0xdb, + 0xdf, 0x8b, 0x37, 0x52, 0x7a, 0x40, 0x87, 0x27, 0xe8, 0xef, 0x3e, 0xc6, 0x49, 0xaa, 0xb3, 0x61, + 0x6d, 0x40, 0x53, 0x60, 0xab, 0xef, 0xdd, 0x15, 0x70, 0x9c, 0x1c, 0xdf, 0xa2, 0xde, 0x67, 0x44, + 0x17, 0xe1, 0xef, 0x89, 0x38, 0xa8, 0xdc, 0xdd, 0xcf, 0x39, 0x18, 0x26, 0x98, 0x61, 0xce, 0xbb, + 0x71, 0x7c, 0x3f, 0xef, 0x75, 0x51, 0x89, 0xd0, 0x86, 0x6d, 0x6f, 0x42, 0xda, 0xc1, 0xd5, 0xed, + 0xd3, 0x15, 0xc0, 0x7d, 0xf4, 0x4b, 0x2a, 0x95, 0x25, 0x8c, 0x67, 0x63, 0xe0, 0xaa, 0x10, 0xda, + 0xb5, 0xe5, 0xd3, 0x8f, 0x6b, 0xfc, 0x07, 0xfd, 0x98, 0xa8, 0x92, 0xc3, 0xe9, 0x82, 0x03, 0x08, + 0x10, 0xeb, 0x23, 0xda, 0x5e, 0x0e, 0x83, 0xa7, 0x65, 0xe8, 0x3d, 0x2f, 0x43, 0xef, 0x65, 0x19, + 0x7a, 0x8f, 0xaf, 0xe1, 0x97, 0xa4, 0xe9, 0x8e, 0x77, 0xf0, 0x16, 0x00, 0x00, 0xff, 0xff, 0xd3, + 0x38, 0x17, 0xa7, 0xfc, 0x02, 0x00, 0x00, } diff --git a/src/cluster/generated/proto/kvpb/kv.proto b/src/cluster/generated/proto/kvpb/kv.proto index ef2b2f60d5..3e1cccef1e 100644 --- a/src/cluster/generated/proto/kvpb/kv.proto +++ b/src/cluster/generated/proto/kvpb/kv.proto @@ -22,9 +22,9 @@ syntax = "proto3"; package kvpb; message KeyValueUpdate { - string key = 1; + string key = 1; string value = 2; - bool commit = 3; + bool commit = 3; } message KeyValueUpdateResult { @@ -34,13 +34,14 @@ message KeyValueUpdateResult { } message QueryLimits { - QueryLimit maxRecentlyQueriedSeriesBlocks = 1; + QueryLimit maxRecentlyQueriedSeriesBlocks = 1; QueryLimit maxRecentlyQueriedSeriesDiskBytesRead = 2; - QueryLimit maxRecentlyQueriedSeriesDiskRead = 3; + QueryLimit maxRecentlyQueriedSeriesDiskRead = 3; + QueryLimit maxRecentlyQueriedMetadataRead = 4; } message QueryLimit { - int64 limit = 1; + int64 limit = 1; int64 lookbackSeconds = 2; - bool forceExceeded = 3; + bool forceExceeded = 3; } diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 
b13db65418..2a465803b6 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -734,6 +734,7 @@ func TestConfiguration(t *testing.T) { maxRecentlyQueriedSeriesDiskBytesRead: null maxRecentlyQueriedSeriesDiskRead: null maxRecentlyQueriedSeriesBlocks: null + maxRecentlyQueriedMetadata: null maxOutstandingWriteRequests: 0 maxOutstandingReadRequests: 0 maxOutstandingRepairedBytes: 0 diff --git a/src/cmd/services/m3dbnode/config/limits.go b/src/cmd/services/m3dbnode/config/limits.go index eb26cb0de8..2aad1758d4 100644 --- a/src/cmd/services/m3dbnode/config/limits.go +++ b/src/cmd/services/m3dbnode/config/limits.go @@ -40,6 +40,11 @@ type LimitsConfiguration struct { // max is surpassed encounter an error. MaxRecentlyQueriedSeriesBlocks *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"` + // MaxRecentlyQueriedMetadata sets the upper limit on metadata counts + // within a given lookback period. Metadata queries which are issued while + // this max is surpassed encounter an error. + MaxRecentlyQueriedMetadata *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedMetadata"` + // MaxOutstandingWriteRequests controls the maximum number of outstanding write requests // that the server will allow before it begins rejecting requests. 
Note that this value // is independent of the number of values that are being written (due to variable batch diff --git a/src/dbnode/integration/query_limit_test.go b/src/dbnode/integration/query_limit_test.go index f1ebf6daf2..a84066bb2a 100644 --- a/src/dbnode/integration/query_limit_test.go +++ b/src/dbnode/integration/query_limit_test.go @@ -55,6 +55,7 @@ func TestQueryLimitExceededError(t *testing.T) { start = end.Add(-time.Hour) query = index.Query{Query: idx.NewTermQuery([]byte("tag"), []byte("value"))} queryOpts = index.QueryOptions{StartInclusive: start, EndExclusive: end} + aggOpts = index.AggregationOptions{QueryOptions: queryOpts} ) session, err := testSetup.M3DBClient().DefaultSession() @@ -75,6 +76,11 @@ func TestQueryLimitExceededError(t *testing.T) { ns.ID(), query, queryOpts) require.True(t, client.IsResourceExhaustedError(err), "expected resource exhausted error, got: %v", err) + + _, _, err = session.Aggregate(ContextWithDefaultTimeout(), ns.ID(), query, aggOpts) + require.True(t, client.IsResourceExhaustedError(err), + "expected aggregate resource exhausted error, got: %v", err) + } func newTestOptionsWithIndexedNamespace(t *testing.T) (TestOptions, namespace.Metadata) { @@ -96,6 +102,7 @@ func newTestSetupWithQueryLimits(t *testing.T, opts TestOptions) TestSetup { limitOpts := limits.NewOptions(). SetBytesReadLimitOpts(queryLookback). SetDocsLimitOpts(queryLookback). + SetAggregateDocsLimitOpts(queryLookback). 
SetInstrumentOptions(storageOpts.InstrumentOptions()) queryLimits, err := limits.NewQueryLimits(limitOpts) require.NoError(t, err) diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 3be8fd16ef..8f725866ec 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -436,7 +436,7 @@ func (r *blockRetriever) fetchBatch( retrieverResources) var limitErr error - if err := r.queryLimits.AnyExceeded(); err != nil { + if err := r.queryLimits.AnyFetchExceeded(); err != nil { for _, req := range reqs { req.err = err } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index a6ba71c77e..13ab1ac9ef 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -428,9 +428,13 @@ func Run(runOpts RunOptions) { } // Setup query stats tracking. - docsLimit := limits.DefaultLookbackLimitOptions() - bytesReadLimit := limits.DefaultLookbackLimitOptions() - diskSeriesReadLimit := limits.DefaultLookbackLimitOptions() + var ( + docsLimit = limits.DefaultLookbackLimitOptions() + bytesReadLimit = limits.DefaultLookbackLimitOptions() + diskSeriesReadLimit = limits.DefaultLookbackLimitOptions() + aggDocsLimit = limits.DefaultLookbackLimitOptions() + ) + if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil { docsLimit.Limit = limitConfig.Value docsLimit.Lookback = limitConfig.Lookback @@ -443,10 +447,15 @@ func Run(runOpts RunOptions) { diskSeriesReadLimit.Limit = limitConfig.Value diskSeriesReadLimit.Lookback = limitConfig.Lookback } + if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedMetadata; limitConfig != nil { + aggDocsLimit.Limit = limitConfig.Value + aggDocsLimit.Lookback = limitConfig.Lookback + } limitOpts := limits.NewOptions(). SetDocsLimitOpts(docsLimit). SetBytesReadLimitOpts(bytesReadLimit). SetDiskSeriesReadLimitOpts(diskSeriesReadLimit). + SetAggregateDocsLimitOpts(aggDocsLimit). 
SetInstrumentOptions(iOpts) if builder := opts.SourceLoggerBuilder(); builder != nil { limitOpts = limitOpts.SetSourceLoggerBuilder(builder) @@ -1016,13 +1025,15 @@ func Run(runOpts RunOptions) { kvWatchEncodersPerBlockLimit(syncCfg.KVStore, logger, runtimeOptsMgr, cfg.Limits.MaxEncodersPerBlock) kvWatchQueryLimit(syncCfg.KVStore, logger, - queryLimits.DocsLimit(), + queryLimits.FetchDocsLimit(), queryLimits.BytesReadLimit(), // For backwards compatibility as M3 moves toward permits instead of time-based limits, // the series-read path uses permits which are implemented with limits, and so we support // dynamic updates to this limit-based permit still be passing downstream the limit itself. seriesReadPermits.Limit, - limitOpts) + queryLimits.AggregateDocsLimit(), + limitOpts, + ) }() // Wait for process interrupt. @@ -1201,6 +1212,7 @@ func kvWatchQueryLimit( docsLimit limits.LookbackLimit, bytesReadLimit limits.LookbackLimit, diskSeriesReadLimit limits.LookbackLimit, + aggregateDocsLimit limits.LookbackLimit, defaultOpts limits.Options, ) { value, err := store.Get(kvconfig.QueryLimits) @@ -1208,7 +1220,9 @@ func kvWatchQueryLimit( dynamicLimits := &kvpb.QueryLimits{} err = value.Unmarshal(dynamicLimits) if err == nil { - updateQueryLimits(logger, docsLimit, bytesReadLimit, diskSeriesReadLimit, dynamicLimits, defaultOpts) + updateQueryLimits( + logger, docsLimit, bytesReadLimit, diskSeriesReadLimit, + aggregateDocsLimit, dynamicLimits, defaultOpts) } } else if !errors.Is(err, kv.ErrNotFound) { logger.Warn("error resolving query limit", zap.Error(err)) @@ -1228,18 +1242,20 @@ func kvWatchQueryLimit( logger.Warn("unable to parse new query limits", zap.Error(err)) continue } - updateQueryLimits(logger, - docsLimit, bytesReadLimit, diskSeriesReadLimit, - dynamicLimits, defaultOpts) + updateQueryLimits( + logger, docsLimit, bytesReadLimit, diskSeriesReadLimit, + aggregateDocsLimit, dynamicLimits, defaultOpts) } } }() } -func updateQueryLimits(logger *zap.Logger, +func 
updateQueryLimits( + logger *zap.Logger, docsLimit limits.LookbackLimit, bytesReadLimit limits.LookbackLimit, diskSeriesReadLimit limits.LookbackLimit, + aggregateDocsLimit limits.LookbackLimit, dynamicOpts *kvpb.QueryLimits, configOpts limits.Options, ) { @@ -1249,6 +1265,7 @@ func updateQueryLimits(logger *zap.Logger, docsLimitOpts = configOpts.DocsLimitOpts() bytesReadLimitOpts = configOpts.BytesReadLimitOpts() diskSeriesReadLimitOpts = configOpts.DiskSeriesReadLimitOpts() + aggDocsLimitOpts = configOpts.AggregateDocsLimitOpts() ) if dynamicOpts != nil { if dynamicOpts.MaxRecentlyQueriedSeriesBlocks != nil { @@ -1260,6 +1277,9 @@ func updateQueryLimits(logger *zap.Logger, if dynamicOpts.MaxRecentlyQueriedSeriesDiskRead != nil { diskSeriesReadLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesDiskRead) } + if dynamicOpts.MaxRecentlyQueriedMetadataRead != nil { + aggDocsLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedMetadataRead) + } } if err := updateQueryLimit(docsLimit, docsLimitOpts); err != nil { @@ -1273,6 +1293,10 @@ func updateQueryLimits(logger *zap.Logger, if err := updateQueryLimit(diskSeriesReadLimit, diskSeriesReadLimitOpts); err != nil { logger.Error("error updating series read limit", zap.Error(err)) } + + if err := updateQueryLimit(aggregateDocsLimit, aggDocsLimitOpts); err != nil { + logger.Error("error updating metadata read limit", zap.Error(err)) + } } func updateQueryLimit( diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 30640c7161..d0933a21a6 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -874,7 +874,7 @@ func (d *db) QueryIDs( // Check if exceeding query limits at very beginning of // query path to abandon as early as possible. 
- if err := d.queryLimits.AnyExceeded(); err != nil { + if err := d.queryLimits.AnyFetchExceeded(); err != nil { return index.QueryResult{}, err } diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index e8ce05b2e1..d4c5131559 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -150,7 +150,7 @@ type block struct { blockOpts BlockOptions nsMD namespace.Metadata namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager - docsLimit limits.LookbackLimit + fetchDocsLimit limits.LookbackLimit aggDocsLimit limits.LookbackLimit metrics blockMetrics @@ -269,7 +269,7 @@ func NewBlock( namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr, metrics: newBlockMetrics(scope), logger: iopts.Logger(), - docsLimit: opts.QueryLimits().DocsLimit(), + fetchDocsLimit: opts.QueryLimits().FetchDocsLimit(), aggDocsLimit: opts.QueryLimits().AggregateDocsLimit(), } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator @@ -567,7 +567,7 @@ func (b *block) addQueryResults( ) ([]doc.Document, int, int, error) { // update recently queried docs to monitor memory. if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch), source); err != nil { + if err := b.fetchDocsLimit.Inc(len(batch), source); err != nil { return batch, 0, 0, err } } @@ -683,7 +683,7 @@ func (b *block) aggregateWithSpan( defer func() { b.opts.AggregateResultsEntryArrayPool().Put(batch) if !iterClosed { - iter.Close(ctx) //nolint:errcheck + _ = iter.Close(ctx) } b.metrics.aggregateDocsMatched.RecordValue(float64(docsCount)) @@ -749,14 +749,14 @@ func (b *block) aggregateWithSpan( if lastField == nil { lastField = append(lastField, field...) batchedFields++ - if err := b.docsLimit.Inc(1, source); err != nil { + if err := b.fetchDocsLimit.Inc(1, source); err != nil { return false, err } } else if !bytes.Equal(lastField, field) { lastField = lastField[:0] lastField = append(lastField, field...) 
batchedFields++ - if err := b.docsLimit.Inc(1, source); err != nil { + if err := b.fetchDocsLimit.Inc(1, source); err != nil { return false, err } } @@ -766,7 +766,7 @@ func (b *block) aggregateWithSpan( // reflect the term appearing as both the last element of the previous // batch, as well as the first element in the next batch. if batchedFields > maxBatch { - if err := b.docsLimit.Inc(2, source); err != nil { + if err := b.fetchDocsLimit.Inc(2, source); err != nil { return false, err } @@ -912,9 +912,12 @@ func (b *block) addAggregateResults( aggDocs += len(batch[i].Terms) } - // NB: currently this is here to capture upper limits for these limits and will - // trip constantly; ignore any errors for now. - _ = b.aggDocsLimit.Inc(aggDocs, source) + // update recently queried docs to monitor memory. + if results.EnforceLimits() { + if err := b.aggDocsLimit.Inc(aggDocs, source); err != nil { + return batch, 0, 0, err + } + } // reset batch. var emptyField AggregateResultsEntry diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 073cbb4ebf..0c912eb930 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -45,6 +45,7 @@ import ( "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/tallytest" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" @@ -213,6 +214,7 @@ func genField() gopter.Gen { type propTestSegment struct { metadata doc.Metadata exCount int64 + exCountAgg int64 segmentMap segmentMap } @@ -233,8 +235,10 @@ func genTestSegment() gopter.Gen { } } + aggLength := len(segMap) fields := make([]testFields, 0, len(input)) for name, valSet := range segMap { + aggLength += len(valSet) vals := make([]string, 0, len(valSet)) for val := range valSet { vals = append(vals, val) @@ -261,6 +265,7 @@ func genTestSegment() gopter.Gen { return propTestSegment{ metadata: 
doc.Metadata{Fields: docFields}, exCount: int64(len(segMap)), + exCountAgg: int64(aggLength), segmentMap: segMap, } }) @@ -327,7 +332,8 @@ func TestAggregateDocLimits(t *testing.T) { limitOpts := limits.NewOptions(). SetInstrumentOptions(iOpts). SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). - SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetAggregateDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) queryLimits, err := limits.NewQueryLimits((limitOpts)) require.NoError(t, err) testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) @@ -368,18 +374,11 @@ func TestAggregateDocLimits(t *testing.T) { require.True(t, exhaustive, errors.New("not exhaustive")) verifyResults(t, results, testSegment.segmentMap) - found := false - for _, c := range scope.Snapshot().Counters() { - fmt.Println(c) - if c.Name() == "query-limit.total-docs-matched" && - c.Tags()["type"] == "fetch" { - require.Equal(t, testSegment.exCount, c.Value(), "docs count mismatch") - found = true - break - } - } - - require.True(t, found, "counter not found in metrics") + snap := scope.Snapshot() + tallytest.AssertCounterValue(t, testSegment.exCount, snap, + "query-limit.total-docs-matched", map[string]string{"type": "fetch"}) + tallytest.AssertCounterValue(t, testSegment.exCountAgg, snap, + "query-limit.total-docs-matched", map[string]string{"type": "aggregate"}) return true, nil }, genTestSegment(), diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index b520008eff..fad628b306 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -24,6 +24,7 @@ import ( stdlibctx "context" "fmt" "sort" + "strings" "testing" "time" @@ -1908,7 +1909,8 @@ func TestBlockAggregate(t *testing.T) { limitOpts := limits.NewOptions(). SetInstrumentOptions(iOpts). 
SetDocsLimitOpts(limits.LookbackLimitOptions{Limit: 50, Lookback: time.Minute}). - SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetAggregateDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) queryLimits, err := limits.NewQueryLimits((limitOpts)) require.NoError(t, err) testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) @@ -1990,6 +1992,89 @@ func TestBlockAggregate(t *testing.T) { "query-limit.total-docs-matched", map[string]string{"type": "aggregate"}) } +func TestBlockAggregateWithAggregateLimits(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + seriesLimit := 100 + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). 
+ SetAggregateDocsLimitOpts(limits.LookbackLimitOptions{ + Limit: int64(seriesLimit), + Lookback: time.Minute, + }) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + aggTestOpts := testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) + + testMD := newTestNSMetadata(t) + start := time.Now().Truncate(time.Hour) + blk, err := NewBlock(start, testMD, BlockOptions{}, + namespace.NewRuntimeOptionsManager("foo"), aggTestOpts) + require.NoError(t, err) + + b, ok := blk.(*block) + require.True(t, ok) + + seg1 := segment.NewMockMutableSegment(ctrl) + reader := segment.NewMockReader(ctrl) + reader.EXPECT().Close().Return(nil) + seg1.EXPECT().Reader().Return(reader, nil) + + b.mutableSegments.foregroundSegments = []*readableSeg{newReadableSeg(seg1, aggTestOpts)} + iter := NewMockfieldsAndTermsIterator(ctrl) + b.newFieldsAndTermsIteratorFn = func( + _ context.Context, _ segment.Reader, opts fieldsAndTermsIteratorOpts) (fieldsAndTermsIterator, error) { + return iter, nil + } + results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + SizeLimit: seriesLimit, + Type: AggregateTagNamesAndValues, + }, aggTestOpts) + + ctx := context.NewBackground() + defer ctx.BlockingClose() + + // create initial span from a mock tracer and get ctx + mtr := mocktracer.New() + sp := mtr.StartSpan("root") + ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) + + iter.EXPECT().Reset(gomock.Any(), reader, gomock.Any()).Return(nil) + for i := 0; i < seriesLimit-1; i++ { + iter.EXPECT().Next().Return(true) + curr := []byte(fmt.Sprint(i)) + iter.EXPECT().Current().Return([]byte("f1"), curr) + } + iter.EXPECT().Close(gomock.Any()).Return(nil) + iter.EXPECT().SearchDuration().Return(time.Second) + + exhaustive, err := b.Aggregate( + ctx, + QueryOptions{SeriesLimit: seriesLimit}, + results, + emptyLogFields) + require.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "query aborted due to limit")) + 
require.False(t, exhaustive) + + sp.Finish() + spans := mtr.FinishedSpans() + require.Len(t, spans, 3) + require.Equal(t, tracepoint.NSIdxBlockAggregateQueryAddDocuments, spans[0].OperationName) + require.Equal(t, tracepoint.BlockAggregate, spans[1].OperationName) + + snap := scope.Snapshot() + tallytest.AssertCounterValue(t, 1, snap, + "query-limit.total-docs-matched", map[string]string{"type": "fetch"}) + tallytest.AssertCounterValue(t, int64(seriesLimit), snap, + "query-limit.total-docs-matched", map[string]string{"type": "aggregate"}) +} + func TestBlockAggregateNotExhaustive(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -2141,7 +2226,7 @@ func TestBlockE2EInsertAggregate(t *testing.T) { exhaustive, err := b.Aggregate( ctx, - QueryOptions{SeriesLimit: 10}, + QueryOptions{SeriesLimit: 1000}, results, emptyLogFields) require.NoError(t, err) @@ -2174,7 +2259,7 @@ func TestBlockE2EInsertAggregate(t *testing.T) { }, testOpts) exhaustive, err = b.Aggregate( ctx, - QueryOptions{SeriesLimit: 10}, + QueryOptions{SeriesLimit: 100}, results, emptyLogFields) require.NoError(t, err) @@ -2425,7 +2510,8 @@ func TestBlockAggregateBatching(t *testing.T) { limitOpts := limits.NewOptions(). SetInstrumentOptions(iOpts). SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). - SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetAggregateDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) queryLimits, err := limits.NewQueryLimits((limitOpts)) require.NoError(t, err) testOpts = optionsWithAggResultsPool(tt.batchSize). 
diff --git a/src/dbnode/storage/limits/noop_query_limits.go b/src/dbnode/storage/limits/noop_query_limits.go
index 300dee005d..712043d7ec 100644
--- a/src/dbnode/storage/limits/noop_query_limits.go
+++ b/src/dbnode/storage/limits/noop_query_limits.go
@@ -36,7 +36,7 @@ func NoOpQueryLimits() QueryLimits {
 	return &noOpQueryLimits{}
 }
 
-func (q *noOpQueryLimits) DocsLimit() LookbackLimit {
+func (q *noOpQueryLimits) FetchDocsLimit() LookbackLimit {
 	return &noOpLookbackLimit{}
 }
 
@@ -48,7 +48,7 @@ func (q *noOpQueryLimits) AggregateDocsLimit() LookbackLimit {
 	return &noOpLookbackLimit{}
 }
 
-func (q *noOpQueryLimits) AnyExceeded() error {
+func (q *noOpQueryLimits) AnyFetchExceeded() error {
 	return nil
 }
 
diff --git a/src/dbnode/storage/limits/options.go b/src/dbnode/storage/limits/options.go
index cc82c121aa..ec19b8404c 100644
--- a/src/dbnode/storage/limits/options.go
+++ b/src/dbnode/storage/limits/options.go
@@ -28,11 +28,12 @@ import (
 )
 
 type limitOpts struct {
-	iOpts                   instrument.Options
-	docsLimitOpts           LookbackLimitOptions
-	bytesReadLimitOpts      LookbackLimitOptions
-	diskSeriesReadLimitOpts LookbackLimitOptions
-	sourceLoggerBuilder     SourceLoggerBuilder
+	iOpts                      instrument.Options
+	docsLimitOpts              LookbackLimitOptions
+	bytesReadLimitOpts         LookbackLimitOptions
+	diskSeriesReadLimitOpts    LookbackLimitOptions
+	diskAggregateDocsLimitOpts LookbackLimitOptions
+	sourceLoggerBuilder        SourceLoggerBuilder
 }
 
 // NewOptions creates limit options with default values.
@@ -107,6 +108,18 @@ func (o *limitOpts) DiskSeriesReadLimitOpts() LookbackLimitOptions {
 	return o.diskSeriesReadLimitOpts
 }
 
+// SetAggregateDocsLimitOpts sets the aggregate docs limit options.
+func (o *limitOpts) SetAggregateDocsLimitOpts(value LookbackLimitOptions) Options {
+	opts := *o
+	opts.diskAggregateDocsLimitOpts = value
+	return &opts
+}
+
+// AggregateDocsLimitOpts returns the aggregate docs limit options.
+func (o *limitOpts) AggregateDocsLimitOpts() LookbackLimitOptions { + return o.diskAggregateDocsLimitOpts +} + // SetSourceLoggerBuilder sets the source logger. func (o *limitOpts) SetSourceLoggerBuilder(value SourceLoggerBuilder) Options { opts := *o diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index c087a09312..0ab99160f7 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -46,6 +46,7 @@ type queryLimits struct { type lookbackLimit struct { name string + started bool options LookbackLimitOptions metrics lookbackLimitMetrics logger *zap.Logger @@ -53,6 +54,7 @@ type lookbackLimit struct { stopCh chan struct{} stoppedCh chan struct{} lock sync.RWMutex + iOpts instrument.Options } type lookbackLimitMetrics struct { @@ -90,19 +92,21 @@ func NewQueryLimits(options Options) (QueryLimits, error) { iOpts = options.InstrumentOptions() docsLimitOpts = options.DocsLimitOpts() bytesReadLimitOpts = options.BytesReadLimitOpts() - aggDocsLimitOpts = options.DocsLimitOpts() + aggDocsLimitOpts = options.AggregateDocsLimitOpts() sourceLoggerBuilder = options.SourceLoggerBuilder() - docsMatched = "docs-matched" - docsLimit = newLookbackLimit( - iOpts, docsLimitOpts, docsMatched, + docsMatched = "docs-matched" + bytesRead = "disk-bytes-read" + aggregateMatched = "aggregate-matched" + docsLimit = newLookbackLimit( + iOpts, docsLimitOpts, docsMatched, docsMatched, sourceLoggerBuilder, map[string]string{"type": "fetch"}) bytesReadLimit = newLookbackLimit( - iOpts, bytesReadLimitOpts, "disk-bytes-read", + iOpts, bytesReadLimitOpts, bytesRead, bytesRead, sourceLoggerBuilder, nil) aggregatedDocsLimit = newLookbackLimit( - iOpts, aggDocsLimitOpts, docsMatched, + iOpts, aggDocsLimitOpts, docsMatched, aggregateMatched, sourceLoggerBuilder, map[string]string{"type": "aggregate"}) ) @@ -121,19 +125,20 @@ func NewLookbackLimit( sourceLoggerBuilder SourceLoggerBuilder, tags 
map[string]string, ) LookbackLimit { - return newLookbackLimit(instrumentOpts, opts, name, sourceLoggerBuilder, tags) + return newLookbackLimit(instrumentOpts, opts, name, name, sourceLoggerBuilder, tags) } func newLookbackLimit( instrumentOpts instrument.Options, opts LookbackLimitOptions, + metricName string, name string, sourceLoggerBuilder SourceLoggerBuilder, tags map[string]string, ) *lookbackLimit { metrics := newLookbackLimitMetrics( instrumentOpts, - name, + metricName, sourceLoggerBuilder, tags, ) @@ -146,6 +151,7 @@ func newLookbackLimit( recent: atomic.NewInt64(0), stopCh: make(chan struct{}), stoppedCh: make(chan struct{}), + iOpts: instrumentOpts, } } @@ -177,7 +183,7 @@ func newLookbackLimitMetrics( } } -func (q *queryLimits) DocsLimit() LookbackLimit { +func (q *queryLimits) FetchDocsLimit() LookbackLimit { return q.docsLimit } @@ -192,17 +198,20 @@ func (q *queryLimits) AggregateDocsLimit() LookbackLimit { func (q *queryLimits) Start() { q.docsLimit.Start() q.bytesReadLimit.Start() + q.aggregatedDocsLimit.Start() } func (q *queryLimits) Stop() { q.docsLimit.Stop() q.bytesReadLimit.Stop() + q.aggregatedDocsLimit.Stop() } -func (q *queryLimits) AnyExceeded() error { +func (q *queryLimits) AnyFetchExceeded() error { if err := q.docsLimit.exceeded(); err != nil { return err } + return q.bytesReadLimit.exceeded() } @@ -288,6 +297,7 @@ func (q *lookbackLimit) checkLimit(recent int64) error { "query aborted due to limit: name=%s, limit=%d, current=%d, within=%s", q.name, q.options.Limit, recent, q.options.Lookback))) } + return nil } @@ -308,6 +318,7 @@ func (q *lookbackLimit) Stop() { } func (q *lookbackLimit) start() { + q.started = true ticker := time.NewTicker(q.options.Lookback) go func() { q.logger.Info("query limit interval started", zap.String("name", q.name)) @@ -328,6 +339,16 @@ func (q *lookbackLimit) start() { } func (q *lookbackLimit) stop() { + if !q.started { + // NB: this lookback limit has not yet been started. 
+ instrument.EmitAndLogInvariantViolation(q.iOpts, func(l *zap.Logger) { + l.With( + zap.Any("limit_name", q.name), + ).Error("cannot stop non-started lookback limit") + }) + return + } + close(q.stopCh) <-q.stoppedCh q.stopCh = make(chan struct{}) diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 4206120bea..d1d67372e9 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -38,12 +38,14 @@ func testQueryLimitOptions( docOpts LookbackLimitOptions, bytesOpts LookbackLimitOptions, seriesOpts LookbackLimitOptions, + aggDocsOpts LookbackLimitOptions, iOpts instrument.Options, ) Options { return NewOptions(). SetDocsLimitOpts(docOpts). SetBytesReadLimitOpts(bytesOpts). SetDiskSeriesReadLimitOpts(seriesOpts). + SetAggregateDocsLimitOpts(aggDocsOpts). SetInstrumentOptions(iOpts) } @@ -61,44 +63,63 @@ func TestQueryLimits(t *testing.T) { Limit: l, Lookback: time.Second, } - opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) + aggOpts := LookbackLimitOptions{ + Limit: l, + Lookback: time.Second, + } + opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, aggOpts, instrument.NewOptions()) queryLimits, err := NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) // No error yet. - require.NoError(t, queryLimits.AnyExceeded()) + require.NoError(t, queryLimits.AnyFetchExceeded()) // Limit from docs. 
- require.Error(t, queryLimits.DocsLimit().Inc(2, nil)) - err = queryLimits.AnyExceeded() + require.Error(t, queryLimits.FetchDocsLimit().Inc(2, nil)) + err = queryLimits.AnyFetchExceeded() require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) require.True(t, IsQueryLimitExceededError(err)) - opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, aggOpts, instrument.NewOptions()) queryLimits, err = NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) // No error yet. - err = queryLimits.AnyExceeded() + err = queryLimits.AnyFetchExceeded() require.NoError(t, err) // Limit from bytes. require.Error(t, queryLimits.BytesReadLimit().Inc(2, nil)) - err = queryLimits.AnyExceeded() + err = queryLimits.AnyFetchExceeded() require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) require.True(t, IsQueryLimitExceededError(err)) - opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, aggOpts, instrument.NewOptions()) + queryLimits, err = NewQueryLimits(opts) + require.NoError(t, err) + require.NotNil(t, queryLimits) + + // No error yet. + err = queryLimits.AnyFetchExceeded() + require.NoError(t, err) + + // Limit from aggregate does not trip any fetched exceeded. + require.Error(t, queryLimits.AggregateDocsLimit().Inc(2, nil)) + err = queryLimits.AnyFetchExceeded() + require.NoError(t, err) + require.NotNil(t, queryLimits) + + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, aggOpts, instrument.NewOptions()) queryLimits, err = NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) // No error yet. 
- err = queryLimits.AnyExceeded() + err = queryLimits.AnyFetchExceeded() require.NoError(t, err) } @@ -121,7 +142,7 @@ func TestLookbackLimit(t *testing.T) { ForceExceeded: test.forceExceeded, } name := "test" - limit := newLookbackLimit(iOpts, opts, name, &sourceLoggerBuilder{}, nil) + limit := newLookbackLimit(iOpts, opts, name, name, &sourceLoggerBuilder{}, nil) require.Equal(t, int64(0), limit.current()) @@ -235,7 +256,7 @@ func TestLookbackReset(t *testing.T) { Lookback: time.Millisecond * 100, } name := "test" - limit := newLookbackLimit(iOpts, opts, name, &sourceLoggerBuilder{}, nil) + limit := newLookbackLimit(iOpts, opts, name, name, &sourceLoggerBuilder{}, nil) err := limit.Inc(3, nil) require.NoError(t, err) @@ -347,7 +368,7 @@ func TestSourceLogger(t *testing.T) { } builder = &testBuilder{records: []testLoggerRecord{}} - opts = testQueryLimitOptions(noLimit, noLimit, noLimit, iOpts). + opts = testQueryLimitOptions(noLimit, noLimit, noLimit, noLimit, iOpts). SetSourceLoggerBuilder(builder) ) @@ -357,7 +378,7 @@ func TestSourceLogger(t *testing.T) { require.NoError(t, err) require.NotNil(t, queryLimits) - require.NoError(t, queryLimits.DocsLimit().Inc(100, []byte("docs"))) + require.NoError(t, queryLimits.FetchDocsLimit().Inc(100, []byte("docs"))) require.NoError(t, queryLimits.BytesReadLimit().Inc(200, []byte("bytes"))) assert.Equal(t, []testLoggerRecord{ diff --git a/src/dbnode/storage/limits/types.go b/src/dbnode/storage/limits/types.go index ac086409e2..62c02b76fa 100644 --- a/src/dbnode/storage/limits/types.go +++ b/src/dbnode/storage/limits/types.go @@ -35,17 +35,18 @@ const SourceContextKey Key = "source" // QueryLimits provides an interface for managing query limits. type QueryLimits interface { - // DocsLimit limits queries by a global concurrent count of index docs matched. - DocsLimit() LookbackLimit + // FetchDocsLimit limits queries by a global concurrent count of index docs matched. 
+ FetchDocsLimit() LookbackLimit // BytesReadLimit limits queries by a global concurrent count of bytes read from disk. BytesReadLimit() LookbackLimit + // AnyFetchExceeded returns an error if any of the query limits are exceeded on + // a fetch query. + AnyFetchExceeded() error // AggregateDocsLimit limits aggregate queries by a global // concurrent count of index docs matched. AggregateDocsLimit() LookbackLimit - // AnyExceeded returns an error if any of the query limits are exceeded. - AnyExceeded() error // Start begins background resetting of the query limits. Start() // Stop end background resetting of the query limits. @@ -113,6 +114,12 @@ type Options interface { // BytesReadLimitOpts returns the byte read limit options. BytesReadLimitOpts() LookbackLimitOptions + // SetAggregateDocsLimitOpts sets the aggregate doc limit options. + SetAggregateDocsLimitOpts(LookbackLimitOptions) Options + + // AggregateDocsLimitOpts returns the aggregate doc limit options. + AggregateDocsLimitOpts() LookbackLimitOptions + // SetDiskSeriesReadLimitOpts sets the disk series read limit options. 
SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options diff --git a/src/query/api/v1/handler/database/kvstore_test.go b/src/query/api/v1/handler/database/kvstore_test.go index cf986ad83a..1b1cca5127 100644 --- a/src/query/api/v1/handler/database/kvstore_test.go +++ b/src/query/api/v1/handler/database/kvstore_test.go @@ -68,6 +68,17 @@ func TestUpdateQueryLimits(t *testing.T) { }, commit: true, }, + { + name: `only metadata - commit`, + limits: &kvpb.QueryLimits{ + MaxRecentlyQueriedMetadataRead: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + }, + commit: true, + }, { name: `only block - no commit`, limits: &kvpb.QueryLimits{ @@ -97,6 +108,11 @@ func TestUpdateQueryLimits(t *testing.T) { LookbackSeconds: 15, ForceExceeded: true, }, + MaxRecentlyQueriedMetadataRead: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, }, commit: true, },