Skip to content

Commit

Permalink
Fixes 500 when query is outside of max_query_lookback (#4902)
Browse files Browse the repository at this point in the history
* Fixes 500 when query is outside of max_query_lookback

We were using the Cortex middleware to apply this limit, but when max_query_lookback was enforced it generated
a response that is not known to Loki.

And since the Loki frontend supports many other types of responses (Series, Labels, ...), I had to fork the code into Loki.

I added a regression test and also enforced the limit for the labels API, which was for some reason omitted.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add changelog entry

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a test

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Dec 9, 2021
1 parent 96ee4a2 commit a47a76c
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage
* [4875](https://github.com/grafana/loki/pull/4875) **trevorwhitney**: Loki: fix bug where common replication factor wasn't always getting applied
* [4892](https://github.com/grafana/loki/pull/4892) **cristaloleg**: Loki: upgrade cristalhq/hedgedhttp from v0.6.0 to v0.7.0
* [4902](https://github.com/grafana/loki/pull/4902) **cyriltovena**: Fixes 500 when query is outside of max_query_lookback.


# 2.4.1 (2021/11/07)

Expand Down
47 changes: 47 additions & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,3 +863,50 @@ func getQueryTags(ctx context.Context) string {
v, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty
return v
}

// NewEmptyResponse builds an empty, successful response whose concrete type
// matches the concrete type of the given request:
//
//   - *LokiSeriesRequest     -> *LokiSeriesResponse
//   - *LokiLabelNamesRequest -> *LokiLabelNamesResponse
//   - *LokiInstantRequest    -> *LokiPromResponse with a vector result type
//   - *LokiRequest           -> *LokiPromResponse for sample (metric) expressions,
//     *LokiResponse with a streams result type otherwise
//
// For a *LokiRequest the query is parsed to tell metric and log queries apart;
// a parse failure is returned as an httpgrpc error with status 400. Any other
// request type yields a plain "unsupported request type" error.
func NewEmptyResponse(r queryrange.Request) (queryrange.Response, error) {
	switch req := r.(type) {
	case *LokiSeriesRequest:
		return &LokiSeriesResponse{
			Status:  loghttp.QueryStatusSuccess,
			Version: uint32(loghttp.GetVersion(req.Path)),
		}, nil
	case *LokiLabelNamesRequest:
		return &LokiLabelNamesResponse{
			Status:  loghttp.QueryStatusSuccess,
			Version: uint32(loghttp.GetVersion(req.Path)),
		}, nil
	case *LokiInstantRequest:
		// instant queries in the frontend are always metrics queries.
		return &LokiPromResponse{
			Response: &queryrange.PrometheusResponse{
				Status: loghttp.QueryStatusSuccess,
				Data: queryrange.PrometheusData{
					ResultType: loghttp.ResultTypeVector,
				},
			},
		}, nil
	case *LokiRequest:
		// range query can either be metrics or logs
		expr, err := logql.ParseExpr(req.Query)
		if err != nil {
			// Pass the message as an argument rather than as the format
			// string so that any '%' in the parse error is emitted verbatim.
			return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
		}
		if _, ok := expr.(logql.SampleExpr); ok {
			return &LokiPromResponse{
				Response: queryrange.NewEmptyPrometheusResponse(),
			}, nil
		}
		return &LokiResponse{
			Status:    loghttp.QueryStatusSuccess,
			Direction: req.Direction,
			Limit:     req.Limit,
			Version:   uint32(loghttp.GetVersion(req.Path)),
			Data: LokiData{
				ResultType: loghttp.ResultTypeStream,
			},
		}, nil
	default:
		return nil, fmt.Errorf("unsupported request type %T", req)
	}
}
68 changes: 68 additions & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

Expand Down Expand Up @@ -85,6 +90,69 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st
return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)
}

// limitsMiddleware is a queryrange.Handler that applies the embedded Limits
// to a request in its Do method before handing it to the next handler.
type limitsMiddleware struct {
// Limits is embedded so the per-tenant limit lookups are available directly
// on the middleware.
Limits
// next is the handler invoked once the request has passed the limit checks.
next queryrange.Handler
}

// NewLimitsMiddleware returns a Middleware that wraps the downstream handler
// in a limitsMiddleware, which enforces the query limits supplied by l.
func NewLimitsMiddleware(l Limits) queryrange.Middleware {
	wrap := func(next queryrange.Handler) queryrange.Handler {
		return limitsMiddleware{Limits: l, next: next}
	}
	return queryrange.MiddlewareFunc(wrap)
}

// Do enforces the max query lookback and max query length limits on r and
// then forwards the (possibly adjusted) request to the next handler.
//
// The limits are resolved across all tenant IDs found in ctx via
// validation.SmallestPositiveNonZeroDurationPerTenant; a limit is only
// applied when the resolved value is greater than zero.
//
//   - If the request ends before now minus the max query lookback, the query
//     is not executed and an empty response matching the request type is
//     returned (see NewEmptyResponse).
//   - If only the start is before that boundary, the start is moved forward
//     to the boundary.
//   - If the resulting time range is longer than the max query length, an
//     httpgrpc error with status 400 is returned.
//
// An httpgrpc error with status 400 is also returned when the tenant IDs
// cannot be extracted from ctx.
func (l limitsMiddleware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
	log, ctx := spanlogger.New(ctx, "limits")
	defer log.Finish()

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		// Pass the message as an argument rather than as the format string so
		// that any '%' in the error text is emitted verbatim.
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	// Clamp the time range based on the max query lookback.
	if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLookback); maxQueryLookback > 0 {
		minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))

		if r.GetEnd() < minStartTime {
			// The request is fully outside the allowed range, so we can return an
			// empty response.
			level.Debug(log).Log(
				"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
				"reqStart", util.FormatTimeMillis(r.GetStart()),
				"reqEnd", util.FormatTimeMillis(r.GetEnd()),
				"maxQueryLookback", maxQueryLookback)

			return NewEmptyResponse(r)
		}

		if r.GetStart() < minStartTime {
			// Replace the start time in the request.
			level.Debug(log).Log(
				"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
				"original", util.FormatTimeMillis(r.GetStart()),
				"updated", util.FormatTimeMillis(minStartTime))

			r = r.WithStartEnd(minStartTime, r.GetEnd())
		}
	}

	// Enforce the max query length. This runs after the lookback clamp, so the
	// length is measured on the adjusted start time.
	if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
		queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
		if queryLen > maxQueryLength {
			return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength)
		}
	}

	return l.next.Do(ctx, r)
}

