
Add support for Max Series Per Query for block storage and streaming ingesters #4179

Merged: 16 commits, May 20, 2021
Changes from 11 commits
CHANGELOG.md: 1 addition, 1 deletion
@@ -8,6 +8,7 @@
- `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
* [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
* [FEATURE] Alertmanager: Added rate-limits to email notifier. Rate limits can be configured using `-alertmanager.email-notification-rate-limit` and `-alertmanager.email-notification-burst-size`. These limits are applied on individual alertmanagers. Rate-limited email notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135
* [FEATURE] Querier: Added new `-querier.max-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to series received from both ingesters and store-gateways (long-term storage). #4179
* [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149
* `cortex_alertmanager_state_fetch_replica_state_total`
* `cortex_alertmanager_state_fetch_replica_state_failed_total`
@@ -25,7 +26,6 @@

* [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828


## 1.9.0 in progress

* [CHANGE] Fix for CVE-2021-31232: Local file disclosure vulnerability when `-experimental.alertmanager.enable-api` is used. The HTTP basic auth `password_file` can be used as an attack vector to send any file content via a webhook. The alertmanager templates can be used as an attack vector to send any file content because the alertmanager can load any text file specified in the templates list. #4129
docs/configuration/arguments.md: 4 additions, 0 deletions
@@ -483,6 +483,10 @@ Valid per-tenant limits are (with their corresponding flags for default values):
Requires `-distributor.replication-factor`, `-distributor.shard-by-all-labels`, `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` set for the ingesters too.

- `max_series_per_query` / `-ingester.max-series-per-query`

When running the Cortex chunks storage, this limit is enforced in the ingesters only and it's a per-instance limit.
When running the Cortex blocks storage, this limit is enforced in the queriers and applies to series fetched from both ingesters and store-gateways (long-term storage).

Reviewer comment (Contributor): This is no longer true. Please update the doc accordingly.

- `max_samples_per_query` / `-ingester.max-samples-per-query`

Limits on the number of timeseries and samples returned by a single ingester during a query.
docs/configuration/config-file-reference.md: 7 additions, 0 deletions
@@ -4012,6 +4012,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -querier.max-fetched-chunks-per-query
[max_fetched_chunks_per_query: <int> | default = 0]

# The maximum number of series for which a query can fetch samples from each
# ingester and block storage. When running in block storage mode, this limit is
# enforced in the querier and counts series returned from ingesters and block
# storage as a per-query limit.
# CLI flag: -querier.max-series-per-query
[max_fetched_series_per_query: <int> | default = 0]

# Limit how long back data (series and metadata) can be queried, up until
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
# and ruler. If the requested time range is outside the allowed range, the
integration/querier_test.go: 79 additions, 0 deletions
@@ -889,6 +889,85 @@ func TestQuerierWithChunksStorage(t *testing.T) {
assertServiceMetricsPrefixes(t, TableManager, tableManager)
}

func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-querier.ingester-streaming": "true",
"-querier.query-store-for-labels-enabled": "true",
"-querier.max-series-per-query": "3",
})

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))

// Add the memcached address to the flags.
flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
storeGateway := e2ecortex.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))

// Start the querier, configuring the store-gateway addresses since sharding is disabled.
flags = mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
})

querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(querier))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3)

series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
series3, _ := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})
series4, _ := generateSeries("series_4", series4Timestamp, prompb.Label{Name: "series_4", Value: "series_4"})

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, result.Type())

res, err = c.Push(series3)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
res, err = c.Push(series4)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

result, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod)
require.Error(t, err)
assert.Contains(t, err.Error(), "max number of series limit while")
}

func TestHashCollisionHandling(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
pkg/distributor/distributor_test.go: 51 additions, 0 deletions
@@ -40,6 +40,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
@@ -945,6 +946,56 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
assert.Contains(t, err.Error(), "the query hit the max number of chunks limit")
}

func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) {
const maxSeriesLimit = 10

limits := &validation.Limits{}
flagext.DefaultValues(limits)
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit))
// Prepare distributors.
ds, _, r, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
})
defer stopAll(ds, r)

// Push a number of series below the max series limit. Each series has 1 sample.

Reviewer comment (Contributor): [nit] The "Each series has 1 sample." comment is irrelevant here.
Suggested change:
- // Push a number of series below the max series limit. Each series has 1 sample.
+ // Push a number of series below the max series limit.

initialSeries := maxSeriesLimit
writeReq := makeWriteRequest(0, initialSeries, 0)
writeRes, err := ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

allSeriesMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
}

// Since the number of series is equal to the limit (but doesn't
// exceed it), we expect a query running on all series to succeed.
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)
assert.Len(t, queryRes.Chunkseries, initialSeries)

// Push more series so that the limit is exceeded once we query back all series.
writeReq = &cortexpb.WriteRequest{}
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series")}}, 0, 0),
)

writeRes, err = ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

// Since the number of series is exceeding the limit, we expect
// a query running on all series to fail.
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.Error(t, err)
assert.Contains(t, err.Error(), "max number of series limit while")
}

func TestDistributor_Push_LabelRemoval(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

pkg/distributor/query.go: 15 additions, 3 deletions
@@ -19,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
grpc_util "github.com/cortexproject/cortex/pkg/util/grpc"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/validation"
)

@@ -187,8 +188,10 @@
// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
chunksCount = atomic.Int32{}
chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
chunksCount = atomic.Int32{}
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
matchers, _ = ingester_client.FromLabelMatchers(req.Matchers)

Reviewer comment (Contributor): Please move this back to its original place. It is no longer required to be here.

Reviewer comment (Contributor): This line was moved from another place, but the comment associated with it was left at the original place. Plus, as Marco points out, it is only used when building the error message.

)

// Fetch samples from multiple ingesters
@@ -226,10 +229,19 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
// We expect to be always able to convert the label matchers back to Prometheus ones.
// In case we fail (unexpected) the error will not include the matchers, but the core
// logic doesn't break.
matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
}
}
for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, limitErr
}
}
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, limitErr
}
}

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
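
For readers unfamiliar with the new `pkg/util/limiter` package, the call sites in this PR (`NewQueryLimiter`, `AddQueryLimiterToContext`, `QueryLimiterFromContextWithFallback`, `AddSeries`) suggest the shape sketched below. This is a minimal, self-contained illustration only, with a simplified label type, a string-based fingerprint, and a hypothetical package name; the real implementation may differ in its hashing, locking, and error text. Note that the distributor test above pushes exactly the limit's worth of series across replicated ingesters and still succeeds, which suggests the limiter counts unique series rather than raw responses.

// Illustrative sketch only; not code from this PR. Package name chosen for the sketch.
package limitersketch

import (
    "context"
    "fmt"
    "sort"
    "strings"
    "sync"
)

// LabelPair is a simplified stand-in for cortexpb.LabelAdapter, used only in this sketch.
type LabelPair struct {
    Name  string
    Value string
}

// QueryLimiter tracks the unique series touched by a single query and fails once the
// configured maximum is exceeded. A maxSeries of 0 disables the limit.
type QueryLimiter struct {
    mtx       sync.Mutex
    series    map[string]struct{}
    maxSeries int
}

// NewQueryLimiter returns a limiter scoped to one query.
func NewQueryLimiter(maxSeries int) *QueryLimiter {
    return &QueryLimiter{
        series:    map[string]struct{}{},
        maxSeries: maxSeries,
    }
}

// AddSeries records a series by a fingerprint of its label set, so the same series
// returned by several ingesters (or by an ingester and a store-gateway) is counted once.
// It returns an error as soon as the number of unique series exceeds the limit.
func (l *QueryLimiter) AddSeries(lbls []LabelPair) error {
    if l.maxSeries == 0 {
        return nil
    }

    // Deterministic fingerprint: sorted "name=value" pairs joined together.
    sorted := append([]LabelPair(nil), lbls...)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
    parts := make([]string, 0, len(sorted))
    for _, lp := range sorted {
        parts = append(parts, lp.Name+"="+lp.Value)
    }
    fingerprint := strings.Join(parts, ",")

    l.mtx.Lock()
    defer l.mtx.Unlock()
    l.series[fingerprint] = struct{}{}
    if len(l.series) > l.maxSeries {
        // Error text is illustrative; the tests in this PR only assert on a substring.
        return fmt.Errorf("the query hit the max number of series limit while fetching series (limit: %d)", l.maxSeries)
    }
    return nil
}

type queryLimiterCtxKey struct{}

// AddQueryLimiterToContext attaches the limiter to the request context so the
// distributor (ingester path) and the blocks-store querier (store-gateway path)
// involved in the same query share a single per-query counter.
func AddQueryLimiterToContext(ctx context.Context, l *QueryLimiter) context.Context {
    return context.WithValue(ctx, queryLimiterCtxKey{}, l)
}

// QueryLimiterFromContextWithFallback returns the limiter stored in the context, or an
// unlimited limiter when none was attached, so call sites never need a nil check.
func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
    if l, ok := ctx.Value(queryLimiterCtxKey{}).(*QueryLimiter); ok {
        return l
    }
    return NewQueryLimiter(0)
}

Because the limiter travels in the request context, the ingester path above and the store-gateway path in the next file naturally share one per-query budget.
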
pkg/querier/blocks_store_queryable.go: 9 additions, 1 deletion
@@ -27,6 +27,7 @@ import (
"golang.org/x/sync/errgroup"
grpc_metadata "google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
@@ -37,6 +38,7 @@
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -423,7 +425,6 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
if maxChunksLimit > 0 {
leftChunksLimit -= numChunks
}

resultMtx.Unlock()

return queriedBlocks, nil
@@ -563,6 +564,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
queriedBlocks = []ulid.ULID(nil)
numChunks = atomic.NewInt32(0)
spanLog = spanlogger.FromContext(ctx)
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

// Concurrently fetch series from all clients.
@@ -611,6 +613,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if s := resp.GetSeries(); s != nil {
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
if limitErr != nil {
return limitErr
}

// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
if maxChunksLimit > 0 {
actual := numChunks.Add(int32(len(s.Chunks)))
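
On this store-gateway path the same per-query limiter is consulted via `QueryLimiterFromContextWithFallback`, so series fetched from long-term storage count against the same budget as series fetched from ingesters. Below is a short usage sketch of that shared pattern, reusing the toy types from the earlier sketch (illustrative only, not code from this PR):

// accountSeries mirrors the pattern used in both the distributor and the blocks-store
// querier: fetch the shared limiter from the request context (unlimited when absent)
// and account for every returned series before merging it into the result. The same
// series seen twice only counts once.
func accountSeries(ctx context.Context, seriesLabels [][]LabelPair) error {
    queryLimiter := QueryLimiterFromContextWithFallback(ctx)
    for _, lbls := range seriesLabels {
        if err := queryLimiter.AddSeries(lbls); err != nil {
            // Propagate the limit error unchanged so the caller can fail the whole query.
            return err
        }
    }
    return nil
}
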