Add support for Max Series Per Query for block storage and streaming ingesters #4179
Conversation
Good job! I did a very quick first-pass review and I haven't reviewed tests yet. I also suggest you double-check whether passing the limiter via the context is really required (I'm not sure it is).
Please add a CHANGELOG entry.
// NewPerQueryLimiter makes a new per-query rate limiter. Each per-query limiter
// is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits.
func NewPerQueryLimiter(maxSeriesPerQuery int, maxChunkBytesPerQuery int) *PerQueryLimiter {
I would completely remove maxChunkBytesPerQuery from this PR.
	"sync"
)

type PerQueryLimiter struct {
[nit] We can simplify the name and remove the Per prefix everywhere. QueryLimiter looks self-explanatory. WDYT?
	maxSeriesPerQuery     int
	maxChunkBytesPerQuery int
}

type perQueryLimiterCtxMarker struct{}
[nit] I would call it ...CtxKey instead of CtxMarker, given it's a context key.
	}
}

func NewPerQueryLimiterOnContext(ctx context.Context, maxSeriesPerQuery int, maxChunkBytesPerQuery int) context.Context {
[nit] Do we need this function at all? The same logic can be achieved by combining AddPerQueryLimiterToContext() and NewPerQueryLimiter().
// AddFingerPrint Add a label adapter fast fingerprint to the map of unique fingerprints. If the
// added series label causes us to go over the limit of maxSeriesPerQuery we will
// return a validation error
func (pql *PerQueryLimiter) AddFingerPrint(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error {
[nit] I would name it AddSeries(). You're not adding a fingerprint, but a series. The fingerprint is an internal implementation detail that shouldn't be exposed to the caller.
@@ -3,6 +3,8 @@ package querier
 import (
 	"context"
 	"fmt"
+	"github.com/cortexproject/cortex/pkg/cortexpb"
// PerQueryLimiterFromContext returns a Per Query Limiter from the current context.
// IF there is Per Query Limiter on the context we will return ???
func PerQueryLimiterFromContext(ctx context.Context) *PerQueryLimiter {
Remove this function please. Directly call FromContextWithFallback().
}

	pql.uniqueSeriesMx.Lock()
	//Unlock after return
No need for such comments :)
	mapL := len(pql.uniqueSeries)
	return mapL
Suggested change:
-	mapL := len(pql.uniqueSeries)
-	return mapL
+	return len(pql.uniqueSeries)
…ingesters and update limits.go to reflect changes Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local> Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Very good job! I left many nits but the logic looks good. However, while suggesting an updated CLI flag description I just realised we're basically introducing a breaking change. Let's discuss this offline.
pkg/util/validation/limits.go
Outdated
@@ -130,7 +130,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
 	f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.")

-	f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester. This limit is enforced only in the ingesters (when querying samples not flushed to the storage yet) and it's a per-instance limit. This limit is ignored when running the Cortex blocks storage.")
+	f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester and block storage. This limit is enforced in the ingesters and block storage and it's a per-instance limit.")
While writing this I just realized we're introducing a breaking change. I need to discuss it next week with Peter/Goutham, but we may need to introduce a new limit for the blocks storage to avoid introducing a breaking change in the logic.
Suggested change:
-	f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from each ingester and block storage. This limit is enforced in the ingesters and block storage and it's a per-instance limit.")
+	f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series for which a query can fetch samples from ingesters and storage. When running the chunks storage, this limit is enforced only in the ingesters and it's a per-instance limit. When running the blocks storage, this limit is enforced in the querier on samples read both from ingesters and storage and it's a per-query limit.")
pkg/util/limiter/query_limiter.go
Outdated
// AddSeries Add labels for series to the count of unique series. If the
// added series label causes us to go over the limit of maxSeriesPerQuery we will
// return a validation error
func (ql *QueryLimiter) AddSeries(labelAdapter []cortexpb.LabelAdapter, matchers []*labels.Matcher) error {
I would simplify this and not pass matchers. It's OK if you do not specify the matchers in the error.
	for n := 0; n < b.N; n++ {
		limiter := NewQueryLimiter(10000)
		for _, s := range series {
			limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s), matchers)
		}
	}
What do you want to benchmark here? If it's the AddSeries() performance, then b.N should be the number of series you add in the test.
pkg/distributor/distributor_test.go
Outdated
	limits := &validation.Limits{}
	flagext.DefaultValues(limits)
	limits.MaxSeriesPerQuery = maxSeriesLimit
You can set this, but it's ignored, because what's used is the limiter in the context.
Co-authored-by: Marco Pracucci <marco@pracucci.com> Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
…ad of ingester.max-series-per-query for series per query limit Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Good job! I can approve once last comments are addressed, thanks!
The PR is still in Draft. Can you switch it to "Ready to review", please?
Note to other reviewers: as discussed offline with Tyler, I think passing the limiter via context is not required (we should be able to work around it with a custom interface extending storage.Querier), but I suggested attempting that refactoring in a follow-up PR.
pkg/util/validation/limits.go
Outdated
// chunks from ingesters.
func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int {
	return o.getOverridesForUser(userID).MaxChunksPerQuery
}

// MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching
// chunks from ingesters and block storage
Suggested change:
-// chunks from ingesters and block storage
+// chunks from ingesters and blocks storage.
pkg/util/validation/limits.go
Outdated
@@ -144,6 +145,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
 	f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.")
+	f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-series-per-query", 0, "The maximum number of series for which a query can fetch samples from each ingesters and block storage. When running in block storage mode this limit is enforced on the querier and counts series returned from ingesters and block storage as a per-query limit.")
By convention, new CLI flags have the exact same naming as the YAML config option, with _ replaced by -. So the CLI flag name should be querier.max-fetched-series-per-query.

Remember to update the CHANGELOG and docs accordingly.
"The maximum number of series for which a query can fetch samples from each ingesters and block storage. When running in block storage mode this limit is enforced on the querier and counts series returned from ingesters and block storage as a per-query limit."

Some little adjustments:

"The maximum number of unique series for which a query can fetch samples from each ingesters and long-term storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable."
Please also update the CLI flag description of ingester.max-series-per-query to mention using -querier.max-fetched-series-per-query when running the blocks storage.
docs/configuration/arguments.md
Outdated
When running Cortex chunks storage: limit enforced in the ingesters only and it's a per-instance limit.
When running Cortex blocks storage: limit enforced in the queriers both on samples fetched from ingesters and store-gateways (long-term storage).
This is no longer true. Please update the doc accordingly.
pkg/distributor/distributor_test.go
Outdated
	})
	defer stopAll(ds, r)

	// Push a number of series below the max series limit. Each series has 1 sample.
[nit] The "Each series has 1 sample." comment is irrelevant here.

Suggested change:
-	// Push a number of series below the max series limit. Each series has 1 sample.
+	// Push a number of series below the max series limit.
pkg/distributor/query.go
Outdated
	chunksLimit  = d.limits.MaxChunksPerQueryFromIngesters(userID)
	chunksCount  = atomic.Int32{}
	queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
	matchers, _  = ingester_client.FromLabelMatchers(req.Matchers)
Please move this back to its original place. It's no longer required here.
pkg/util/limiter/query_limiter.go
Outdated
var (
	qlCtxKey        = &queryLimiterCtxKey{}
	errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks (limit: %d)"
[nit] I would further simplify the error message. The final user has no idea what a chunk is. We should just say that the query hit the max number of series limit.

Suggested change:
-	errMaxSeriesHit = "The query hit the max number of series limit while fetching chunks (limit: %d)"
+	errMaxSeriesHit = "The query hit the max number of series limit (limit: %d)"
pkg/util/limiter/query_limiter.go
Outdated
	ql.uniqueSeries[client.FastFingerprint(seriesLabels)] = struct{}{}
	if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
		// Format error with query and max limit
[nit] This comment is not very useful, and "with query" is no longer true.

Suggested change:
-		// Format error with query and max limit
pkg/util/limiter/query_limiter.go
Outdated
	return nil
}

// UniqueSeries returns the count of unique series seen by this query limiter.
Suggested change:
-// UniqueSeries returns the count of unique series seen by this query limiter.
+// uniqueSeriesCount returns the count of unique series seen by this query limiter.
Thank you, this looks good! I've left some nit comments, but nothing blocking.
pkg/util/limiter/query_limiter.go
Outdated
// is configured using the `maxSeriesPerQuery` limit.
func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter {
	return &QueryLimiter{
		uniqueSeriesMx: sync.RWMutex{},
Nit: This line is not needed. uniqueSeriesMx will be initialized to its zero value, which happens to be sync.RWMutex{}.
pkg/util/limiter/query_limiter.go
Outdated
type queryLimiterCtxKey struct{}

var (
	qlCtxKey = &queryLimiterCtxKey{}
Nit: contextKey would be just fine.
pkg/util/limiter/query_limiter.go
Outdated
)

type QueryLimiter struct {
	uniqueSeriesMx sync.RWMutex
Given that the read lock is only used from tests and not production code, I'd suggest using sync.Mutex, which is faster than sync.RWMutex (by about 7.5% in your benchmark).
pkg/util/limiter/query_limiter.go
Outdated
	ql.uniqueSeriesMx.Lock()
	defer ql.uniqueSeriesMx.Unlock()

	ql.uniqueSeries[client.FastFingerprint(seriesLabels)] = struct{}{}
Nit: To minimize the time during which the lock is held, it is preferable to do computations that don't need the lock before locking. In this case that's the call to client.FastFingerprint. It's unlikely to be a big deal here, but it's still good practice.

I also find it a little strange that we always add to the map before checking the limit. If entries in the map were larger, it could lead to unexpected memory problems. Here it's not a problem.
"I also find it a little strange that we always add to the map before checking the limit."

I figured that since we don't know whether the series has a unique fingerprint that will put us over the limit, we can't check until after we've added (or not added) the passed-in series fingerprint.
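Both points from this exchange can be seen together in a minimal sketch: the fingerprint is computed before taking the lock, a plain sync.Mutex replaces the RWMutex, and the map insert happens before the size check because only a previously unseen fingerprint grows the map. An FNV hash stands in for client.FastFingerprint, so this is illustrative, not the PR's code:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type QueryLimiter struct {
	uniqueSeriesMx    sync.Mutex // plain Mutex: the read lock was only used in tests
	uniqueSeries      map[uint64]struct{}
	maxSeriesPerQuery int
}

func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter {
	return &QueryLimiter{
		uniqueSeries:      map[uint64]struct{}{},
		maxSeriesPerQuery: maxSeriesPerQuery,
	}
}

func (ql *QueryLimiter) AddSeries(series string) error {
	// Compute the fingerprint before taking the lock, keeping the
	// critical section as short as possible.
	h := fnv.New64a()
	h.Write([]byte(series))
	fingerprint := h.Sum64()

	ql.uniqueSeriesMx.Lock()
	defer ql.uniqueSeriesMx.Unlock()

	// Insert first: only a previously unseen fingerprint grows the map,
	// so the limit check has to come after the insert.
	ql.uniqueSeries[fingerprint] = struct{}{}
	if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
		return fmt.Errorf("the query hit the max number of series limit (limit: %d)", ql.maxSeriesPerQuery)
	}
	return nil
}

func main() {
	ql := NewQueryLimiter(2)
	_ = ql.AddSeries("a")
	_ = ql.AddSeries("a") // duplicate: map size unchanged
	_ = ql.AddSeries("b")
	fmt.Println(ql.AddSeries("c") != nil) // prints true: third unique series exceeds the limit
}
```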
pkg/querier/querier.go
Outdated
@@ -223,6 +223,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
 	if err != nil {
 		return nil, err
 	}
+	// Take the set tenant limits
I don't understand this comment. :(
pkg/distributor/query.go
Outdated
	chunksLimit  = d.limits.MaxChunksPerQueryFromIngesters(userID)
	chunksCount  = atomic.Int32{}
	queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
	matchers, _  = ingester_client.FromLabelMatchers(req.Matchers)
This line was moved from another place, but the comment associated with it was left at the original place. Plus, as Marco points out, this is only used when building the error message.
Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
Thanks for addressing my feedback.
	ql.uniqueSeries[fingerprint] = struct{}{}
	if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
		// Format error with max limit
		return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, ql.maxSeriesPerQuery))
Nit: translating to validation.LimitError should not be a responsibility of the query limiter, but of code in the querier package. We only do that so that the Prometheus API layer returns the correct status code.
I suggest merging as is and then having Tyler address this in a follow-up PR.
Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
LGTM, thanks!
	ql.uniqueSeries[fingerprint] = struct{}{}
	if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
		// Format error with max limit
		return validation.LimitError(fmt.Sprintf(errMaxSeriesHit, ql.maxSeriesPerQuery))
I suggest merging as is and then having Tyler address this in a follow-up PR.
Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Add per-user query metrics for series and bytes returned

Add stats included in query responses from the querier and distributor for measuring the number of series and bytes included in successful queries. These stats are emitted per-user as summaries from the query frontends. These stats are picked to add visibility into the same resources limited as part of #4179 and #4216. Fixes #4259

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Formatting fix

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Fix changelog to match actual changes

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Typo

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Code review changes, rename things for clarity

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Code review changes, remove superfluous summaries

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
…ct#4343) * Add per-user query metrics for series and bytes returned Add stats included in query responses from the querier and distributor for measuring the number of series and bytes included in successful queries. These stats are emitted per-user as summaries from the query frontends. These stats are picked to add visibility into the same resources limited as part of cortexproject#4179 and cortexproject#4216. Fixes cortexproject#4259 Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Formatting fix Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Fix changelog to match actual changes Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Typo Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Code review changes, rename things for clarity Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Apply suggestions from code review Co-authored-by: Marco Pracucci <marco@pracucci.com> Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Code review changes, remove superfluous summaries Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> Co-authored-by: Marco Pracucci <marco@pracucci.com> Signed-off-by: Alvin Lin <alvinlin@amazon.com>
What this PR does:
This PR adds support for the max_series_per_query option for block storage and streaming ingesters.

Which issue(s) this PR fixes:
Fixes #3669
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]