diff --git a/CHANGELOG.md b/CHANGELOG.md
index aef31a10c837..b207721281c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,8 @@
 * [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage
 * [4875](https://github.com/grafana/loki/pull/4875) **trevorwhitney**: Loki: fix bug where common replication factor wasn't always getting applied
 * [4892](https://github.com/grafana/loki/pull/4892) **cristaloleg**: Loki: upgrade cristalhq/hedgedhttp from v0.6.0 to v0.7.0
+* [4902](https://github.com/grafana/loki/pull/4902) **cyriltovena**: Fixes 500 when query is outside of max_query_lookback.
+
 # 2.4.1 (2021/11/07)
diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go
index 6078c5a9edb3..7411dea28d4d 100644
--- a/pkg/querier/queryrange/codec.go
+++ b/pkg/querier/queryrange/codec.go
@@ -863,3 +863,50 @@ func getQueryTags(ctx context.Context) string {
 	v, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty
 	return v
 }
+
+func NewEmptyResponse(r queryrange.Request) (queryrange.Response, error) {
+	switch req := r.(type) {
+	case *LokiSeriesRequest:
+		return &LokiSeriesResponse{
+			Status:  loghttp.QueryStatusSuccess,
+			Version: uint32(loghttp.GetVersion(req.Path)),
+		}, nil
+	case *LokiLabelNamesRequest:
+		return &LokiLabelNamesResponse{
+			Status:  loghttp.QueryStatusSuccess,
+			Version: uint32(loghttp.GetVersion(req.Path)),
+		}, nil
+	case *LokiInstantRequest:
+		// instant queries in the frontend are always metric queries.
+		return &LokiPromResponse{
+			Response: &queryrange.PrometheusResponse{
+				Status: loghttp.QueryStatusSuccess,
+				Data: queryrange.PrometheusData{
+					ResultType: loghttp.ResultTypeVector,
+				},
+			},
+		}, nil
+	case *LokiRequest:
+		// a range query can be either a metric or a log query.
+		expr, err := logql.ParseExpr(req.Query)
+		if err != nil {
+			return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
+		}
+		if _, ok := expr.(logql.SampleExpr); ok {
+			return &LokiPromResponse{
+				Response: queryrange.NewEmptyPrometheusResponse(),
+			}, nil
+		}
+		return &LokiResponse{
+			Status:    loghttp.QueryStatusSuccess,
+			Direction: req.Direction,
+			Limit:     req.Limit,
+			Version:   uint32(loghttp.GetVersion(req.Path)),
+			Data: LokiData{
+				ResultType: loghttp.ResultTypeStream,
+			},
+		}, nil
+	default:
+		return nil, fmt.Errorf("unsupported request type %T", req)
+	}
+}
diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go
index a28ef07d103e..591b2ffb3cf6 100644
--- a/pkg/querier/queryrange/limits.go
+++ b/pkg/querier/queryrange/limits.go
@@ -10,7 +10,12 @@ import (
 	"github.com/cortexproject/cortex/pkg/cortexpb"
 	"github.com/cortexproject/cortex/pkg/querier/queryrange"
 	"github.com/cortexproject/cortex/pkg/tenant"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/spanlogger"
+	"github.com/cortexproject/cortex/pkg/util/validation"
+	"github.com/go-kit/log/level"
 	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
 
@@ -85,6 +90,69 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) string {
 	return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)
 }
 
+type limitsMiddleware struct {
+	Limits
+	next queryrange.Handler
+}
+
+// NewLimitsMiddleware creates a new Middleware that enforces query limits.
+func NewLimitsMiddleware(l Limits) queryrange.Middleware {
+	return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
+		return limitsMiddleware{
+			next:   next,
+			Limits: l,
+		}
+	})
+}
+
+func (l limitsMiddleware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
+	log, ctx := spanlogger.New(ctx, "limits")
+	defer log.Finish()
+
+	tenantIDs, err := tenant.TenantIDs(ctx)
+	if err != nil {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
+	}
+
+	// Clamp the time range based on the max query lookback.
+	if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLookback); maxQueryLookback > 0 {
+		minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))
+
+		if r.GetEnd() < minStartTime {
+			// The request is fully outside the allowed range, so we can return an
+			// empty response.
+			level.Debug(log).Log(
+				"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
+				"reqStart", util.FormatTimeMillis(r.GetStart()),
+				"reqEnd", util.FormatTimeMillis(r.GetEnd()),
+				"maxQueryLookback", maxQueryLookback)
+
+			return NewEmptyResponse(r)
+		}
+
+		if r.GetStart() < minStartTime {
+			// Replace the start time in the request.
+			level.Debug(log).Log(
+				"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
+				"original", util.FormatTimeMillis(r.GetStart()),
+				"updated", util.FormatTimeMillis(minStartTime))
+
+			r = r.WithStartEnd(minStartTime, r.GetEnd())
+		}
+	}
+
+	// Enforce the max query length.
+	if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
+		queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
+		if queryLen > maxQueryLength {
+			return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength)
+		}
+	}
+
+	return l.next.Do(ctx, r)
+}
+
 type seriesLimiter struct {
 	hashes map[uint64]struct{}
 	rw     sync.RWMutex
diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go
index a2cb6b229c5b..ede339bb8c4a 100644
--- a/pkg/querier/queryrange/limits_test.go
+++ b/pkg/querier/queryrange/limits_test.go
@@ -206,3 +206,36 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
 		}),
 	).RoundTrip(r)
 }
+
+func Test_MaxQueryLookBack(t *testing.T) {
+	tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
+		maxQueryLookback: 1 * time.Hour,
+	}, chunk.SchemaConfig{}, 0, nil)
+	if stopper != nil {
+		defer stopper.Stop()
+	}
+	require.NoError(t, err)
+	rt, err := newfakeRoundTripper()
+	require.NoError(t, err)
+	defer rt.Close()
+
+	lreq := &LokiRequest{
+		Query:     `{app="foo"} |= "foo"`,
+		Limit:     10000,
+		StartTs:   testTime.Add(-6 * time.Hour),
+		EndTs:     testTime,
+		Direction: logproto.FORWARD,
+		Path:      "/loki/api/v1/query_range",
+	}
+
+	ctx := user.InjectOrgID(context.Background(), "1")
+	req, err := LokiCodec.EncodeRequest(ctx, lreq)
+	require.NoError(t, err)
+
+	req = req.WithContext(ctx)
+	err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
+	require.NoError(t, err)
+
+	_, err = tpw(rt).RoundTrip(req)
+	require.NoError(t, err)
+}
diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go
index b47817332ffc..1b5f7a0ef8b3 100644
--- a/pkg/querier/queryrange/roundtrip.go
+++ b/pkg/querier/queryrange/roundtrip.go
@@ -245,7 +245,7 @@ func NewLogFilterTripperware(
 	shardingMetrics *logql.ShardingMetrics,
 	splitByMetrics *SplitByMetrics,
 ) (queryrange.Tripperware, error) {
-	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
+	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
 	if cfg.SplitQueriesByInterval != 0 {
 		queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics))
 	}
@@ -334,7 +334,7 @@ func NewLabelsTripperware(
 	retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
 	splitByMetrics *SplitByMetrics,
 ) (queryrange.Tripperware, error) {
-	queryRangeMiddleware := []queryrange.Middleware{}
+	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
 	if cfg.SplitQueriesByInterval != 0 {
 		queryRangeMiddleware = append(queryRangeMiddleware,
 			queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics),
@@ -371,7 +371,7 @@ func NewMetricTripperware(
 	splitByMetrics *SplitByMetrics,
 	registerer prometheus.Registerer,
 ) (queryrange.Tripperware, Stopper, error) {
-	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
+	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
 	if cfg.AlignQueriesWithStep {
 		queryRangeMiddleware = append(
 			queryRangeMiddleware,
@@ -462,7 +462,7 @@ func NewInstantMetricTripperware(
 	shardingMetrics *logql.ShardingMetrics,
 	splitByMetrics *SplitByMetrics,
 ) (queryrange.Tripperware, error) {
-	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
+	queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
 	if cfg.ShardedQueries {
 		queryRangeMiddleware = append(queryRangeMiddleware,
diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go
index b4c723f4c74a..98f4a4623958 100644
--- a/pkg/querier/queryrange/roundtrip_test.go
+++ b/pkg/querier/queryrange/roundtrip_test.go
@@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) {
 }
 
 func TestLabelsTripperware(t *testing.T) {
-	tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
+	tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil)
 	if stopper != nil {
 		defer stopper.Stop()
 	}
@@ -547,7 +547,9 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) {
 }
 
 type fakeLimits struct {
+	maxQueryLength          time.Duration
 	maxQueryParallelism     int
+	maxQueryLookback        time.Duration
 	maxEntriesLimitPerQuery int
 	maxSeries               int
 	splits                  map[string]time.Duration
@@ -561,8 +563,11 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
 	return f.splits[key]
 }
 
-func (fakeLimits) MaxQueryLength(string) time.Duration {
-	return time.Hour * 7
+func (f fakeLimits) MaxQueryLength(string) time.Duration {
+	if f.maxQueryLength == 0 {
+		return time.Hour * 7
+	}
+	return f.maxQueryLength
 }
 
 func (f fakeLimits) MaxQueryParallelism(string) int {
@@ -585,7 +590,7 @@ func (f fakeLimits) MaxCacheFreshness(string) time.Duration {
 }
 
 func (f fakeLimits) MaxQueryLookback(string) time.Duration {
-	return 0
+	return f.maxQueryLookback
}
 
 func (f fakeLimits) MinShardingLookback(string) time.Duration {
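
As context for the limits.go change, here is a minimal, self-contained sketch of the clamping decision that the new `limitsMiddleware.Do` applies against `max_query_lookback`. It is not part of the diff and not taken from the Loki codebase: the function `clampToLookback` and its plain `time.Time` signature are hypothetical stand-ins, while the real middleware works on millisecond timestamps from `queryrange.Request` and resolves the per-tenant limit with `validation.SmallestPositiveNonZeroDurationPerTenant`. A range that ends before now-lookback is short-circuited to an empty response (which is why the 500 goes away), and a range that merely starts too early has its start clamped forward.

```go
// Hypothetical illustration, not Loki code: the lookback clamp reduced to
// plain time.Time values.
package main

import (
	"fmt"
	"time"
)

// clampToLookback reports whether the query should be answered with an empty
// response, and otherwise returns the (possibly clamped) start time.
func clampToLookback(start, end, now time.Time, maxLookback time.Duration) (time.Time, bool) {
	if maxLookback <= 0 {
		return start, false // limit disabled, leave the range untouched
	}
	minStart := now.Add(-maxLookback)
	if end.Before(minStart) {
		return start, true // whole range is before the lookback window: empty response
	}
	if start.Before(minStart) {
		return minStart, false // range starts too early: clamp the start forward
	}
	return start, false
}

func main() {
	now := time.Now()
	// Same shape as Test_MaxQueryLookBack: a 6h-long query with a 1h lookback
	// keeps running, but only over the last hour.
	start, empty := clampToLookback(now.Add(-6*time.Hour), now, now, time.Hour)
	fmt.Println(start.Equal(now.Add(-time.Hour)), empty) // true false
}
```

Returning an empty success response instead of an error mirrors what `NewEmptyResponse` builds for each request type, so queries that fall entirely outside the retention window stay cheap and no longer surface as server errors.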