type seriesLimiter struct {
hashes map[uint64]struct{}
rw sync.RWMutex
Expand Down
33 changes: 33 additions & 0 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,36 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
}),
).RoundTrip(r)
}

func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)
rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()

lreq := &LokiRequest{
Query: `{app="foo"} |= "foo"`,
Limit: 10000,
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Direction: logproto.FORWARD,
Path: "/loki/api/v1/query_range",
}

ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)

req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
}
8 changes: 4 additions & 4 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func NewLogFilterTripperware(
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics))
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func NewLabelsTripperware(
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware,
queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics),
Expand Down Expand Up @@ -371,7 +371,7 @@ func NewMetricTripperware(
splitByMetrics *SplitByMetrics,
registerer prometheus.Registerer,
) (queryrange.Tripperware, Stopper, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
Expand Down Expand Up @@ -462,7 +462,7 @@ func NewInstantMetricTripperware(
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}

if cfg.ShardedQueries {
queryRangeMiddleware = append(queryRangeMiddleware,
Expand Down
13 changes: 9 additions & 4 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) {
}

func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -547,7 +547,9 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) {
}

type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splits map[string]time.Duration
Expand All @@ -561,8 +563,11 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
return f.splits[key]
}

func (fakeLimits) MaxQueryLength(string) time.Duration {
return time.Hour * 7
func (f fakeLimits) MaxQueryLength(string) time.Duration {
if f.maxQueryLength == 0 {
return time.Hour * 7
}
return f.maxQueryLength
}

func (f fakeLimits) MaxQueryParallelism(string) int {
Expand All @@ -585,7 +590,7 @@ func (f fakeLimits) MaxCacheFreshness(string) time.Duration {
}

func (f fakeLimits) MaxQueryLookback(string) time.Duration {
return 0
return f.maxQueryLookback
}

func (f fakeLimits) MinShardingLookback(string) time.Duration {
Expand Down

0 comments on commit a47a76c

Please sign in to comment.