Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time check in query priority now considers overall data select time window (including range selectors, modifiers and lookback delta) #5758

Merged
merged 14 commits into from
Feb 6, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686
* [ENHANCEMENT] Query Frontend: Log number of split queries in `query stats` log. #5703
* [ENHANCEMENT] Logging: Added new options for logging HTTP request headers: `-server.log-request-headers` enables logging HTTP request headers, `-server.log-request-headers-exclude-list` allows users to specify headers which should not be logged. #5744
* [ENHANCEMENT] Query Frontend/Scheduler: Time check in query priority now considers overall data select time window (including range selectors, modifiers and lookback delta). #5758
* [ENHANCEMENT] Querier: Added `querier.store-gateway-query-stats-enabled` to enable or disable store gateway query stats log. #5749
* [BUGFIX] Distributor: Do not use label with empty values for sharding. #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
Expand Down
14 changes: 9 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -5095,14 +5095,18 @@ otel:
# Regex that the query string should match. If not set, it won't be checked.
[regex: <string> | default = ""]

# Time window that the query should be within. If not set, it won't be checked.
# Overall data select time window (including range selectors, modifiers and
# lookback delta) that the query should be within. If not set, it won't be
# checked.
time_window:
# Start of the time window that the query should be within. If set to 0, it
# won't be checked.
# Start of the data select time window (including range selectors, modifiers
# and lookback delta) that the query should be within. If set to 0, it won't
# be checked.
[start: <int> | default = 0]

