From 12a12b717acd62f3d40ac90e9fdb43263b3fbbf5 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Wed, 12 May 2021 17:17:06 -0500 Subject: [PATCH 01/13] Add support for Max Series Per Query for block storage and streaming ingesters and update limits.go to reflect changes Signed-off-by: Tyler Reid --- pkg/distributor/distributor_test.go | 54 ++++++++++ pkg/distributor/query.go | 15 ++- pkg/querier/blocks_store_queryable.go | 10 +- pkg/querier/blocks_store_queryable_test.go | 51 +++++++-- pkg/querier/querier.go | 6 +- pkg/util/limiter/per_query_limiter.go | 119 +++++++++++++++++++++ pkg/util/limiter/per_query_limiter_test.go | 103 ++++++++++++++++++ pkg/util/validation/limits.go | 2 +- 8 files changed, 347 insertions(+), 13 deletions(-) create mode 100644 pkg/util/limiter/per_query_limiter.go create mode 100644 pkg/util/limiter/per_query_limiter_test.go diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 98364cc36f..0bb3102b0d 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/cortexproject/cortex/pkg/util/limiter" "io" "math" "net/http" @@ -945,6 +946,59 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac assert.Contains(t, err.Error(), "the query hit the max number of chunks limit") } +func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) { + const maxSeriesLimit = 10 + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.MaxSeriesPerQuery = maxSeriesLimit + ctx = limiter.NewPerQueryLimiterOnContext(ctx, maxSeriesLimit, 0) + // Prepare distributors. + ds, _, r, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + }) + defer stopAll(ds, r) + + // Push a number of series below the max series limit. Each series has 1 sample, + initialSeries := maxSeriesLimit + writeReq := makeWriteRequest(0, initialSeries, 0) + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } + + // Since the number of series is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, initialSeries) + + // Push more series to exceed the limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + for i := 0; i < initialSeries; i++ { + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0), + ) + } + + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + // Since the number of series is exceeding the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Contains(t, err.Error(), "max number of series limit while") +} + func TestDistributor_Push_LabelRemoval(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "user") diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 0b20d02312..6ce9b16290 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -3,6 +3,7 @@ package distributor import ( "context" "fmt" + "github.com/cortexproject/cortex/pkg/util/limiter" "io" "time" @@ -187,8 +188,10 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re // queryIngesterStream queries the ingesters using the new streaming API. func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { var ( - chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) - chunksCount = atomic.Int32{} + chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) + chunksCount = atomic.Int32{} + queryLimiter = limiter.PerQueryLimiterFromContext(ctx) + matchers, _ = ingester_client.FromLabelMatchers(req.Matchers) ) // Fetch samples from multiple ingesters @@ -226,10 +229,16 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re // We expect to be always able to convert the label matchers back to Prometheus ones. // In case we fail (unexpected) the error will not include the matchers, but the core // logic doesn't break. - matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit)) } } + for _, series := range resp.Chunkseries { + //Add series, with fingerprint inside of limiter + limitErr := queryLimiter.AddFingerPrint(series.Labels, matchers) + if limitErr != nil { + return nil, limitErr + } + } result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) result.Timeseries = append(result.Timeseries, resp.Timeseries...) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 072647a486..ac8c2200c5 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -3,6 +3,8 @@ package querier import ( "context" "fmt" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util/limiter" "io" "sort" "strings" @@ -423,7 +425,6 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* if maxChunksLimit > 0 { leftChunksLimit -= numChunks } - resultMtx.Unlock() return queriedBlocks, nil @@ -563,6 +564,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( queriedBlocks = []ulid.ULID(nil) numChunks = atomic.NewInt32(0) spanLog = spanlogger.FromContext(ctx) + queryLimiter = limiter.PerQueryLimiterFromContext(ctx) ) // Concurrently fetch series from all clients. @@ -611,6 +613,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if s := resp.GetSeries(); s != nil { mySeries = append(mySeries, s) + //Add series fingerprint to query limiter; will return error if we are over the limit + limitErr := queryLimiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()), matchers) + if limitErr != nil { + return limitErr + } + // Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled). if maxChunksLimit > 0 { actual := numChunks.Add(int32(len(s.Chunks))) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index a777d5041c..71169a7798 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "github.com/cortexproject/cortex/pkg/util/limiter" "io" "sort" "strings" @@ -43,13 +44,14 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) var ( - block1 = ulid.MustNew(1, nil) - block2 = ulid.MustNew(2, nil) - block3 = ulid.MustNew(3, nil) - block4 = ulid.MustNew(4, nil) - metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} - series1Label = labels.Label{Name: "series", Value: "1"} - series2Label = labels.Label{Name: "series", Value: "2"} + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} + series1Label = labels.Label{Name: "series", Value: "1"} + series2Label = labels.Label{Name: "series", Value: "2"} + noOpPerQueryLimiter = limiter.NewPerQueryLimiter(0, 0) ) type valueResult struct { @@ -67,6 +69,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { finderErr error storeSetResponses []interface{} limits BlocksStoreLimits + perQueryLimiter *limiter.PerQueryLimiter expectedSeries []seriesResult expectedErr error expectedMetrics string @@ -74,11 +77,13 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { "no block in the storage matching the query time range": { finderResult: nil, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: nil, }, "error while finding blocks matching the query time range": { finderErr: errors.New("unable to find blocks"), limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: errors.New("unable to find blocks"), }, "error while getting clients to query the store-gateway": { @@ -90,6 +95,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { errors.New("no client found"), }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: errors.New("no client found"), }, "a single store-gateway instance holds the required blocks (single returned series)": { @@ -107,6 +113,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -133,6 +140,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -166,6 +174,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -195,6 +204,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -230,6 +240,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -291,6 +302,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { errors.New("no store-gateway remaining after exclude"), }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block2.String()), }, "multiple store-gateway instances have some missing blocks (consistency check failed)": { @@ -316,6 +328,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { errors.New("no store-gateway remaining after exclude"), }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), }, "multiple store-gateway instances have some missing blocks but queried from a replica during subsequent attempts": { @@ -353,6 +366,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -411,6 +425,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + perQueryLimiter: noOpPerQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -436,6 +451,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { @@ -473,13 +489,34 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + perQueryLimiter: noOpPerQueryLimiter, expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), }, + "max series per query limit hit while fetching chunks": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 2), + mockHintsResponse(block1, block2), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + perQueryLimiter: limiter.NewPerQueryLimiter(1, 0), + expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit while fetching chunks %s (limit: %d)", fmt.Sprintf("{__name__=%q}", metricName), 1)), + }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { ctx := context.Background() + ctx = limiter.AddPerQueryLimiterToContext(ctx, testData.perQueryLimiter) + reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index dc1e1e1f3a..0e7c74ba3b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "github.com/cortexproject/cortex/pkg/util/limiter" "strings" "sync" "time" @@ -155,7 +156,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - + // queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader) lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) { @@ -223,6 +224,9 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, if err != nil { return nil, err } + //Take the set tenant limits + //TODO When Chunk Bytes per Query Limit is created take that in here (Currently Unlimited) + ctx = limiter.NewPerQueryLimiterOnContext(ctx, limits.MaxSeriesPerQuery(userID), 0) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/limiter/per_query_limiter.go b/pkg/util/limiter/per_query_limiter.go new file mode 100644 index 0000000000..91d50b5ed1 --- /dev/null +++ b/pkg/util/limiter/per_query_limiter.go @@ -0,0 +1,119 @@ +package limiter + +import ( + "context" + "fmt" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "go.uber.org/atomic" + "sync" +) + +type PerQueryLimiter struct { + uniqueSeriesMx sync.RWMutex + uniqueSeries map[model.Fingerprint]struct{} + + chunkBytesCount *atomic.Int32 + maxSeriesPerQuery int + maxChunkBytesPerQuery int +} +type perQueryLimiterCtxMarker struct{} + +var ( + pqlCtxKey = &perQueryLimiterCtxMarker{} + errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks %s (limit: %d)" + errMaxChunkBytesHit = "The query hit the max number of chunk bytes limit while fetching chunks %s (limit: %d)" +) + +// NewPerQueryLimiter makes a new per-query rate limiter. Each per-query limiter +// is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits. +func NewPerQueryLimiter(maxSeriesPerQuery int, maxChunkBytesPerQuery int) *PerQueryLimiter { + return &PerQueryLimiter{ + uniqueSeriesMx: sync.RWMutex{}, + uniqueSeries: map[model.Fingerprint]struct{}{}, + + chunkBytesCount: atomic.NewInt32(0), + + maxChunkBytesPerQuery: maxChunkBytesPerQuery, + maxSeriesPerQuery: maxSeriesPerQuery, + } +} + +func NewPerQueryLimiterOnContext(ctx context.Context, maxSeriesPerQuery int, maxChunkBytesPerQuery int) context.Context { + return context.WithValue(ctx, pqlCtxKey, NewPerQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery)) +} + +func AddPerQueryLimiterToContext(ctx context.Context, limiter *PerQueryLimiter) context.Context { + return context.WithValue(ctx, pqlCtxKey, limiter) +} + +// PerQueryLimiterFromContext returns a Per Query Limiter from the current context. +// IF there is Per Query Limiter on the context we will return ??? +func PerQueryLimiterFromContext(ctx context.Context) *PerQueryLimiter { + //Create fallback limiter of a new limiter?? + return FromContextWithFallback(ctx) +} + +// FromContextWithFallback returns a Per Query Limiter from the current context. +// IF there is Per Query Limiter on the context we will return ??? +func FromContextWithFallback(ctx context.Context) *PerQueryLimiter { + pql, ok := ctx.Value(pqlCtxKey).(*PerQueryLimiter) + if !ok { + //If there's no limiter return a new unlimited limiter as a fallback + pql = NewPerQueryLimiter(0, 0) + } + return pql +} + +// AddFingerPrint Add a label adapter fast fingerprint to the map of unique fingerprints. If the +// added series label causes us to go over the limit of maxSeriesPerQuery we will +// return a validation error +func (pql *PerQueryLimiter) AddFingerPrint(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error { + //If the max series is unlimited just return without managing map + if pql.maxSeriesPerQuery == 0 { + return nil + } + + pql.uniqueSeriesMx.Lock() + //Unlock after return + defer pql.uniqueSeriesMx.Unlock() + + pql.uniqueSeries[client.FastFingerprint(labelAdapter)] = struct{}{} + if len(pql.uniqueSeries) > pql.maxSeriesPerQuery { + //Format error with query and max limit + return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, util.LabelMatchersToString(matchers), pql.maxSeriesPerQuery)) + } + return nil +} + +// UniqueFingerPrints returns the count of unique series fingerprints seen by this per query limiter +func (pql *PerQueryLimiter) UniqueFingerPrints() int { + pql.uniqueSeriesMx.RLock() + defer pql.uniqueSeriesMx.RUnlock() + mapL := len(pql.uniqueSeries) + return mapL +} + +// AddChunkBytes add number of chunk bytes to running query total of bytes +func (pql *PerQueryLimiter) AddChunkBytes(bytesCount int32) error { + if pql.maxChunkBytesPerQuery == 0 { + return nil + } + + totalChunkBytes := pql.chunkBytesCount.Add(bytesCount) + if totalChunkBytes > int32(pql.maxChunkBytesPerQuery) { + //TODO format real error message here + return validation.LimitError("Too many samples") + } else { + return nil + } +} + +func (pql *PerQueryLimiter) ChunkBytesCount() int32 { + //Is there a better way to get a value here? + return pql.chunkBytesCount.Add(0) +} diff --git a/pkg/util/limiter/per_query_limiter_test.go b/pkg/util/limiter/per_query_limiter_test.go new file mode 100644 index 0000000000..2025c6934d --- /dev/null +++ b/pkg/util/limiter/per_query_limiter_test.go @@ -0,0 +1,103 @@ +package limiter + +import ( + "fmt" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestPerQueryLimiter_AddFingerPrint(t *testing.T) { + const ( + metricName = "test_metric" + ) + + var ( + series1 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": "1", + }) + series2 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_2", + "series2": "1", + }) + matchers = []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + limiter = NewPerQueryLimiter(100, 100) + ) + limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series1), matchers) + err := limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series2), matchers) + assert.Equal(t, 2, limiter.UniqueFingerPrints()) + assert.Nil(t, err) + +} + +func TestPerQueryLimiter_AddFingerPrintExceedLimit(t *testing.T) { + const ( + metricName = "test_metric" + ) + + var ( + series1 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": "1", + }) + series2 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_2", + "series2": "1", + }) + matchers = []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + limiter = NewPerQueryLimiter(1, 1) + ) + err := limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series1), matchers) + assert.Equal(t, nil, err) + err = limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series2), matchers) + require.Error(t, err) +} + +func BenchmarkPerQueryLimiter_AddFingerPrint(b *testing.B) { + const ( + metricName = "test_metric" + ) + var series []labels.Labels + for i := 0; i < 10000; i++ { + series = append(series, + labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": fmt.Sprint(i), + })) + } + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + b.ResetTimer() + + for n := 0; n < b.N; n++ { + limiter := NewPerQueryLimiter(10000, 10000) + for _, s := range series { + limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(s), matchers) + } + } + +} + +func TestPerQueryLimiter_AddChunkBytes(t *testing.T) { + limiter := NewPerQueryLimiter(100, 100) + err := limiter.AddChunkBytes(int32(10)) + assert.Equal(t, int32(10), limiter.ChunkBytesCount()) + assert.Nil(t, err) +} + +func TestPerQueryLimiter_AddChunkBytesExceedLimit(t *testing.T) { + limiter := NewPerQueryLimiter(100, 10) + err := limiter.AddChunkBytes(int32(11)) + assert.Equal(t, int32(11), limiter.ChunkBytesCount()) + require.Error(t, err) +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index acd21eaf1a..c30bf75164 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -130,7 +130,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") - f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage.") + f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced in the ingesters and it's a per-instance limit.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return. This limit only applies when running the Cortex chunks storage with -querier.ingester-streaming=false.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") From 9148855e479077785722a2f4273e0176b967b524 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Wed, 12 May 2021 17:35:46 -0500 Subject: [PATCH 02/13] Fix spacing issues Signed-off-by: Tyler Reid --- pkg/querier/blocks_store_queryable_test.go | 4 +--- pkg/querier/querier.go | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 71169a7798..f395209300 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -514,9 +514,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - ctx := context.Background() - ctx = limiter.AddPerQueryLimiterToContext(ctx, testData.perQueryLimiter) - + ctx := limiter.AddPerQueryLimiterToContext(context.Background(), testData.perQueryLimiter) reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 0e7c74ba3b..7c30ae0297 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -156,7 +156,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - // queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader) lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) { From 60571d26fb69da193015268e767e9e7ac5a9725b Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Thu, 13 May 2021 16:20:58 -0500 Subject: [PATCH 03/13] Review and linter changes Signed-off-by: Tyler Reid --- pkg/distributor/distributor_test.go | 4 +- pkg/distributor/query.go | 13 +- pkg/querier/blocks_store_queryable.go | 10 +- pkg/querier/blocks_store_queryable_test.go | 96 +++++++------- pkg/querier/querier.go | 7 +- pkg/util/limiter/per_query_limiter.go | 119 ------------------ pkg/util/limiter/query_limiter.go | 89 +++++++++++++ ..._limiter_test.go => query_limiter_test.go} | 38 ++---- 8 files changed, 169 insertions(+), 207 deletions(-) delete mode 100644 pkg/util/limiter/per_query_limiter.go create mode 100644 pkg/util/limiter/query_limiter.go rename pkg/util/limiter/{per_query_limiter_test.go => query_limiter_test.go} (62%) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 0bb3102b0d..d4eb2e778a 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/cortexproject/cortex/pkg/util/limiter" "io" "math" "net/http" @@ -41,6 +40,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" util_math "github.com/cortexproject/cortex/pkg/util/math" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -952,7 +952,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac limits := &validation.Limits{} flagext.DefaultValues(limits) limits.MaxSeriesPerQuery = maxSeriesLimit - ctx = limiter.NewPerQueryLimiterOnContext(ctx, maxSeriesLimit, 0) + ctx = limiter.NewQueryLimiterOnContext(ctx, maxSeriesLimit) // Prepare distributors. ds, _, r, _ := prepare(t, prepConfig{ numIngesters: 3, diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 6ce9b16290..1c5ca60172 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -3,7 +3,6 @@ package distributor import ( "context" "fmt" - "github.com/cortexproject/cortex/pkg/util/limiter" "io" "time" @@ -20,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" grpc_util "github.com/cortexproject/cortex/pkg/util/grpc" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -190,7 +190,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re var ( chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) chunksCount = atomic.Int32{} - queryLimiter = limiter.PerQueryLimiterFromContext(ctx) + queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) matchers, _ = ingester_client.FromLabelMatchers(req.Matchers) ) @@ -233,8 +233,13 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re } } for _, series := range resp.Chunkseries { - //Add series, with fingerprint inside of limiter - limitErr := queryLimiter.AddFingerPrint(series.Labels, matchers) + limitErr := queryLimiter.AddSeries(series.Labels, matchers) + if limitErr != nil { + return nil, limitErr + } + } + for _, series := range resp.Timeseries { + limitErr := queryLimiter.AddSeries(series.Labels, matchers) if limitErr != nil { return nil, limitErr } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index ac8c2200c5..6e69111738 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -3,8 +3,6 @@ package querier import ( "context" "fmt" - "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/util/limiter" "io" "sort" "strings" @@ -29,6 +27,7 @@ import ( "golang.org/x/sync/errgroup" grpc_metadata "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" @@ -39,6 +38,7 @@ import ( "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/math" "github.com/cortexproject/cortex/pkg/util/services" @@ -564,7 +564,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( queriedBlocks = []ulid.ULID(nil) numChunks = atomic.NewInt32(0) spanLog = spanlogger.FromContext(ctx) - queryLimiter = limiter.PerQueryLimiterFromContext(ctx) + queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Concurrently fetch series from all clients. @@ -613,8 +613,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if s := resp.GetSeries(); s != nil { mySeries = append(mySeries, s) - //Add series fingerprint to query limiter; will return error if we are over the limit - limitErr := queryLimiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()), matchers) + // Add series fingerprint to query limiter; will return error if we are over the limit + limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()), matchers) if limitErr != nil { return limitErr } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index f395209300..f03a022c2b 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -3,7 +3,6 @@ package querier import ( "context" "fmt" - "github.com/cortexproject/cortex/pkg/util/limiter" "io" "sort" "strings" @@ -32,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -44,14 +44,14 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) var ( - block1 = ulid.MustNew(1, nil) - block2 = ulid.MustNew(2, nil) - block3 = ulid.MustNew(3, nil) - block4 = ulid.MustNew(4, nil) - metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} - series1Label = labels.Label{Name: "series", Value: "1"} - series2Label = labels.Label{Name: "series", Value: "2"} - noOpPerQueryLimiter = limiter.NewPerQueryLimiter(0, 0) + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} + series1Label = labels.Label{Name: "series", Value: "1"} + series2Label = labels.Label{Name: "series", Value: "2"} + noOpQueryLimiter = limiter.NewQueryLimiter(0) ) type valueResult struct { @@ -69,7 +69,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { finderErr error storeSetResponses []interface{} limits BlocksStoreLimits - perQueryLimiter *limiter.PerQueryLimiter + queryLimiter *limiter.QueryLimiter expectedSeries []seriesResult expectedErr error expectedMetrics string @@ -77,14 +77,14 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { "no block in the storage matching the query time range": { finderResult: nil, limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + queryLimiter: noOpQueryLimiter, expectedErr: nil, }, "error while finding blocks matching the query time range": { - finderErr: errors.New("unable to find blocks"), - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, - expectedErr: errors.New("unable to find blocks"), + finderErr: errors.New("unable to find blocks"), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: errors.New("unable to find blocks"), }, "error while getting clients to query the store-gateway": { finderResult: bucketindex.Blocks{ @@ -94,9 +94,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ errors.New("no client found"), }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, - expectedErr: errors.New("no client found"), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: errors.New("no client found"), }, "a single store-gateway instance holds the required blocks (single returned series)": { finderResult: bucketindex.Blocks{ @@ -112,8 +112,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -139,8 +139,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -173,8 +173,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block2}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -203,8 +203,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block2}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -239,8 +239,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block3}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -301,9 +301,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { // Second attempt returns an error because there are no other store-gateways left. errors.New("no store-gateway remaining after exclude"), }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, - expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block2.String()), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block2.String()), }, "multiple store-gateway instances have some missing blocks (consistency check failed)": { finderResult: bucketindex.Blocks{ @@ -327,9 +327,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { // Second attempt returns an error because there are no other store-gateways left. errors.New("no store-gateway remaining after exclude"), }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, - expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), }, "multiple store-gateway instances have some missing blocks but queried from a replica during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -365,8 +365,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block4}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -424,8 +424,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, - perQueryLimiter: noOpPerQueryLimiter, + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -450,9 +450,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, - perQueryLimiter: noOpPerQueryLimiter, - expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -488,9 +488,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block4}, }, }, - limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, - perQueryLimiter: noOpPerQueryLimiter, - expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), }, "max series per query limit hit while fetching chunks": { finderResult: bucketindex.Blocks{ @@ -506,15 +506,15 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{}, - perQueryLimiter: limiter.NewPerQueryLimiter(1, 0), - expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit while fetching chunks %s (limit: %d)", fmt.Sprintf("{__name__=%q}", metricName), 1)), + limits: &blocksStoreLimitsMock{}, + queryLimiter: limiter.NewQueryLimiter(1), + expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit while fetching chunks %s (limit: %d)", fmt.Sprintf("{__name__=%q}", metricName), 1)), }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - ctx := limiter.AddPerQueryLimiterToContext(context.Background(), testData.perQueryLimiter) + ctx := limiter.AddQueryLimiterToContext(context.Background(), testData.queryLimiter) reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7c30ae0297..ba14a1a034 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -5,7 +5,6 @@ import ( "errors" "flag" "fmt" - "github.com/cortexproject/cortex/pkg/util/limiter" "strings" "sync" "time" @@ -30,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -223,9 +223,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, if err != nil { return nil, err } - //Take the set tenant limits - //TODO When Chunk Bytes per Query Limit is created take that in here (Currently Unlimited) - ctx = limiter.NewPerQueryLimiterOnContext(ctx, limits.MaxSeriesPerQuery(userID), 0) + // Take the set tenant limits + ctx = limiter.NewQueryLimiterOnContext(ctx, limits.MaxSeriesPerQuery(userID)) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/limiter/per_query_limiter.go b/pkg/util/limiter/per_query_limiter.go deleted file mode 100644 index 91d50b5ed1..0000000000 --- a/pkg/util/limiter/per_query_limiter.go +++ /dev/null @@ -1,119 +0,0 @@ -package limiter - -import ( - "context" - "fmt" - "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" - "go.uber.org/atomic" - "sync" -) - -type PerQueryLimiter struct { - uniqueSeriesMx sync.RWMutex - uniqueSeries map[model.Fingerprint]struct{} - - chunkBytesCount *atomic.Int32 - maxSeriesPerQuery int - maxChunkBytesPerQuery int -} -type perQueryLimiterCtxMarker struct{} - -var ( - pqlCtxKey = &perQueryLimiterCtxMarker{} - errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks %s (limit: %d)" - errMaxChunkBytesHit = "The query hit the max number of chunk bytes limit while fetching chunks %s (limit: %d)" -) - -// NewPerQueryLimiter makes a new per-query rate limiter. Each per-query limiter -// is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits. -func NewPerQueryLimiter(maxSeriesPerQuery int, maxChunkBytesPerQuery int) *PerQueryLimiter { - return &PerQueryLimiter{ - uniqueSeriesMx: sync.RWMutex{}, - uniqueSeries: map[model.Fingerprint]struct{}{}, - - chunkBytesCount: atomic.NewInt32(0), - - maxChunkBytesPerQuery: maxChunkBytesPerQuery, - maxSeriesPerQuery: maxSeriesPerQuery, - } -} - -func NewPerQueryLimiterOnContext(ctx context.Context, maxSeriesPerQuery int, maxChunkBytesPerQuery int) context.Context { - return context.WithValue(ctx, pqlCtxKey, NewPerQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery)) -} - -func AddPerQueryLimiterToContext(ctx context.Context, limiter *PerQueryLimiter) context.Context { - return context.WithValue(ctx, pqlCtxKey, limiter) -} - -// PerQueryLimiterFromContext returns a Per Query Limiter from the current context. -// IF there is Per Query Limiter on the context we will return ??? -func PerQueryLimiterFromContext(ctx context.Context) *PerQueryLimiter { - //Create fallback limiter of a new limiter?? - return FromContextWithFallback(ctx) -} - -// FromContextWithFallback returns a Per Query Limiter from the current context. -// IF there is Per Query Limiter on the context we will return ??? -func FromContextWithFallback(ctx context.Context) *PerQueryLimiter { - pql, ok := ctx.Value(pqlCtxKey).(*PerQueryLimiter) - if !ok { - //If there's no limiter return a new unlimited limiter as a fallback - pql = NewPerQueryLimiter(0, 0) - } - return pql -} - -// AddFingerPrint Add a label adapter fast fingerprint to the map of unique fingerprints. If the -// added series label causes us to go over the limit of maxSeriesPerQuery we will -// return a validation error -func (pql *PerQueryLimiter) AddFingerPrint(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error { - //If the max series is unlimited just return without managing map - if pql.maxSeriesPerQuery == 0 { - return nil - } - - pql.uniqueSeriesMx.Lock() - //Unlock after return - defer pql.uniqueSeriesMx.Unlock() - - pql.uniqueSeries[client.FastFingerprint(labelAdapter)] = struct{}{} - if len(pql.uniqueSeries) > pql.maxSeriesPerQuery { - //Format error with query and max limit - return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, util.LabelMatchersToString(matchers), pql.maxSeriesPerQuery)) - } - return nil -} - -// UniqueFingerPrints returns the count of unique series fingerprints seen by this per query limiter -func (pql *PerQueryLimiter) UniqueFingerPrints() int { - pql.uniqueSeriesMx.RLock() - defer pql.uniqueSeriesMx.RUnlock() - mapL := len(pql.uniqueSeries) - return mapL -} - -// AddChunkBytes add number of chunk bytes to running query total of bytes -func (pql *PerQueryLimiter) AddChunkBytes(bytesCount int32) error { - if pql.maxChunkBytesPerQuery == 0 { - return nil - } - - totalChunkBytes := pql.chunkBytesCount.Add(bytesCount) - if totalChunkBytes > int32(pql.maxChunkBytesPerQuery) { - //TODO format real error message here - return validation.LimitError("Too many samples") - } else { - return nil - } -} - -func (pql *PerQueryLimiter) ChunkBytesCount() int32 { - //Is there a better way to get a value here? - return pql.chunkBytesCount.Add(0) -} diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go new file mode 100644 index 0000000000..688651e4e1 --- /dev/null +++ b/pkg/util/limiter/query_limiter.go @@ -0,0 +1,89 @@ +package limiter + +import ( + "context" + "fmt" + "sync" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +type QueryLimiter struct { + uniqueSeriesMx sync.RWMutex + uniqueSeries map[model.Fingerprint]struct{} + + chunkBytesCount *atomic.Int32 + maxSeriesPerQuery int +} +type queryLimiterCtxKey struct{} + +var ( + qlCtxKey = &queryLimiterCtxKey{} + errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks %s (limit: %d)" +) + +// NewQueryLimiter makes a new per-query rate limiter. Each per-query limiter +// is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits. +func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter { + return &QueryLimiter{ + uniqueSeriesMx: sync.RWMutex{}, + uniqueSeries: map[model.Fingerprint]struct{}{}, + + chunkBytesCount: atomic.NewInt32(0), + + maxSeriesPerQuery: maxSeriesPerQuery, + } +} + +func NewQueryLimiterOnContext(ctx context.Context, maxSeriesPerQuery int) context.Context { + return context.WithValue(ctx, qlCtxKey, NewQueryLimiter(maxSeriesPerQuery)) +} + +func AddQueryLimiterToContext(ctx context.Context, limiter *QueryLimiter) context.Context { + return context.WithValue(ctx, qlCtxKey, limiter) +} + +// QueryLimiterFromContextWithFallback returns a Query Limiter from the current context. +// IF there is Per Query Limiter on the context we will return a new no-op limiter +func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { + ql, ok := ctx.Value(qlCtxKey).(*QueryLimiter) + if !ok { + // If there's no limiter return a new unlimited limiter as a fallback + ql = NewQueryLimiter(0) + } + return ql +} + +// AddSeries Add labels for series to the count of unique series. If the +// added series label causes us to go over the limit of maxSeriesPerQuery we will +// return a validation error +func (ql *QueryLimiter) AddSeries(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error { + // If the max series is unlimited just return without managing map + if ql.maxSeriesPerQuery == 0 { + return nil + } + + ql.uniqueSeriesMx.Lock() + defer ql.uniqueSeriesMx.Unlock() + + ql.uniqueSeries[client.FastFingerprint(labelAdapter)] = struct{}{} + if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { + // Format error with query and max limit + return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, util.LabelMatchersToString(matchers), ql.maxSeriesPerQuery)) + } + return nil +} + +// UniqueSeries returns the count of unique series seen by this query limiter. +func (ql *QueryLimiter) UniqueSeries() int { + ql.uniqueSeriesMx.RLock() + defer ql.uniqueSeriesMx.RUnlock() + return len(ql.uniqueSeries) +} diff --git a/pkg/util/limiter/per_query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go similarity index 62% rename from pkg/util/limiter/per_query_limiter_test.go rename to pkg/util/limiter/query_limiter_test.go index 2025c6934d..a52f3bc813 100644 --- a/pkg/util/limiter/per_query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -2,11 +2,13 @@ package limiter import ( "fmt" - "github.com/cortexproject/cortex/pkg/cortexpb" + "testing" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" + + "github.com/cortexproject/cortex/pkg/cortexpb" ) func TestPerQueryLimiter_AddFingerPrint(t *testing.T) { @@ -27,11 +29,11 @@ func TestPerQueryLimiter_AddFingerPrint(t *testing.T) { labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), } - limiter = NewPerQueryLimiter(100, 100) + limiter = NewQueryLimiter(100) ) - limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series1), matchers) - err := limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series2), matchers) - assert.Equal(t, 2, limiter.UniqueFingerPrints()) + limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1), matchers) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2), matchers) + assert.Equal(t, 2, limiter.UniqueSeries()) assert.Nil(t, err) } @@ -54,11 +56,11 @@ func TestPerQueryLimiter_AddFingerPrintExceedLimit(t *testing.T) { labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), } - limiter = NewPerQueryLimiter(1, 1) + limiter = NewQueryLimiter(1) ) - err := limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series1), matchers) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1), matchers) assert.Equal(t, nil, err) - err = limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(series2), matchers) + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2), matchers) require.Error(t, err) } @@ -80,24 +82,10 @@ func BenchmarkPerQueryLimiter_AddFingerPrint(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - limiter := NewPerQueryLimiter(10000, 10000) + limiter := NewQueryLimiter(10000) for _, s := range series { - limiter.AddFingerPrint(cortexpb.FromLabelsToLabelAdapters(s), matchers) + limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s), matchers) } } } - -func TestPerQueryLimiter_AddChunkBytes(t *testing.T) { - limiter := NewPerQueryLimiter(100, 100) - err := limiter.AddChunkBytes(int32(10)) - assert.Equal(t, int32(10), limiter.ChunkBytesCount()) - assert.Nil(t, err) -} - -func TestPerQueryLimiter_AddChunkBytesExceedLimit(t *testing.T) { - limiter := NewPerQueryLimiter(100, 10) - err := limiter.AddChunkBytes(int32(11)) - assert.Equal(t, int32(11), limiter.ChunkBytesCount()) - require.Error(t, err) -} From ad0048560d2b76f8b7be74a433e27215b8511fd3 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Thu, 13 May 2021 16:54:25 -0500 Subject: [PATCH 04/13] Update docs Signed-off-by: Tyler Reid Signed-off-by: Tyler Reid --- docs/configuration/arguments.md | 4 ++++ docs/configuration/config-file-reference.md | 5 ++--- pkg/util/validation/limits.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index d738225f26..971f0a851a 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -483,6 +483,10 @@ Valid per-tenant limits are (with their corresponding flags for default values): Requires `-distributor.replication-factor`, `-distributor.shard-by-all-labels`, `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` set for the ingesters too. - `max_series_per_query` / `-ingester.max-series-per-query` + + When running Cortex chunks storage: limit enforced when fetching metrics from ingesters only. + When running Cortex blocks storage: limit enforced when fetching metrics both from ingesters and store-gateways. + - `max_samples_per_query` / `-ingester.max-samples-per-query` Limits on the number of timeseries and samples returns by a single ingester during a query. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 240e81e2df..acc9cd433d 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3938,9 +3938,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s [metric_relabel_configs: | default = ] # The maximum number of series for which a query can fetch samples from each -# ingester. This limit is enforced only in the ingesters (when querying samples -# not flushed to the storage yet) and it's a per-instance limit. This limit is -# ignored when running the Cortex blocks storage. +# ingester and block storage. This limit is enforced in the ingesters and block +# storage and it's a per-instance limit. # CLI flag: -ingester.max-series-per-query [max_series_per_query: | default = 100000] diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index c30bf75164..ccb040187f 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -130,7 +130,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") - f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced in the ingesters and it's a per-instance limit.") + f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester and block storage. This limit is enforced in the ingesters and block storage and it's a per-instance limit.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return. This limit only applies when running the Cortex chunks storage with -querier.ingester-streaming=false.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") From 01e59eabc8804805fe19484b1724fc1fa1ebb229 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Thu, 13 May 2021 17:15:54 -0500 Subject: [PATCH 05/13] Add changelog for series per query limit Signed-off-by: Tyler Reid --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2df96e80b..45896edf29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ * `cortex_alertmanager_state_persist_failed_total` * [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124 * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151 +* [ENHANCEMENT] Ingester/Block Storage: Added ingester and block storage support for `max_series_per_query` / `-ingester.max-series-per-query` + If cortex is running in chunk mode the series limit is only supported for fetched series from the ingester. If cortex is in blocks mode this + this limit is for fetched series from block storage and ingester. #4179 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176 @@ -25,7 +28,6 @@ * [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828 - ## 1.9.0 in progress * [CHANGE] Fix for CVE-2021-31232: Local file disclosure vulnerability when `-experimental.alertmanager.enable-api` is used. The HTTP basic auth `password_file` can be used as an attack vector to send any file content via a webhook. The alertmanager templates can be used as an attack vector to send any file content because the alertmanager can load any text file specified in the templates list. #4129 From 4f31df86ff7ecb3f66af88afe54cf838629430f1 Mon Sep 17 00:00:00 2001 From: treid314 Date: Fri, 14 May 2021 12:43:41 -0500 Subject: [PATCH 06/13] Apply suggestions from code review Co-authored-by: Marco Pracucci Signed-off-by: Tyler Reid --- CHANGELOG.md | 4 +--- docs/configuration/arguments.md | 4 ++-- pkg/distributor/distributor_test.go | 2 +- pkg/util/limiter/query_limiter.go | 10 ++++------ pkg/util/limiter/query_limiter_test.go | 9 ++++----- 5 files changed, 12 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45896edf29..56fd1bdbf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,9 +18,7 @@ * `cortex_alertmanager_state_persist_failed_total` * [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124 * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151 -* [ENHANCEMENT] Ingester/Block Storage: Added ingester and block storage support for `max_series_per_query` / `-ingester.max-series-per-query` - If cortex is running in chunk mode the series limit is only supported for fetched series from the ingester. If cortex is in blocks mode this - this limit is for fetched series from block storage and ingester. #4179 +* [ENHANCEMENT] Querier: Added `-ingester.max-series-per-query` support for blocks storage. When Cortex is running with blocks storage, the limit is enforced in the querier and applies both to data received from ingesters and store-gateway (long-term storage). #4179 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 971f0a851a..04b27781b3 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -484,8 +484,8 @@ Valid per-tenant limits are (with their corresponding flags for default values): - `max_series_per_query` / `-ingester.max-series-per-query` - When running Cortex chunks storage: limit enforced when fetching metrics from ingesters only. - When running Cortex blocks storage: limit enforced when fetching metrics both from ingesters and store-gateways. + When running Cortex chunks storage: limit enforced in the ingesters only and it's a per-instance limit. + When running Cortex blocks storage: limit enforced in the queriers both on samples fetched from ingesters and store-gateways (long-term storage). - `max_samples_per_query` / `-ingester.max-samples-per-query` diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d4eb2e778a..afee8c4969 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -963,7 +963,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac }) defer stopAll(ds, r) - // Push a number of series below the max series limit. Each series has 1 sample, + // Push a number of series below the max series limit. Each series has 1 sample. initialSeries := maxSeriesLimit writeReq := makeWriteRequest(0, initialSeries, 0) writeRes, err := ds[0].Push(ctx, writeReq) diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 688651e4e1..0ffd48a246 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -29,7 +29,7 @@ var ( errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks %s (limit: %d)" ) -// NewQueryLimiter makes a new per-query rate limiter. Each per-query limiter +// NewQueryLimiter makes a new per-query limiter. Each per-query limiter // is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits. func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter { return &QueryLimiter{ @@ -50,8 +50,8 @@ func AddQueryLimiterToContext(ctx context.Context, limiter *QueryLimiter) contex return context.WithValue(ctx, qlCtxKey, limiter) } -// QueryLimiterFromContextWithFallback returns a Query Limiter from the current context. -// IF there is Per Query Limiter on the context we will return a new no-op limiter +// QueryLimiterFromContextWithFallback returns a QueryLimiter from the current context. +// If there is not a QueryLimiter on the context it will return a new no-op limiter. func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { ql, ok := ctx.Value(qlCtxKey).(*QueryLimiter) if !ok { @@ -61,9 +61,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { return ql } -// AddSeries Add labels for series to the count of unique series. If the -// added series label causes us to go over the limit of maxSeriesPerQuery we will -// return a validation error +// AddSeries adds the input series and returns an error if the limit is reached. func (ql *QueryLimiter) AddSeries(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error { // If the max series is unlimited just return without managing map if ql.maxSeriesPerQuery == 0 { diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index a52f3bc813..72942433c9 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -11,7 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" ) -func TestPerQueryLimiter_AddFingerPrint(t *testing.T) { +func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing.T) { const ( metricName = "test_metric" ) @@ -35,10 +35,9 @@ func TestPerQueryLimiter_AddFingerPrint(t *testing.T) { err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2), matchers) assert.Equal(t, 2, limiter.UniqueSeries()) assert.Nil(t, err) - } -func TestPerQueryLimiter_AddFingerPrintExceedLimit(t *testing.T) { +func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) { const ( metricName = "test_metric" ) @@ -59,12 +58,12 @@ func TestPerQueryLimiter_AddFingerPrintExceedLimit(t *testing.T) { limiter = NewQueryLimiter(1) ) err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1), matchers) - assert.Equal(t, nil, err) + require.NoError(t, err) err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2), matchers) require.Error(t, err) } -func BenchmarkPerQueryLimiter_AddFingerPrint(b *testing.B) { +func BenchmarkQueryLimiter_AddSeries(b *testing.B) { const ( metricName = "test_metric" ) From f62d32d811771fd508e0c983a6db2ff245ebb136 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Fri, 14 May 2021 15:36:21 -0500 Subject: [PATCH 07/13] address review comments Signed-off-by: Tyler Reid --- pkg/distributor/distributor_test.go | 11 +++---- pkg/distributor/query.go | 6 ++-- pkg/querier/blocks_store_queryable.go | 2 +- pkg/querier/blocks_store_queryable_test.go | 2 +- pkg/querier/querier.go | 2 +- pkg/util/limiter/query_limiter.go | 35 ++++++++------------ pkg/util/limiter/query_limiter_test.go | 38 +++++++++------------- pkg/util/validation/limits.go | 3 +- 8 files changed, 39 insertions(+), 60 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index afee8c4969..f11898f614 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -951,8 +951,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac limits := &validation.Limits{} flagext.DefaultValues(limits) - limits.MaxSeriesPerQuery = maxSeriesLimit - ctx = limiter.NewQueryLimiterOnContext(ctx, maxSeriesLimit) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit)) // Prepare distributors. ds, _, r, _ := prepare(t, prepConfig{ numIngesters: 3, @@ -982,11 +981,9 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac // Push more series to exceed the limit once we'll query back all series. writeReq = &cortexpb.WriteRequest{} - for i := 0; i < initialSeries; i++ { - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0), - ) - } + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series")}}, 0, 0), + ) writeRes, err = ds[0].Push(ctx, writeReq) assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 1c5ca60172..0641f05b27 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -233,14 +233,12 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re } } for _, series := range resp.Chunkseries { - limitErr := queryLimiter.AddSeries(series.Labels, matchers) - if limitErr != nil { + if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { return nil, limitErr } } for _, series := range resp.Timeseries { - limitErr := queryLimiter.AddSeries(series.Labels, matchers) - if limitErr != nil { + if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { return nil, limitErr } } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 6e69111738..6ed8030105 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -614,7 +614,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( mySeries = append(mySeries, s) // Add series fingerprint to query limiter; will return error if we are over the limit - limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()), matchers) + limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels())) if limitErr != nil { return limitErr } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index f03a022c2b..541a4c7b05 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -508,7 +508,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, limits: &blocksStoreLimitsMock{}, queryLimiter: limiter.NewQueryLimiter(1), - expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit while fetching chunks %s (limit: %d)", fmt.Sprintf("{__name__=%q}", metricName), 1)), + expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit while fetching chunks (limit: %d)", 1)), }, } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ba14a1a034..56ecd36274 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -224,7 +224,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } // Take the set tenant limits - ctx = limiter.NewQueryLimiterOnContext(ctx, limits.MaxSeriesPerQuery(userID)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxSeriesPerQuery(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 0ffd48a246..f95adab56d 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -6,46 +6,37 @@ import ( "sync" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" - "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" ) +type queryLimiterCtxKey struct{} + +var ( + qlCtxKey = &queryLimiterCtxKey{} + errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks (limit: %d)" +) + type QueryLimiter struct { uniqueSeriesMx sync.RWMutex uniqueSeries map[model.Fingerprint]struct{} - chunkBytesCount *atomic.Int32 maxSeriesPerQuery int } -type queryLimiterCtxKey struct{} -var ( - qlCtxKey = &queryLimiterCtxKey{} - errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks %s (limit: %d)" -) - -// NewQueryLimiter makes a new per-query limiter. Each per-query limiter -// is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits. +// NewQueryLimiter makes a new per-query limiter. Each query limiter +// is configured using the `maxSeriesPerQuery` limit. func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter { return &QueryLimiter{ uniqueSeriesMx: sync.RWMutex{}, uniqueSeries: map[model.Fingerprint]struct{}{}, - chunkBytesCount: atomic.NewInt32(0), - maxSeriesPerQuery: maxSeriesPerQuery, } } -func NewQueryLimiterOnContext(ctx context.Context, maxSeriesPerQuery int) context.Context { - return context.WithValue(ctx, qlCtxKey, NewQueryLimiter(maxSeriesPerQuery)) -} - func AddQueryLimiterToContext(ctx context.Context, limiter *QueryLimiter) context.Context { return context.WithValue(ctx, qlCtxKey, limiter) } @@ -62,7 +53,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { } // AddSeries adds the input series and returns an error if the limit is reached. -func (ql *QueryLimiter) AddSeries(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error { +func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { // If the max series is unlimited just return without managing map if ql.maxSeriesPerQuery == 0 { return nil @@ -71,16 +62,16 @@ func (ql *QueryLimiter) AddSeries(labelAdapter []cortexpb.LabelAdapter, matchers ql.uniqueSeriesMx.Lock() defer ql.uniqueSeriesMx.Unlock() - ql.uniqueSeries[client.FastFingerprint(labelAdapter)] = struct{}{} + ql.uniqueSeries[client.FastFingerprint(seriesLabels)] = struct{}{} if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { // Format error with query and max limit - return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, util.LabelMatchersToString(matchers), ql.maxSeriesPerQuery)) + return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, ql.maxSeriesPerQuery)) } return nil } // UniqueSeries returns the count of unique series seen by this query limiter. -func (ql *QueryLimiter) UniqueSeries() int { +func (ql *QueryLimiter) uniqueSeriesCount() int { ql.uniqueSeriesMx.RLock() defer ql.uniqueSeriesMx.RUnlock() return len(ql.uniqueSeries) diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index 72942433c9..aec1a24ab8 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -25,16 +25,17 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing labels.MetricName: metricName + "_2", "series2": "1", }) - matchers = []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), - } - limiter = NewQueryLimiter(100) ) - limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1), matchers) - err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2), matchers) - assert.Equal(t, 2, limiter.UniqueSeries()) - assert.Nil(t, err) + limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) + assert.NoError(t, err) + assert.Equal(t, 2, limiter.uniqueSeriesCount()) + + // Re-add previous series to make sure it's not double counted + limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + assert.NoError(t, err) + assert.Equal(t, 2, limiter.uniqueSeriesCount()) } func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) { @@ -51,15 +52,11 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) labels.MetricName: metricName + "_2", "series2": "1", }) - matchers = []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), - } - limiter = NewQueryLimiter(1) ) - err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1), matchers) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) require.NoError(t, err) - err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2), matchers) + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) require.Error(t, err) } @@ -68,23 +65,18 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) { metricName = "test_metric" ) var series []labels.Labels - for i := 0; i < 10000; i++ { + for i := 0; i < b.N; i++ { series = append(series, labels.FromMap(map[string]string{ labels.MetricName: metricName + "_1", "series1": fmt.Sprint(i), })) } - matchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), - } b.ResetTimer() - for n := 0; n < b.N; n++ { - limiter := NewQueryLimiter(10000) - for _, s := range series { - limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s), matchers) - } + limiter := NewQueryLimiter(b.N + 1) + for _, s := range series { + limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) } } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index ccb040187f..2997531e8e 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -130,7 +130,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") - f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester and block storage. This limit is enforced in the ingesters and block storage and it's a per-instance limit.") + f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingesters and block storage. When running in chunk storage this limit is enforced in the ingesters and it's a per-instance limit.. " + + "When running in block storage mode this limit is enforced on the querier and counts series returned from ingesters and block storage as a per-query limit.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return. This limit only applies when running the Cortex chunks storage with -querier.ingester-streaming=false.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") From e2e1c6c0120c1b9fa54a61dd414988d086204889 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Mon, 17 May 2021 10:00:02 -0500 Subject: [PATCH 08/13] Added integration test for series per query limit Signed-off-by: Tyler Reid --- integration/querier_test.go | 78 +++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/integration/querier_test.go b/integration/querier_test.go index cb84cb3e41..1f5d91d817 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -889,6 +889,84 @@ func TestQuerierWithChunksStorage(t *testing.T) { assertServiceMetricsPrefixes(t, TableManager, tableManager) } +func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. TODO Add new flags + flags := mergeFlags(map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-querier.ingester-streaming": strconv.FormatBool(true), + "-querier.query-store-for-labels-enabled": "true", + "-ingester.max-series-per-query": "3", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached)) + + // Add the memcached address to the flags. + flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + + // Start Cortex components. + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") + storeGateway := e2ecortex.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway)) + + // Start the querier with configuring store-gateway addresses if sharding is disabled. + flags = mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","), + }) + + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(querier)) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series2Timestamp := time.Now() + series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series4Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + + series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + series3, _ := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"}) + series4, _ := generateSeries("series_4", series4Timestamp, prompb.Label{Name: "series_4", Value: "series_4"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series3) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + result, err := c.QueryRange("series_*", time.Unix(0, 0), series3Timestamp.Add(1*time.Hour), blockRangePeriod) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series4) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + result, err = c.QueryRange("series_*", time.Unix(0, 0), series3Timestamp.Add(1*time.Hour), blockRangePeriod) + require.Error(t, err) +} + func TestHashCollisionHandling(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) From 04d51cd9ba219c60a27ff936096712f5fdb133e0 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Mon, 17 May 2021 14:34:22 -0500 Subject: [PATCH 09/13] Fix up integration test Signed-off-by: Tyler Reid --- integration/querier_test.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/integration/querier_test.go b/integration/querier_test.go index 1f5d91d817..15087585b8 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -897,13 +897,13 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { defer s.Close() // Configure the blocks storage to frequently compact TSDB head - // and ship blocks to the storage. TODO Add new flags - flags := mergeFlags(map[string]string{ + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), "-blocks-storage.tsdb.ship-interval": "1s", "-blocks-storage.bucket-store.sync-interval": "1s", "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), - "-querier.ingester-streaming": strconv.FormatBool(true), + "-querier.ingester-streaming": "true", "-querier.query-store-for-labels-enabled": "true", "-ingester.max-series-per-query": "3", }) @@ -936,9 +936,9 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { // Push some series to Cortex. series1Timestamp := time.Now() - series2Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2) - series4Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3) series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) @@ -948,23 +948,24 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { res, err := c.Push(series1) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - res, err = c.Push(series3) + res, err = c.Push(series2) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - result, err := c.QueryRange("series_*", time.Unix(0, 0), series3Timestamp.Add(1*time.Hour), blockRangePeriod) + result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod) require.NoError(t, err) - require.Equal(t, model.ValVector, result.Type()) + require.Equal(t, model.ValMatrix, result.Type()) - res, err = c.Push(series2) + res, err = c.Push(series3) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) res, err = c.Push(series4) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - result, err = c.QueryRange("series_*", time.Unix(0, 0), series3Timestamp.Add(1*time.Hour), blockRangePeriod) + result, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod) require.Error(t, err) + assert.Contains(t, err.Error(), "max number of series limit while") } func TestHashCollisionHandling(t *testing.T) { From 38005ddac3fc77924a29132c0b24b8e4db509821 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Mon, 17 May 2021 17:31:55 -0500 Subject: [PATCH 10/13] Add new config option querier.max-series-per-query and use that instead of ingester.max-series-per-query for series per query limit Signed-off-by: Tyler Reid --- CHANGELOG.md | 2 +- docs/configuration/config-file-reference.md | 12 ++++++++++-- integration/querier_test.go | 2 +- pkg/querier/querier.go | 2 +- pkg/util/validation/limits.go | 15 +++++++++++---- 5 files changed, 24 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56fd1bdbf8..58f8f662af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses` * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168 * [FEATURE] Alertmanager: Added rate-limits to email notifier. Rate limits can be configured using `-alertmanager.email-notification-rate-limit` and `-alertmanager.email-notification-burst-size`. These limits are applied on individual alertmanagers. Rate-limited email notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 +* [FEATURE] Querier: Added new `-querier.max-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies both to series received from ingesters and store-gateway (long-term storage). #4179 * [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149 * `cortex_alertmanager_state_fetch_replica_state_total` * `cortex_alertmanager_state_fetch_replica_state_failed_total` @@ -18,7 +19,6 @@ * `cortex_alertmanager_state_persist_failed_total` * [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124 * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151 -* [ENHANCEMENT] Querier: Added `-ingester.max-series-per-query` support for blocks storage. When Cortex is running with blocks storage, the limit is enforced in the querier and applies both to data received from ingesters and store-gateway (long-term storage). #4179 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index acc9cd433d..d32494e7ad 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3938,8 +3938,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s [metric_relabel_configs: | default = ] # The maximum number of series for which a query can fetch samples from each -# ingester and block storage. This limit is enforced in the ingesters and block -# storage and it's a per-instance limit. +# ingester. This limit is enforced only in the ingesters (when querying samples +# not flushed to the storage yet) and it's a per-instance limit. This limit is +# ignored when running the Cortex blocks storage. # CLI flag: -ingester.max-series-per-query [max_series_per_query: | default = 100000] @@ -4011,6 +4012,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -querier.max-fetched-chunks-per-query [max_fetched_chunks_per_query: | default = 0] +# The maximum number of series for which a query can fetch samples from each +# ingesters and block storage. When running in block storage mode this limit is +# enforced on the querier and counts series returned from ingesters and block +# storage as a per-query limit. +# CLI flag: -querier.max-series-per-query +[max_fetched_series_per_query: | default = 0] + # Limit how long back data (series and metadata) can be queried, up until # duration ago. This limit is enforced in the query-frontend, querier # and ruler. If the requested time range is outside the allowed range, the diff --git a/integration/querier_test.go b/integration/querier_test.go index 15087585b8..4e080b748d 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -905,7 +905,7 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), "-querier.ingester-streaming": "true", "-querier.query-store-for-labels-enabled": "true", - "-ingester.max-series-per-query": "3", + "-querier.max-series-per-query": "3", }) // Start dependencies. diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 56ecd36274..0838bd6208 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -224,7 +224,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } // Take the set tenant limits - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxSeriesPerQuery(userID))) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 2997531e8e..132c2c486c 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -24,7 +24,7 @@ const ( GlobalIngestionRateStrategy = "global" ) -//LimitError are errors that do not comply with the limits specified. +// LimitError are errors that do not comply with the limits specified. type LimitError string func (e LimitError) Error() string { @@ -73,6 +73,7 @@ type Limits struct { // Querier enforced limits. MaxChunksPerQueryFromStore int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` // TODO Remove in Cortex 1.12. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` + MaxFetchedSeriesPerQuery int `yaml:"max_fetched_series_per_query" json:"max_fetched_series_per_query"` MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` @@ -130,8 +131,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") - f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingesters and block storage. When running in chunk storage this limit is enforced in the ingesters and it's a per-instance limit.. " + - "When running in block storage mode this limit is enforced on the querier and counts series returned from ingesters and block storage as a per-query limit.") + f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return. This limit only applies when running the Cortex chunks storage with -querier.ingester-streaming=false.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") @@ -145,6 +145,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.") + f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-series-per-query", 0, "The maximum number of series for which a query can fetch samples from each ingesters and block storage. When running in block storage mode this limit is enforced on the querier and counts series returned from ingesters and block storage as a per-query limit.") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.") f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.") @@ -364,12 +365,18 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore } -// MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching +// MaxChunksPerQueryFromIngesters returns the maximum number of chunks allowed per query when fetching // chunks from ingesters. func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQuery } +// MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching +// chunks from ingesters and block storage +func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int { + return o.getOverridesForUser(userID).MaxFetchedSeriesPerQuery +} + // MaxQueryLookback returns the max lookback period of queries. func (o *Overrides) MaxQueryLookback(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback) From 6ec0b09100f1aa76086914b3db45b31f31e71530 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Tue, 18 May 2021 12:31:02 -0500 Subject: [PATCH 11/13] Update docs to show new cli flag and other review changes Signed-off-by: Tyler Reid --- CHANGELOG.md | 2 +- docs/configuration/arguments.md | 7 +++---- docs/configuration/config-file-reference.md | 12 +++++------ integration/querier_test.go | 6 +++--- pkg/distributor/distributor_test.go | 8 +++---- pkg/distributor/query.go | 2 +- pkg/querier/blocks_store_queryable_test.go | 2 +- pkg/querier/querier.go | 2 +- pkg/util/limiter/query_limiter.go | 23 +++++++++++---------- pkg/util/limiter/query_limiter_test.go | 9 ++++---- pkg/util/validation/limits.go | 6 +++--- 11 files changed, 40 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58f8f662af..44039c630a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ - `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses` * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168 * [FEATURE] Alertmanager: Added rate-limits to email notifier. Rate limits can be configured using `-alertmanager.email-notification-rate-limit` and `-alertmanager.email-notification-burst-size`. These limits are applied on individual alertmanagers. Rate-limited email notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 -* [FEATURE] Querier: Added new `-querier.max-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies both to series received from ingesters and store-gateway (long-term storage). #4179 +* [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179 * [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149 * `cortex_alertmanager_state_fetch_replica_state_total` * `cortex_alertmanager_state_fetch_replica_state_failed_total` diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 04b27781b3..73157318ba 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -484,18 +484,17 @@ Valid per-tenant limits are (with their corresponding flags for default values): - `max_series_per_query` / `-ingester.max-series-per-query` - When running Cortex chunks storage: limit enforced in the ingesters only and it's a per-instance limit. - When running Cortex blocks storage: limit enforced in the queriers both on samples fetched from ingesters and store-gateways (long-term storage). - - `max_samples_per_query` / `-ingester.max-samples-per-query` Limits on the number of timeseries and samples returns by a single ingester during a query. - `max_metadata_per_user` / `-ingester.max-metadata-per-user` - `max_metadata_per_metric` / `-ingester.max-metadata-per-metric` - Enforced by the ingesters; limits the number of active metadata a user (or a given metric) can have. When running with `-distributor.shard-by-all-labels=false` (the default), this limit will enforce the maximum number of metadata a metric can have 'globally', as all metadata for a single metric will be sent to the same replication set of ingesters. This is not the case when running with `-distributor.shard-by-all-labels=true`, so the actual limit will be N/RF times higher, where N is number of ingester replicas and RF is configured replication factor. +- `max_fetched_series_per_query` / `querier.max-fetched-series-per-query` + When running Cortex with blocks storage this limit is enforced in the queriers on unique series fetched from ingesters and store-gateways (long-term storage). + - `max_global_metadata_per_user` / `-ingester.max-global-metadata-per-user` - `max_global_metadata_per_metric` / `-ingester.max-global-metadata-per-metric` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index d32494e7ad..daf146b645 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3940,7 +3940,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # The maximum number of series for which a query can fetch samples from each # ingester. This limit is enforced only in the ingesters (when querying samples # not flushed to the storage yet) and it's a per-instance limit. This limit is -# ignored when running the Cortex blocks storage. +# ignored when running the Cortex blocks storage. When running Cortex with +# blocks storage use -querier.max-fetched-series-per-query limit instead. # CLI flag: -ingester.max-series-per-query [max_series_per_query: | default = 100000] @@ -4012,11 +4013,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -querier.max-fetched-chunks-per-query [max_fetched_chunks_per_query: | default = 0] -# The maximum number of series for which a query can fetch samples from each -# ingesters and block storage. When running in block storage mode this limit is -# enforced on the querier and counts series returned from ingesters and block -# storage as a per-query limit. -# CLI flag: -querier.max-series-per-query +# The maximum number of unique series for which a query can fetch samples from +# each ingesters and blocks storage. This limit is enforced in the querier only +# when running Cortex with blocks storage. 0 to disable +# CLI flag: -querier.max-fetched-series-per-query [max_fetched_series_per_query: | default = 0] # Limit how long back data (series and metadata) can be queried, up until diff --git a/integration/querier_test.go b/integration/querier_test.go index 4e080b748d..19ebdf0b8a 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -905,7 +905,7 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), "-querier.ingester-streaming": "true", "-querier.query-store-for-labels-enabled": "true", - "-querier.max-series-per-query": "3", + "-querier.max-fetched-series-per-query": "3", }) // Start dependencies. @@ -963,9 +963,9 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - result, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod) + _, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod) require.Error(t, err) - assert.Contains(t, err.Error(), "max number of series limit while") + assert.Contains(t, err.Error(), "max number of series limit") } func TestHashCollisionHandling(t *testing.T) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index f11898f614..e3ddaa6718 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -962,7 +962,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac }) defer stopAll(ds, r) - // Push a number of series below the max series limit. Each series has 1 sample. + // Push a number of series below the max series limit. initialSeries := maxSeriesLimit writeReq := makeWriteRequest(0, initialSeries, 0) writeRes, err := ds[0].Push(ctx, writeReq) @@ -982,7 +982,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac // Push more series to exceed the limit once we'll query back all series. writeReq = &cortexpb.WriteRequest{} writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series")}}, 0, 0), + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0), ) writeRes, err = ds[0].Push(ctx, writeReq) @@ -993,7 +993,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac // a query running on all series to fail. _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) require.Error(t, err) - assert.Contains(t, err.Error(), "max number of series limit while") + assert.Contains(t, err.Error(), "max number of series limit") } func TestDistributor_Push_LabelRemoval(t *testing.T) { @@ -2004,7 +2004,7 @@ func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLa Timeseries: []cortexpb.PreallocTimeseries{ { TimeSeries: &cortexpb.TimeSeries{ - //Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}}, + // Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}}, Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)), Exemplars: []cortexpb.Exemplar{ { diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 0641f05b27..5e36aec06f 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -191,7 +191,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) chunksCount = atomic.Int32{} queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) - matchers, _ = ingester_client.FromLabelMatchers(req.Matchers) ) // Fetch samples from multiple ingesters @@ -229,6 +228,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re // We expect to be always able to convert the label matchers back to Prometheus ones. // In case we fail (unexpected) the error will not include the matchers, but the core // logic doesn't break. + matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit)) } } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 541a4c7b05..d7bb456836 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -508,7 +508,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, limits: &blocksStoreLimitsMock{}, queryLimiter: limiter.NewQueryLimiter(1), - expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit while fetching chunks (limit: %d)", 1)), + expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit (limit: %d)", 1)), }, } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 0838bd6208..3bd89a767f 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -223,7 +223,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, if err != nil { return nil, err } - // Take the set tenant limits + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index f95adab56d..9560c6c12e 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -15,12 +15,12 @@ import ( type queryLimiterCtxKey struct{} var ( - qlCtxKey = &queryLimiterCtxKey{} - errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks (limit: %d)" + ctxKey = &queryLimiterCtxKey{} + errMaxSeriesHit = "The query hit the max number of series limit (limit: %d)" ) type QueryLimiter struct { - uniqueSeriesMx sync.RWMutex + uniqueSeriesMx sync.Mutex uniqueSeries map[model.Fingerprint]struct{} maxSeriesPerQuery int @@ -30,7 +30,7 @@ type QueryLimiter struct { // is configured using the `maxSeriesPerQuery` limit. func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter { return &QueryLimiter{ - uniqueSeriesMx: sync.RWMutex{}, + uniqueSeriesMx: sync.Mutex{}, uniqueSeries: map[model.Fingerprint]struct{}{}, maxSeriesPerQuery: maxSeriesPerQuery, @@ -38,13 +38,13 @@ func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter { } func AddQueryLimiterToContext(ctx context.Context, limiter *QueryLimiter) context.Context { - return context.WithValue(ctx, qlCtxKey, limiter) + return context.WithValue(ctx, ctxKey, limiter) } // QueryLimiterFromContextWithFallback returns a QueryLimiter from the current context. // If there is not a QueryLimiter on the context it will return a new no-op limiter. func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { - ql, ok := ctx.Value(qlCtxKey).(*QueryLimiter) + ql, ok := ctx.Value(ctxKey).(*QueryLimiter) if !ok { // If there's no limiter return a new unlimited limiter as a fallback ql = NewQueryLimiter(0) @@ -58,21 +58,22 @@ func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { if ql.maxSeriesPerQuery == 0 { return nil } + fingerprint := client.FastFingerprint(seriesLabels) ql.uniqueSeriesMx.Lock() defer ql.uniqueSeriesMx.Unlock() - ql.uniqueSeries[client.FastFingerprint(seriesLabels)] = struct{}{} + ql.uniqueSeries[fingerprint] = struct{}{} if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { - // Format error with query and max limit + // Format error with max limit return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, ql.maxSeriesPerQuery)) } return nil } -// UniqueSeries returns the count of unique series seen by this query limiter. +// uniqueSeriesCount returns the count of unique series seen by this query limiter. func (ql *QueryLimiter) uniqueSeriesCount() int { - ql.uniqueSeriesMx.RLock() - defer ql.uniqueSeriesMx.RUnlock() + ql.uniqueSeriesMx.Lock() + defer ql.uniqueSeriesMx.Unlock() return len(ql.uniqueSeries) } diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index aec1a24ab8..2ba521e669 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -27,13 +27,13 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing }) limiter = NewQueryLimiter(100) ) - limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) - err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) assert.NoError(t, err) assert.Equal(t, 2, limiter.uniqueSeriesCount()) // Re-add previous series to make sure it's not double counted - limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) assert.NoError(t, err) assert.Equal(t, 2, limiter.uniqueSeriesCount()) } @@ -76,7 +76,8 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) { limiter := NewQueryLimiter(b.N + 1) for _, s := range series { - limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) + assert.NoError(b, err) } } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 132c2c486c..ec08343d88 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -131,7 +131,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") - f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage.") + f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage. When running Cortex with blocks storage use -querier.max-fetched-series-per-query limit instead.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return. This limit only applies when running the Cortex chunks storage with -querier.ingester-streaming=false.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") @@ -145,7 +145,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.") - f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-series-per-query", 0, "The maximum number of series for which a query can fetch samples from each ingesters and block storage. When running in block storage mode this limit is enforced on the querier and counts series returned from ingesters and block storage as a per-query limit.") + f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.") f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.") @@ -372,7 +372,7 @@ func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int { } // MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching -// chunks from ingesters and block storage +// chunks from ingesters and blocks storage. func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxFetchedSeriesPerQuery } From 2b8cb1de5146278c0a6331572961b97afa1d420a Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Wed, 19 May 2021 08:59:07 -0500 Subject: [PATCH 12/13] Check error to resolve linter issue Signed-off-by: Tyler Reid --- pkg/util/limiter/query_limiter_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index 2ba521e669..2fafd6bac3 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -28,6 +28,7 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing limiter = NewQueryLimiter(100) ) err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + assert.NoError(t, err) err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) assert.NoError(t, err) assert.Equal(t, 2, limiter.uniqueSeriesCount()) From 29a0fcffdc8ebc19bdb8d87213b1a19a1ba6212a Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 20 May 2021 12:40:16 +0200 Subject: [PATCH 13/13] Clean white noise Signed-off-by: Marco Pracucci --- docs/configuration/arguments.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 73157318ba..ec2f882b36 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -483,7 +483,7 @@ Valid per-tenant limits are (with their corresponding flags for default values): Requires `-distributor.replication-factor`, `-distributor.shard-by-all-labels`, `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` set for the ingesters too. - `max_series_per_query` / `-ingester.max-series-per-query` - + - `max_samples_per_query` / `-ingester.max-samples-per-query` Limits on the number of timeseries and samples returns by a single ingester during a query.