Add support for Max Series Per Query for block storage and streaming ingesters #4179

Merged: 16 commits, May 20, 2021
Changes from 13 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
- `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
* [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
* [FEATURE] Alertmanager: Added rate-limits to email notifier. Rate limits can be configured using `-alertmanager.email-notification-rate-limit` and `-alertmanager.email-notification-burst-size`. These limits are applied on individual alertmanagers. Rate-limited email notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135
* [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateways (long-term storage). #4179
* [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149
* `cortex_alertmanager_state_fetch_replica_state_total`
* `cortex_alertmanager_state_fetch_replica_state_failed_total`
5 changes: 4 additions & 1 deletion docs/configuration/arguments.md
@@ -483,15 +483,18 @@ Valid per-tenant limits are (with their corresponding flags for default values):
Requires `-distributor.replication-factor`, `-distributor.shard-by-all-labels`, `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` set for the ingesters too.

- `max_series_per_query` / `-ingester.max-series-per-query`

- `max_samples_per_query` / `-ingester.max-samples-per-query`

Limits on the number of timeseries and samples returned by a single ingester during a query.

- `max_metadata_per_user` / `-ingester.max-metadata-per-user`
- `max_metadata_per_metric` / `-ingester.max-metadata-per-metric`

Enforced by the ingesters; limits the number of active metadata a user (or a given metric) can have. When running with `-distributor.shard-by-all-labels=false` (the default), this limit will enforce the maximum number of metadata a metric can have 'globally', as all metadata for a single metric will be sent to the same replication set of ingesters. This is not the case when running with `-distributor.shard-by-all-labels=true`, so the actual limit will be N/RF times higher, where N is number of ingester replicas and RF is configured replication factor.

- `max_fetched_series_per_query` / `-querier.max-fetched-series-per-query`

When running Cortex with blocks storage, this limit is enforced in the queriers on unique series fetched from ingesters and store-gateways (long-term storage).

- `max_global_metadata_per_user` / `-ingester.max-global-metadata-per-user`
- `max_global_metadata_per_metric` / `-ingester.max-global-metadata-per-metric`

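To make the semantics of the new limit concrete, here is a minimal, self-contained Go sketch. It is not the Cortex implementation: the `seriesLimiter` type and its methods are hypothetical names, and only the documented behaviour is reproduced, namely that series are counted once per unique label set regardless of whether they come from an ingester or a store-gateway, that a value of 0 disables the check, and that an over-limit query fails with a "max number of series limit" error (the string the integration test below asserts on).

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// seriesLimiter is a hypothetical per-query counter of unique series.
// maxSeries == 0 disables the check, matching the documented default of 0.
type seriesLimiter struct {
	maxSeries int
	seen      map[string]struct{}
}

func newSeriesLimiter(maxSeries int) *seriesLimiter {
	return &seriesLimiter{maxSeries: maxSeries, seen: map[string]struct{}{}}
}

// key builds a canonical form of a label set, so the same series is counted
// once no matter how many ingesters or store-gateways return it.
func key(lbls map[string]string) string {
	names := make([]string, 0, len(lbls))
	for name := range lbls {
		names = append(names, name)
	}
	sort.Strings(names)
	var b strings.Builder
	for _, name := range names {
		b.WriteString(name + "=" + lbls[name] + ",")
	}
	return b.String()
}

// addSeries records one returned series and fails once the number of unique
// series seen by this query exceeds the limit.
func (l *seriesLimiter) addSeries(lbls map[string]string) error {
	l.seen[key(lbls)] = struct{}{}
	if l.maxSeries > 0 && len(l.seen) > l.maxSeries {
		return errors.New("the query hit the max number of series limit")
	}
	return nil
}

func main() {
	lim := newSeriesLimiter(2)
	s1 := map[string]string{"__name__": "series_1"}
	s2 := map[string]string{"__name__": "series_2"}

	fmt.Println(lim.addSeries(s1)) // <nil>
	fmt.Println(lim.addSeries(s1)) // <nil>: a duplicate is still one unique series
	fmt.Println(lim.addSeries(s2)) // <nil>: two unique series, exactly at the limit
	fmt.Println(lim.addSeries(map[string]string{"__name__": "series_3"})) // error: over the limit
}
```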
9 changes: 8 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -3940,7 +3940,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# The maximum number of series for which a query can fetch samples from each
# ingester. This limit is enforced only in the ingesters (when querying samples
# not flushed to the storage yet) and it's a per-instance limit. This limit is
# ignored when running the Cortex blocks storage.
# ignored when running the Cortex blocks storage. When running Cortex with
# blocks storage, use the -querier.max-fetched-series-per-query limit instead.
# CLI flag: -ingester.max-series-per-query
[max_series_per_query: <int> | default = 100000]

@@ -4012,6 +4013,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -querier.max-fetched-chunks-per-query
[max_fetched_chunks_per_query: <int> | default = 0]

# The maximum number of unique series for which a query can fetch samples from
# ingesters and blocks storage. This limit is enforced in the querier only when
# running Cortex with blocks storage. 0 to disable.
# CLI flag: -querier.max-fetched-series-per-query
[max_fetched_series_per_query: <int> | default = 0]

# Limit how long back data (series and metadata) can be queried, up until
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
# and ruler. If the requested time range is outside the allowed range, the
79 changes: 79 additions & 0 deletions integration/querier_test.go
@@ -889,6 +889,85 @@ func TestQuerierWithChunksStorage(t *testing.T) {
assertServiceMetricsPrefixes(t, TableManager, tableManager)
}

func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-querier.ingester-streaming": "true",
"-querier.query-store-for-labels-enabled": "true",
"-querier.max-fetched-series-per-query": "3",
})

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))

// Add the memcached address to the flags.
flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
storeGateway := e2ecortex.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))

// Start the querier, configuring the store-gateway addresses since sharding is disabled.
flags = mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
})

querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(querier))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3)

series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
series3, _ := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})
series4, _ := generateSeries("series_4", series4Timestamp, prompb.Label{Name: "series_4", Value: "series_4"})

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, result.Type())

res, err = c.Push(series3)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
res, err = c.Push(series4)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

_, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod)
require.Error(t, err)
assert.Contains(t, err.Error(), "max number of series limit")
}

func TestHashCollisionHandling(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
53 changes: 52 additions & 1 deletion pkg/distributor/distributor_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
@@ -945,6 +946,56 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
assert.Contains(t, err.Error(), "the query hit the max number of chunks limit")
}

func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) {
const maxSeriesLimit = 10

limits := &validation.Limits{}
flagext.DefaultValues(limits)
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit))
// Prepare distributors.
ds, _, r, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
})
defer stopAll(ds, r)

// Push a number of series below the max series limit.
initialSeries := maxSeriesLimit
writeReq := makeWriteRequest(0, initialSeries, 0)
writeRes, err := ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

allSeriesMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
}

// Since the number of series is equal to the limit (but doesn't
// exceed it), we expect a query running on all series to succeed.
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)
assert.Len(t, queryRes.Chunkseries, initialSeries)

// Push more series so the limit is exceeded once we query back all series.
writeReq = &cortexpb.WriteRequest{}
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0),
)

writeRes, err = ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

// Since the number of series now exceeds the limit, we expect
// a query running on all series to fail.
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.Error(t, err)
assert.Contains(t, err.Error(), "max number of series limit")
}

func TestDistributor_Push_LabelRemoval(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

@@ -1953,7 +2004,7 @@ func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLa
Timeseries: []cortexpb.PreallocTimeseries{
{
TimeSeries: &cortexpb.TimeSeries{
//Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
// Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)),
Exemplars: []cortexpb.Exemplar{
{
16 changes: 14 additions & 2 deletions pkg/distributor/query.go
@@ -19,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
grpc_util "github.com/cortexproject/cortex/pkg/util/grpc"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/validation"
)

@@ -187,8 +188,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
chunksCount = atomic.Int32{}
chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
chunksCount = atomic.Int32{}
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

// Fetch samples from multiple ingesters
@@ -230,6 +232,16 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
}
}
for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, limitErr
}
}
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, limitErr
}
}

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
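The hunk above pulls the limiter out of the request context via `limiter.QueryLimiterFromContextWithFallback`, and the distributor test earlier attaches one with `limiter.AddQueryLimiterToContext`. Below is a rough sketch of that plumbing, assuming a limiter like the one sketched earlier; the function names mirror the ones imported in this diff, but the bodies are illustrative rather than the actual `pkg/util/limiter` code. Because every fetch path of a query reads the same limiter instance from the shared context, the series count is enforced per query, across ingesters and store-gateways.

```go
package main

import (
	"context"
	"fmt"
)

// queryLimiter stands in for the per-query series limiter sketched earlier.
type queryLimiter struct{ maxSeries int }

// contextKey is an unexported key type so the limiter entry cannot collide
// with values stored by other packages.
type contextKey int

const queryLimiterKey contextKey = 0

// addQueryLimiterToContext attaches a per-query limiter to the request
// context, in the spirit of the limiter.AddQueryLimiterToContext call used
// by the distributor test in this PR.
func addQueryLimiterToContext(ctx context.Context, l *queryLimiter) context.Context {
	return context.WithValue(ctx, queryLimiterKey, l)
}

// queryLimiterFromContextWithFallback returns the limiter stored in the
// context, or an unlimited one when none was attached, so code such as
// queryIngesterStream never needs a nil check.
func queryLimiterFromContextWithFallback(ctx context.Context) *queryLimiter {
	if l, ok := ctx.Value(queryLimiterKey).(*queryLimiter); ok {
		return l
	}
	return &queryLimiter{maxSeries: 0} // fallback: limit disabled
}

func main() {
	ctx := context.Background()
	fmt.Println(queryLimiterFromContextWithFallback(ctx).maxSeries) // 0: fallback, no limit

	ctx = addQueryLimiterToContext(ctx, &queryLimiter{maxSeries: 3})
	fmt.Println(queryLimiterFromContextWithFallback(ctx).maxSeries) // 3: shared by every fetch path of this query
}
```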
10 changes: 9 additions & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/sync/errgroup"
grpc_metadata "google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
@@ -37,6 +38,7 @@ import (
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -423,7 +425,6 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
if maxChunksLimit > 0 {
leftChunksLimit -= numChunks
}

resultMtx.Unlock()

return queriedBlocks, nil
@@ -563,6 +564,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
queriedBlocks = []ulid.ULID(nil)
numChunks = atomic.NewInt32(0)
spanLog = spanlogger.FromContext(ctx)
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

// Concurrently fetch series from all clients.
@@ -611,6 +613,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if s := resp.GetSeries(); s != nil {
mySeries = append(mySeries, s)

// Add the series fingerprint to the query limiter; returns an error if the limit is exceeded.
limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
if limitErr != nil {
return limitErr
}

// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
if maxChunksLimit > 0 {
actual := numChunks.Add(int32(len(s.Chunks)))
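Both the ingester-streaming hunk in `pkg/distributor/query.go` and the store-gateway hunk above apply the check as each response arrives, so an over-limit query is aborted without buffering the remaining results. Here is a compressed sketch of that fail-fast shape; the channel-based stream and the `fetchAndCheck` helper are made up for illustration, and only the early-return pattern reflects the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchAndCheck drains a stream of label sets coming from one source
// (ingester or store-gateway) and stops at the first limit violation,
// so an over-limit query fails without consuming the rest of the stream.
func fetchAndCheck(stream <-chan map[string]string, addSeries func(map[string]string) error) error {
	for lbls := range stream {
		if err := addSeries(lbls); err != nil {
			return err // fail fast: the whole query is aborted
		}
	}
	return nil
}

func main() {
	// A made-up two-series limit, tracking uniqueness by metric name only
	// to keep the illustration short.
	seen, limit := map[string]struct{}{}, 2
	addSeries := func(lbls map[string]string) error {
		seen[lbls["__name__"]] = struct{}{}
		if len(seen) > limit {
			return errors.New("the query hit the max number of series limit")
		}
		return nil
	}

	stream := make(chan map[string]string, 3)
	for _, name := range []string{"series_1", "series_2", "series_3"} {
		stream <- map[string]string{"__name__": name}
	}
	close(stream)

	fmt.Println(fetchAndCheck(stream, addSeries)) // prints the limit error after the third unique series
}
```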