Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes 500 when query is outside of max_query_lookback #4902

Merged
merged 3 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage
* [4875](https://github.com/grafana/loki/pull/4875) **trevorwhitney**: Loki: fix bug where common replication factor wasn't always getting applied
* [4892](https://github.com/grafana/loki/pull/4892) **cristaloleg**: Loki: upgrade cristalhq/hedgedhttp from v0.6.0 to v0.7.0
* [4902](https://github.com/grafana/loki/pull/4902) **cyriltovena**: Fixes 500 when query is outside of max_query_lookback.


# 2.4.1 (2021/11/07)

Expand Down
47 changes: 47 additions & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,3 +863,50 @@ func getQueryTags(ctx context.Context) string {
v, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty
return v
}

func NewEmptyResponse(r queryrange.Request) (queryrange.Response, error) {
switch req := r.(type) {
case *LokiSeriesRequest:
return &LokiSeriesResponse{
Status: loghttp.QueryStatusSuccess,
Version: uint32(loghttp.GetVersion(req.Path)),
}, nil
case *LokiLabelNamesRequest:
return &LokiLabelNamesResponse{
Status: loghttp.QueryStatusSuccess,
Version: uint32(loghttp.GetVersion(req.Path)),
}, nil
case *LokiInstantRequest:
// instant queries in the frontend are always metrics queries.
return &LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
},
},
}, nil
case *LokiRequest:
// range query can either be metrics or logs
expr, err := logql.ParseExpr(req.Query)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
if _, ok := expr.(logql.SampleExpr); ok {
return &LokiPromResponse{
Response: queryrange.NewEmptyPrometheusResponse(),
}, nil
}
return &LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: req.Direction,
Limit: req.Limit,
Version: uint32(loghttp.GetVersion(req.Path)),
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
},
}, nil
default:
return nil, fmt.Errorf("unsupported request type %T", req)
}
}
68 changes: 68 additions & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

Expand Down Expand Up @@ -85,6 +90,69 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st
return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)
}

type limitsMiddleware struct {
Limits
next queryrange.Handler
}

// NewLimitsMiddleware creates a new Middleware that enforces query limits.
func NewLimitsMiddleware(l Limits) queryrange.Middleware {
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return limitsMiddleware{
next: next,
Limits: l,
}
})
}

func (l limitsMiddleware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
log, ctx := spanlogger.New(ctx, "limits")
defer log.Finish()

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

// Clamp the time range based on the max query lookback.

if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLookback); maxQueryLookback > 0 {
minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))

if r.GetEnd() < minStartTime {
// The request is fully outside the allowed range, so we can return an
// empty response.
level.Debug(log).Log(
"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
"reqStart", util.FormatTimeMillis(r.GetStart()),
"redEnd", util.FormatTimeMillis(r.GetEnd()),
"maxQueryLookback", maxQueryLookback)

return NewEmptyResponse(r)
}

if r.GetStart() < minStartTime {
// Replace the start time in the request.
level.Debug(log).Log(
"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
"original", util.FormatTimeMillis(r.GetStart()),
"updated", util.FormatTimeMillis(minStartTime))

r = r.WithStartEnd(minStartTime, r.GetEnd())
}
}

// Enforce the max query length.
if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
if queryLen > maxQueryLength {
return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength)
}
}

return l.next.Do(ctx, r)
}

type seriesLimiter struct {
hashes map[uint64]struct{}
rw sync.RWMutex
Expand Down
33 changes: 33 additions & 0 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,36 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
}),
).RoundTrip(r)
}

func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)
rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()

lreq := &LokiRequest{
Query: `{app="foo"} |= "foo"`,
Limit: 10000,
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Direction: logproto.FORWARD,
Path: "/loki/api/v1/query_range",
}

ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)

req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
}
8 changes: 4 additions & 4 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func NewLogFilterTripperware(
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics))
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func NewLabelsTripperware(
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware,
queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics),
Expand Down Expand Up @@ -371,7 +371,7 @@ func NewMetricTripperware(
splitByMetrics *SplitByMetrics,
registerer prometheus.Registerer,
) (queryrange.Tripperware, Stopper, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
Expand Down Expand Up @@ -462,7 +462,7 @@ func NewInstantMetricTripperware(
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}

if cfg.ShardedQueries {
queryRangeMiddleware = append(queryRangeMiddleware,
Expand Down
13 changes: 9 additions & 4 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) {
}

func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -547,7 +547,9 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) {
}

type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splits map[string]time.Duration
Expand All @@ -561,8 +563,11 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
return f.splits[key]
}

func (fakeLimits) MaxQueryLength(string) time.Duration {
return time.Hour * 7
func (f fakeLimits) MaxQueryLength(string) time.Duration {
if f.maxQueryLength == 0 {
return time.Hour * 7
}
return f.maxQueryLength
}

func (f fakeLimits) MaxQueryParallelism(string) int {
Expand All @@ -585,7 +590,7 @@ func (f fakeLimits) MaxCacheFreshness(string) time.Duration {
}

func (f fakeLimits) MaxQueryLookback(string) time.Duration {
return 0
return f.maxQueryLookback
}

func (f fakeLimits) MinShardingLookback(string) time.Duration {
Expand Down