# End of the time window that the query should be within. If set to 0, it
# won't be checked.
# End of the data select time window (including range selectors, modifiers and
# lookback delta) that the query should be within. If set to 0, it won't be
# checked.
[end: <int> | default = 0]
```

Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
numChunkBytes := stats.LoadFetchedChunkBytes()
numDataBytes := stats.LoadFetchedDataBytes()
splitQueries := stats.LoadSplitQueries()
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
dataSelectMinTime := stats.LoadDataSelectMinTime()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
Expand Down Expand Up @@ -339,14 +341,20 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
logMessage = append(logMessage, "content_encoding", encoding)
}

if dataSelectMaxTime > 0 {
logMessage = append(logMessage, "data_select_max_time", util.FormatMillisToSeconds(dataSelectMaxTime))
}
if dataSelectMinTime > 0 {
logMessage = append(logMessage, "data_select_min_time", util.FormatMillisToSeconds(dataSelectMinTime))
}
if query := queryString.Get("query"); len(query) > 0 {
logMessage = append(logMessage, "query_length", len(query))
}
if ua := r.Header.Get("User-Agent"); len(ua) > 0 {
logMessage = append(logMessage, "user_agent", ua)
}
if queryPriority := r.Header.Get(util.QueryPriorityHeaderKey); len(queryPriority) > 0 {
logMessage = append(logMessage, "priority", queryPriority)
if priority, ok := stats.LoadPriority(); ok {
logMessage = append(logMessage, "priority", priority)
}
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved

if error != nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/weaveworks/common/user"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
)

type roundTripperFunc func(*http.Request) (*http.Response, error)
Expand Down Expand Up @@ -348,9 +347,20 @@ func TestReportQueryStatsFormat(t *testing.T) {
},
"should include query priority": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
header: http.Header{util.QueryPriorityHeaderKey: []string{"99"}},
queryStats: &querier_stats.QueryStats{
Priority: 99,
PriorityAssigned: true,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
},
"should include data fetch min and max time": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
queryStats: &querier_stats.QueryStats{
DataSelectMaxTime: 1704153600000,
DataSelectMinTime: 1704067200000,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
},
}

for testName, testData := range tests {
Expand Down
5 changes: 2 additions & 3 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"net/http"
"strconv"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -100,8 +99,8 @@ type request struct {
}

func (r request) Priority() int64 {
priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*r.request, util.QueryPriorityHeaderKey), 10, 64)
if err != nil {
priority, ok := stats.FromContext(r.originalCtx).LoadPriority()
if !ok {
return 0
}

Expand Down
58 changes: 57 additions & 1 deletion pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ var ctxKey = contextKey(0)

type QueryStats struct {
Stats
m sync.Mutex
PriorityAssigned bool
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
m sync.Mutex
}

// ContextWithEmptyStats returns a context with empty stats.
Expand Down Expand Up @@ -196,6 +200,58 @@ func (s *QueryStats) LoadSplitQueries() uint64 {
return atomic.LoadUint64(&s.SplitQueries)
}

// SetPriority records the priority assigned to this query and marks the
// priority as assigned. It is a no-op on a nil receiver.
//
// Both fields are updated under s.m: the original wrote the plain bool
// PriorityAssigned without any synchronization while readers (LoadPriority)
// could run concurrently, which is a data race under the Go memory model.
// Priority itself is still stored atomically so any existing atomic readers
// remain correct.
func (s *QueryStats) SetPriority(priority int64) {
	if s == nil {
		return
	}

	s.m.Lock()
	defer s.m.Unlock()

	s.PriorityAssigned = true
	atomic.StoreInt64(&s.Priority, priority)
}

// LoadPriority returns the recorded query priority and whether a priority
// was ever assigned via SetPriority. On a nil receiver it returns (0, false).
//
// The read is performed under s.m so that the (value, assigned) pair is
// consistent with a concurrent SetPriority; the original read the plain bool
// PriorityAssigned without synchronization, which is a data race.
func (s *QueryStats) LoadPriority() (int64, bool) {
	if s == nil {
		return 0, false
	}

	s.m.Lock()
	defer s.m.Unlock()

	return atomic.LoadInt64(&s.Priority), s.PriorityAssigned
}

// SetDataSelectMaxTime records the upper bound of the overall data select
// time window, in milliseconds. Calling it on a nil receiver is a no-op.
func (s *QueryStats) SetDataSelectMaxTime(maxT int64) {
	if s != nil {
		atomic.StoreInt64(&s.DataSelectMaxTime, maxT)
	}
}

// LoadDataSelectMaxTime returns the recorded upper bound of the data select
// time window in milliseconds, or 0 when the receiver is nil.
func (s *QueryStats) LoadDataSelectMaxTime() int64 {
	var maxT int64
	if s != nil {
		maxT = atomic.LoadInt64(&s.DataSelectMaxTime)
	}
	return maxT
}

// SetDataSelectMinTime records the lower bound of the overall data select
// time window, in milliseconds. Calling it on a nil receiver is a no-op.
func (s *QueryStats) SetDataSelectMinTime(minT int64) {
	if s != nil {
		atomic.StoreInt64(&s.DataSelectMinTime, minT)
	}
}

// LoadDataSelectMinTime returns the recorded lower bound of the data select
// time window in milliseconds, or 0 when the receiver is nil.
func (s *QueryStats) LoadDataSelectMinTime() int64 {
	var minT int64
	if s != nil {
		minT = atomic.LoadInt64(&s.DataSelectMinTime)
	}
	return minT
}

// Merge the provided Stats into this one.
func (s *QueryStats) Merge(other *QueryStats) {
if s == nil || other == nil {
Expand Down
70 changes: 7 additions & 63 deletions pkg/querier/tripperware/priority.go
Original file line number Diff line number Diff line change
@@ -1,66 +1,16 @@
package tripperware

import (
"errors"
"net/http"
"strings"
"time"

"github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
errParseExpr = errors.New("failed to parse expr")
)

func GetPriority(r *http.Request, userID string, limits Limits, now time.Time, lookbackDelta time.Duration) (int64, error) {
isQuery := strings.HasSuffix(r.URL.Path, "/query")
isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range")
queryPriority := limits.QueryPriority(userID)
query := r.FormValue("query")

if (!isQuery && !isQueryRange) || !queryPriority.Enabled || query == "" {
return 0, nil
}

expr, err := parser.ParseExpr(query)
if err != nil {
// If query fails to be parsed, we throw a simple parse error
// and fail query later on querier.
return 0, errParseExpr
}

if len(queryPriority.Priorities) == 0 {
return queryPriority.DefaultPriority, nil
}

var startTime, endTime int64
if isQuery {
if t, err := util.ParseTimeParam(r, "time", now.Unix()); err == nil {
startTime = t
endTime = t
}
} else if isQueryRange {
if st, err := util.ParseTime(r.FormValue("start")); err == nil {
if et, err := util.ParseTime(r.FormValue("end")); err == nil {
startTime = st
endTime = et
}
}
func GetPriority(query string, minTime, maxTime int64, now time.Time, queryPriority validation.QueryPriority) int64 {
if !queryPriority.Enabled || query == "" || len(queryPriority.Priorities) == 0 {
return queryPriority.DefaultPriority
}

es := &parser.EvalStmt{
Expr: expr,
Start: util.TimeFromMillis(startTime),
End: util.TimeFromMillis(endTime),
LookbackDelta: lookbackDelta,
}

minTime, maxTime := FindMinMaxTime(es)

for _, priority := range queryPriority.Priorities {
for _, attribute := range priority.QueryAttributes {
if attribute.Regex != "" && attribute.Regex != ".*" && attribute.Regex != ".+" {
Expand All @@ -70,12 +20,12 @@ func GetPriority(r *http.Request, userID string, limits Limits, now time.Time, l
}

if isWithinTimeAttributes(attribute.TimeWindow, now, minTime, maxTime) {
return priority.Priority, nil
return priority.Priority
}
}
}

return queryPriority.DefaultPriority, nil
return queryPriority.DefaultPriority
}

func isWithinTimeAttributes(timeWindow validation.TimeWindow, now time.Time, startTime, endTime int64) bool {
Expand All @@ -84,24 +34,18 @@ func isWithinTimeAttributes(timeWindow validation.TimeWindow, now time.Time, sta
}

if timeWindow.Start != 0 {
startTimeThreshold := now.Add(-1 * time.Duration(timeWindow.Start).Abs()).Truncate(time.Second).Unix()
startTimeThreshold := now.Add(-1 * time.Duration(timeWindow.Start).Abs()).Add(-1 * time.Minute).Truncate(time.Minute).UnixMilli()
if startTime < startTimeThreshold {
return false
}
}

if timeWindow.End != 0 {
endTimeThreshold := now.Add(-1 * time.Duration(timeWindow.End).Abs()).Add(1 * time.Second).Truncate(time.Second).Unix()
endTimeThreshold := now.Add(-1 * time.Duration(timeWindow.End).Abs()).Add(1 * time.Minute).Truncate(time.Minute).UnixMilli()
if endTime > endTimeThreshold {
return false
}
}

return true
}

// FindMinMaxTime returns the evaluation statement's Start and End as Unix
// timestamps in seconds. NOTE(review): this stub only looks at Start/End —
// it does not account for range selectors, offset modifiers, or the
// lookback delta; see the placeholder note below.
func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) {
// Placeholder until Prometheus is updated to include
// https://github.com/prometheus/prometheus/commit/9e3df532d8294d4fe3284bde7bc96db336a33552
return s.Start.Unix(), s.End.Unix()
}
Loading
Loading