diff --git a/CHANGELOG.md b/CHANGELOG.md index c6ddd95b5d..0be6daed1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - `-alertmanager.receivers-firewall.block.cidr-networks` renamed to `-alertmanager.receivers-firewall-block-cidr-networks` - `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses` * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168 +* [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179 * [FEATURE] Alertmanager: Added rate-limits to notifiers. Rate limits used by all integrations can be configured using `-alertmanager.notification-rate-limit`, while per-integration rate limits can be specified via `-alertmanager.notification-rate-limit-per-integration` parameter. Both shared and per-integration limits can be overwritten using overrides mechanism. These limits are applied on individual (per-tenant) alertmanagers. Rate-limited notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 #4163 * [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149 * `cortex_alertmanager_state_fetch_replica_state_total` diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index d738225f26..ec2f882b36 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -483,15 +483,18 @@ Valid per-tenant limits are (with their corresponding flags for default values): Requires `-distributor.replication-factor`, `-distributor.shard-by-all-labels`, `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` set for the ingesters too. - `max_series_per_query` / `-ingester.max-series-per-query` + - `max_samples_per_query` / `-ingester.max-samples-per-query` Limits on the number of timeseries and samples returns by a single ingester during a query. - `max_metadata_per_user` / `-ingester.max-metadata-per-user` - `max_metadata_per_metric` / `-ingester.max-metadata-per-metric` - Enforced by the ingesters; limits the number of active metadata a user (or a given metric) can have. When running with `-distributor.shard-by-all-labels=false` (the default), this limit will enforce the maximum number of metadata a metric can have 'globally', as all metadata for a single metric will be sent to the same replication set of ingesters. This is not the case when running with `-distributor.shard-by-all-labels=true`, so the actual limit will be N/RF times higher, where N is number of ingester replicas and RF is configured replication factor. +- `max_fetched_series_per_query` / `querier.max-fetched-series-per-query` + When running Cortex with blocks storage this limit is enforced in the queriers on unique series fetched from ingesters and store-gateways (long-term storage). + - `max_global_metadata_per_user` / `-ingester.max-global-metadata-per-user` - `max_global_metadata_per_metric` / `-ingester.max-global-metadata-per-metric` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 1c0283a05b..28b2222b41 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3940,7 +3940,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # The maximum number of series for which a query can fetch samples from each # ingester. This limit is enforced only in the ingesters (when querying samples # not flushed to the storage yet) and it's a per-instance limit. This limit is -# ignored when running the Cortex blocks storage. +# ignored when running the Cortex blocks storage. When running Cortex with +# blocks storage use -querier.max-fetched-series-per-query limit instead. # CLI flag: -ingester.max-series-per-query [max_series_per_query: | default = 100000] @@ -4012,6 +4013,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -querier.max-fetched-chunks-per-query [max_fetched_chunks_per_query: | default = 0] +# The maximum number of unique series for which a query can fetch samples from +# each ingesters and blocks storage. This limit is enforced in the querier only +# when running Cortex with blocks storage. 0 to disable +# CLI flag: -querier.max-fetched-series-per-query +[max_fetched_series_per_query: | default = 0] + # Limit how long back data (series and metadata) can be queried, up until # duration ago. This limit is enforced in the query-frontend, querier # and ruler. If the requested time range is outside the allowed range, the diff --git a/integration/querier_test.go b/integration/querier_test.go index cb84cb3e41..19ebdf0b8a 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -889,6 +889,85 @@ func TestQuerierWithChunksStorage(t *testing.T) { assertServiceMetricsPrefixes(t, TableManager, tableManager) } +func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-querier.ingester-streaming": "true", + "-querier.query-store-for-labels-enabled": "true", + "-querier.max-fetched-series-per-query": "3", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached)) + + // Add the memcached address to the flags. + flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + + // Start Cortex components. + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") + storeGateway := e2ecortex.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway)) + + // Start the querier with configuring store-gateway addresses if sharding is disabled. + flags = mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","), + }) + + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(querier)) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3) + + series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + series3, _ := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"}) + series4, _ := generateSeries("series_4", series4Timestamp, prompb.Label{Name: "series_4", Value: "series_4"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + + res, err = c.Push(series3) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series4) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + _, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod) + require.Error(t, err) + assert.Contains(t, err.Error(), "max number of series limit") +} + func TestHashCollisionHandling(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 98364cc36f..e3ddaa6718 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -40,6 +40,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" util_math "github.com/cortexproject/cortex/pkg/util/math" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -945,6 +946,56 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac assert.Contains(t, err.Error(), "the query hit the max number of chunks limit") } +func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) { + const maxSeriesLimit = 10 + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit)) + // Prepare distributors. + ds, _, r, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + }) + defer stopAll(ds, r) + + // Push a number of series below the max series limit. + initialSeries := maxSeriesLimit + writeReq := makeWriteRequest(0, initialSeries, 0) + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } + + // Since the number of series is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, initialSeries) + + // Push more series to exceed the limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0), + ) + + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + // Since the number of series is exceeding the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Contains(t, err.Error(), "max number of series limit") +} + func TestDistributor_Push_LabelRemoval(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "user") @@ -1953,7 +2004,7 @@ func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLa Timeseries: []cortexpb.PreallocTimeseries{ { TimeSeries: &cortexpb.TimeSeries{ - //Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}}, + // Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}}, Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)), Exemplars: []cortexpb.Exemplar{ { diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 0b20d02312..5e36aec06f 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" grpc_util "github.com/cortexproject/cortex/pkg/util/grpc" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -187,8 +188,9 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re // queryIngesterStream queries the ingesters using the new streaming API. func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { var ( - chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) - chunksCount = atomic.Int32{} + chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) + chunksCount = atomic.Int32{} + queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Fetch samples from multiple ingesters @@ -230,6 +232,16 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit)) } } + for _, series := range resp.Chunkseries { + if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { + return nil, limitErr + } + } + for _, series := range resp.Timeseries { + if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { + return nil, limitErr + } + } result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) result.Timeseries = append(result.Timeseries, resp.Timeseries...) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 072647a486..6ed8030105 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -27,6 +27,7 @@ import ( "golang.org/x/sync/errgroup" grpc_metadata "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" @@ -37,6 +38,7 @@ import ( "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/math" "github.com/cortexproject/cortex/pkg/util/services" @@ -423,7 +425,6 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* if maxChunksLimit > 0 { leftChunksLimit -= numChunks } - resultMtx.Unlock() return queriedBlocks, nil @@ -563,6 +564,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( queriedBlocks = []ulid.ULID(nil) numChunks = atomic.NewInt32(0) spanLog = spanlogger.FromContext(ctx) + queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Concurrently fetch series from all clients. @@ -611,6 +613,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if s := resp.GetSeries(); s != nil { mySeries = append(mySeries, s) + // Add series fingerprint to query limiter; will return error if we are over the limit + limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels())) + if limitErr != nil { + return limitErr + } + // Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled). if maxChunksLimit > 0 { actual := numChunks.Add(int32(len(s.Chunks))) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index a777d5041c..d7bb456836 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -31,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -43,13 +44,14 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) var ( - block1 = ulid.MustNew(1, nil) - block2 = ulid.MustNew(2, nil) - block3 = ulid.MustNew(3, nil) - block4 = ulid.MustNew(4, nil) - metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} - series1Label = labels.Label{Name: "series", Value: "1"} - series2Label = labels.Label{Name: "series", Value: "2"} + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} + series1Label = labels.Label{Name: "series", Value: "1"} + series2Label = labels.Label{Name: "series", Value: "2"} + noOpQueryLimiter = limiter.NewQueryLimiter(0) ) type valueResult struct { @@ -67,6 +69,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { finderErr error storeSetResponses []interface{} limits BlocksStoreLimits + queryLimiter *limiter.QueryLimiter expectedSeries []seriesResult expectedErr error expectedMetrics string @@ -74,12 +77,14 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { "no block in the storage matching the query time range": { finderResult: nil, limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedErr: nil, }, "error while finding blocks matching the query time range": { - finderErr: errors.New("unable to find blocks"), - limits: &blocksStoreLimitsMock{}, - expectedErr: errors.New("unable to find blocks"), + finderErr: errors.New("unable to find blocks"), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: errors.New("unable to find blocks"), }, "error while getting clients to query the store-gateway": { finderResult: bucketindex.Blocks{ @@ -89,8 +94,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ errors.New("no client found"), }, - limits: &blocksStoreLimitsMock{}, - expectedErr: errors.New("no client found"), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: errors.New("no client found"), }, "a single store-gateway instance holds the required blocks (single returned series)": { finderResult: bucketindex.Blocks{ @@ -106,7 +112,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{}, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -132,7 +139,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{}, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -165,7 +173,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block2}, }, }, - limits: &blocksStoreLimitsMock{}, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -194,7 +203,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block2}, }, }, - limits: &blocksStoreLimitsMock{}, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel), @@ -229,7 +239,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block3}, }, }, - limits: &blocksStoreLimitsMock{}, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -290,8 +301,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { // Second attempt returns an error because there are no other store-gateways left. errors.New("no store-gateway remaining after exclude"), }, - limits: &blocksStoreLimitsMock{}, - expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block2.String()), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block2.String()), }, "multiple store-gateway instances have some missing blocks (consistency check failed)": { finderResult: bucketindex.Blocks{ @@ -315,8 +327,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { // Second attempt returns an error because there are no other store-gateways left. errors.New("no store-gateway remaining after exclude"), }, - limits: &blocksStoreLimitsMock{}, - expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), }, "multiple store-gateway instances have some missing blocks but queried from a replica during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -352,7 +365,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block4}, }, }, - limits: &blocksStoreLimitsMock{}, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -410,7 +424,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + queryLimiter: noOpQueryLimiter, expectedSeries: []seriesResult{ { lbls: labels.New(metricNameLabel, series1Label), @@ -435,8 +450,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block1, block2}, }, }, - limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, - expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -472,14 +488,33 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }}: {block4}, }, }, - limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, - expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + }, + "max series per query limit hit while fetching chunks": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 2), + mockHintsResponse(block1, block2), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: limiter.NewQueryLimiter(1), + expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit (limit: %d)", 1)), }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - ctx := context.Background() + ctx := limiter.AddQueryLimiterToContext(context.Background(), testData.queryLimiter) reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index dc1e1e1f3a..3bd89a767f 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -155,7 +156,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader) lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) { @@ -224,6 +224,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID))) + mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { return storage.NoopQuerier(), nil diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go new file mode 100644 index 0000000000..9560c6c12e --- /dev/null +++ b/pkg/util/limiter/query_limiter.go @@ -0,0 +1,79 @@ +package limiter + +import ( + "context" + "fmt" + "sync" + + "github.com/prometheus/common/model" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +type queryLimiterCtxKey struct{} + +var ( + ctxKey = &queryLimiterCtxKey{} + errMaxSeriesHit = "The query hit the max number of series limit (limit: %d)" +) + +type QueryLimiter struct { + uniqueSeriesMx sync.Mutex + uniqueSeries map[model.Fingerprint]struct{} + + maxSeriesPerQuery int +} + +// NewQueryLimiter makes a new per-query limiter. Each query limiter +// is configured using the `maxSeriesPerQuery` limit. +func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter { + return &QueryLimiter{ + uniqueSeriesMx: sync.Mutex{}, + uniqueSeries: map[model.Fingerprint]struct{}{}, + + maxSeriesPerQuery: maxSeriesPerQuery, + } +} + +func AddQueryLimiterToContext(ctx context.Context, limiter *QueryLimiter) context.Context { + return context.WithValue(ctx, ctxKey, limiter) +} + +// QueryLimiterFromContextWithFallback returns a QueryLimiter from the current context. +// If there is not a QueryLimiter on the context it will return a new no-op limiter. +func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { + ql, ok := ctx.Value(ctxKey).(*QueryLimiter) + if !ok { + // If there's no limiter return a new unlimited limiter as a fallback + ql = NewQueryLimiter(0) + } + return ql +} + +// AddSeries adds the input series and returns an error if the limit is reached. +func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { + // If the max series is unlimited just return without managing map + if ql.maxSeriesPerQuery == 0 { + return nil + } + fingerprint := client.FastFingerprint(seriesLabels) + + ql.uniqueSeriesMx.Lock() + defer ql.uniqueSeriesMx.Unlock() + + ql.uniqueSeries[fingerprint] = struct{}{} + if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { + // Format error with max limit + return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, ql.maxSeriesPerQuery)) + } + return nil +} + +// uniqueSeriesCount returns the count of unique series seen by this query limiter. +func (ql *QueryLimiter) uniqueSeriesCount() int { + ql.uniqueSeriesMx.Lock() + defer ql.uniqueSeriesMx.Unlock() + return len(ql.uniqueSeries) +} diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go new file mode 100644 index 0000000000..2fafd6bac3 --- /dev/null +++ b/pkg/util/limiter/query_limiter_test.go @@ -0,0 +1,84 @@ +package limiter + +import ( + "fmt" + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing.T) { + const ( + metricName = "test_metric" + ) + + var ( + series1 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": "1", + }) + series2 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_2", + "series2": "1", + }) + limiter = NewQueryLimiter(100) + ) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + assert.NoError(t, err) + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) + assert.NoError(t, err) + assert.Equal(t, 2, limiter.uniqueSeriesCount()) + + // Re-add previous series to make sure it's not double counted + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + assert.NoError(t, err) + assert.Equal(t, 2, limiter.uniqueSeriesCount()) +} + +func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) { + const ( + metricName = "test_metric" + ) + + var ( + series1 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": "1", + }) + series2 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_2", + "series2": "1", + }) + limiter = NewQueryLimiter(1) + ) + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) + require.NoError(t, err) + err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2)) + require.Error(t, err) +} + +func BenchmarkQueryLimiter_AddSeries(b *testing.B) { + const ( + metricName = "test_metric" + ) + var series []labels.Labels + for i := 0; i < b.N; i++ { + series = append(series, + labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": fmt.Sprint(i), + })) + } + b.ResetTimer() + + limiter := NewQueryLimiter(b.N + 1) + for _, s := range series { + err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) + assert.NoError(b, err) + } + +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 7d70dd5cbe..a2117edfd6 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -25,7 +25,7 @@ const ( GlobalIngestionRateStrategy = "global" ) -//LimitError are errors that do not comply with the limits specified. +// LimitError are errors that do not comply with the limits specified. type LimitError string func (e LimitError) Error() string { @@ -74,6 +74,7 @@ type Limits struct { // Querier enforced limits. MaxChunksPerQueryFromStore int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` // TODO Remove in Cortex 1.12. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` + MaxFetchedSeriesPerQuery int `yaml:"max_fetched_series_per_query" json:"max_fetched_series_per_query"` MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` @@ -131,7 +132,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") - f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage.") + f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage. When running Cortex with blocks storage use -querier.max-fetched-series-per-query limit instead.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return. This limit only applies when running the Cortex chunks storage with -querier.ingester-streaming=false.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") @@ -145,6 +146,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.") + f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.") f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.") @@ -380,12 +382,18 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore } -// MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching +// MaxChunksPerQueryFromIngesters returns the maximum number of chunks allowed per query when fetching // chunks from ingesters. func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQuery } +// MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching +// chunks from ingesters and blocks storage. +func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int { + return o.getOverridesForUser(userID).MaxFetchedSeriesPerQuery +} + // MaxQueryLookback returns the max lookback period of queries. func (o *Overrides) MaxQueryLookback(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback)