Add metrics for rejected queries in QFE #5356

Merged · 6 commits · May 24, 2023
CHANGELOG.md (1 addition, 0 deletions)
@@ -7,6 +7,7 @@
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
* [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and ` -alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338
* [FEATURE] Query Frontend: Add `cortex_queries_total` metric for total number of queries executed per user. #5360
* [FEATURE] Query Frontend: Add `cortex_discarded_queries_total` metric for throttled queries. #5356
Contributor:
I think cortex_failed_queries_total is a better name than cortex_discarded_queries_total based on what we are doing in this PR. It doesn't make sense that a query can be "discarded".

Using "failed" has the benefit that we can use the same metric name for 5xx later on -- we'd just have a different value for the reason label.

I would recommend changing this entry to:
[FEATURE] Query Frontend: Add `cortex_failed_queries_total` metric for failed queries with `reason` label. #5356
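
For illustration only (not code from the PR or the comments): a minimal Go sketch of the single-counter shape being suggested here, where one reason label could later cover both limit rejections and 5xx failures. The label values shown are hypothetical.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	// One counter, many reasons: the shape suggested in the comment above.
	failedQueries := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "cortex_failed_queries_total",
			Help: "Total number of failed queries, partitioned by reason.",
		},
		[]string{"reason", "user"},
	)

	// A limit rejection and a later 5xx would share the metric name and
	// differ only in the value of the reason label (values are hypothetical).
	failedQueries.WithLabelValues("too_many_samples", "user-1").Inc()
	failedQueries.WithLabelValues("internal_error", "user-1").Inc()
}
```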

Contributor Author:

I agree cortex_discarded_queries_total is not a good name here. What I want to track is probably cortex_throttled_queries_total.
cortex_failed_queries_total is too general, and I don't really want to track 5xx with this metric: 5xx responses can happen for different reasons, and I feel it is more useful to track throttled queries for now.

Contributor:

I see, yeah, cortex_throttled_queries_total makes more sense. But "throttled" in this context is still odd because it usually implies flow control, and what we are doing here when a chunk is too big is not flow control.

How about cortex_rejected_queries_total?

Contributor Author:

Love the name of cortex_rejected_queries_total!
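
The thread settled on cortex_rejected_queries_total with reason and user labels. For illustration only (not code from this PR), here is a minimal sketch of what that metric could look like when scraped; the registry wiring and label values below are assumptions.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()
	rejectedQueries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_rejected_queries_total",
		Help: "The total number of queries that were rejected.",
	}, []string{"reason", "user"})
	reg.MustRegister(rejectedQueries)

	// Simulate the frontend rejecting one query for hitting a limit
	// (reason and user values are hypothetical).
	rejectedQueries.WithLabelValues("too_many_requests", "team-a").Inc()

	// Scrape the registry the way Prometheus would.
	srv := httptest.NewServer(promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	defer srv.Close()
	resp, err := http.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
	// The exposition output contains a line like:
	// cortex_rejected_queries_total{reason="too_many_requests",user="team-a"} 1
}
```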

* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
pkg/frontend/transport/handler.go (85 additions, 8 deletions)
@@ -30,7 +30,7 @@ import (
)

const (
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
// StatusClientClosedRequest is the status code for when a client request cancellation of a http request
StatusClientClosedRequest = 499
ServiceTimingHeaderName = "Server-Timing"
)
@@ -41,6 +41,33 @@ var (
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
)

const (
reasonRequestBodySizeExceeded = "request_body_size_exceeded"
reasonResponseBodySizeExceeded = "response_body_size_exceeded"
reasonTooManyRequests = "too_many_requests"
reasonTimeRangeExceeded = "time_range_exceeded"
reasonTooManySamples = "too_many_samples"
reasonSeriesFetched = "series_fetched"
reasonChunksFetched = "chunks_fetched"
reasonChunkBytesFetched = "chunk_bytes_fetched"
reasonDataBytesFetched = "data_bytes_fetched"
reasonSeriesLimitStoreGateway = "store_gateway_series_limit"
reasonChunksLimitStoreGateway = "store_gateway_chunks_limit"
reasonBytesLimitStoreGateway = "store_gateway_bytes_limit"

limitTooManySamples = `query processing would load too many samples into memory`
limitTimeRangeExceeded = `the query time range exceeds the limit`
limitSeriesFetched = `the query hit the max number of series limit`
limitChunksFetched = `the query hit the max number of chunks limit`
limitChunkBytesFetched = `the query hit the aggregated chunks size limit`
limitDataBytesFetched = `the query hit the aggregated data size limit`

// Store gateway limits.
limitSeriesStoreGateway = `exceeded series limit`
limitChunksStoreGateway = `exceeded chunks limit`
limitBytesStoreGateway = `exceeded bytes limit`
)

// Config for a Handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
@@ -67,6 +94,7 @@ type Handler struct {
querySeries *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

@@ -104,12 +132,23 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Size of all data fetched to execute a query in bytes.",
}, []string{"user"})

h.rejectedQueries = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_rejected_queries_total",
Help: "The total number of queries that were rejected.",
},
[]string{"reason", "user"},
)

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.queriesCount.DeleteLabelValues(user)
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
level.Warn(log).Log("msg", "failed to remove cortex_rejected_queries_total metric for user", "user", user, "err", err)
}
})
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
@@ -124,6 +163,12 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
queryString url.Values
)

tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
@@ -150,6 +195,9 @@
if !strings.Contains(r.URL.Path, "api/v1/read") {
if err := r.ParseForm(); err != nil {
writeError(w, err)
if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) {
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID).Inc()
}
return
}
r.Body = io.NopCloser(&buf)
@@ -168,7 +216,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if shouldReportSlowQuery {
f.reportSlowQuery(r, queryString, queryResponseTime)
}

if f.cfg.QueryStatsEnabled {
// Try to parse error and get status code.
var statusCode int
if err != nil {
statusCode = getStatusCodeFromError(err)
@@ -184,7 +234,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, resp)
f.reportQueryStats(r, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
}

if err != nil {
@@ -239,12 +289,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numChunks := stats.LoadFetchedChunks()
@@ -311,6 +356,38 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
} else {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

var reason string
if statusCode == http.StatusTooManyRequests {
reason = reasonTooManyRequests
} else if statusCode == http.StatusRequestEntityTooLarge {
reason = reasonResponseBodySizeExceeded
} else if statusCode == http.StatusUnprocessableEntity {
errMsg := error.Error()
if strings.Contains(errMsg, limitTooManySamples) {
reason = reasonTooManySamples
} else if strings.Contains(errMsg, limitTimeRangeExceeded) {
reason = reasonTimeRangeExceeded
} else if strings.Contains(errMsg, limitSeriesFetched) {
reason = reasonSeriesFetched
} else if strings.Contains(errMsg, limitChunksFetched) {
reason = reasonChunksFetched
} else if strings.Contains(errMsg, limitChunkBytesFetched) {
reason = reasonChunkBytesFetched
} else if strings.Contains(errMsg, limitDataBytesFetched) {
reason = reasonDataBytesFetched
} else if strings.Contains(errMsg, limitSeriesStoreGateway) {
reason = reasonSeriesLimitStoreGateway
} else if strings.Contains(errMsg, limitChunksStoreGateway) {
reason = reasonChunksLimitStoreGateway
} else if strings.Contains(errMsg, limitBytesStoreGateway) {
reason = reasonBytesLimitStoreGateway
}
}
if len(reason) > 0 {
f.rejectedQueries.WithLabelValues(reason, userID).Inc()
stats.LimitHit = reason
}
}

func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
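
As an aside on the per-user cleanup hook added in NewHandler above, which uses Cortex's own util.DeleteMatchingLabels helper: a minimal sketch of the same idea using the stock Prometheus client's DeletePartialMatch, available in recent client_golang releases. The user and reason values here are illustrative only.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	rejectedQueries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_rejected_queries_total",
		Help: "The total number of queries that were rejected.",
	}, []string{"reason", "user"})

	// Two different rejection reasons recorded for the same (hypothetical) user.
	rejectedQueries.WithLabelValues("too_many_requests", "inactive-user").Inc()
	rejectedQueries.WithLabelValues("too_many_samples", "inactive-user").Inc()

	// When the user goes inactive, drop every series carrying their user label,
	// regardless of the reason value: the same effect the handler achieves with
	// util.DeleteMatchingLabels.
	deleted := rejectedQueries.DeletePartialMatch(prometheus.Labels{"user": "inactive-user"})
	fmt.Println("series removed:", deleted) // series removed: 2
}
```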