diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 45d56ad58440..32f1347d0e38 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -321,6 +321,12 @@ The queryrange_config configures the query splitting and caching in the Loki que # CLI flag: -querier.split-queries-by-interval [split_queries_by_interval: | default = 0s] +# Limit queries that can be sharded. +# Queries with time range that fall between now and now minus the sharding lookback are not sharded. +# Default value is 0s (disable), meaning all queries of all time range are sharded. +# CLI flag: -frontend.min-sharding-lookback +[min_sharding_lookback: | default = 0s] + # Deprecated: Split queries by day and execute in parallel. Use -querier.split-queries-by-interval instead. # CLI flag: -querier.split-queries-by-day [split_queries_by_day: | default = false] diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index afb52c29a591..f50930bddf4c 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -13,6 +13,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/logql" ) const ( @@ -22,9 +24,11 @@ const ( // Limits extends the cortex limits interface with support for per tenant splitby parameters type Limits interface { queryrange.Limits + logql.Limits QuerySplitDuration(string) time.Duration MaxQuerySeries(string) int MaxEntriesLimitPerQuery(string) int + MinShardingLookback(string) time.Duration } type limits struct { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index e4d5d6e09f8f..48ee330e48f4 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -3,15 +3,18 @@ package queryrange import ( "context" "fmt" + "net/http" "time" 
"github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logql" @@ -26,7 +29,7 @@ func NewQueryShardMiddleware( minShardingLookback time.Duration, middlewareMetrics *queryrange.InstrumentMiddlewareMetrics, shardingMetrics *logql.ShardingMetrics, - limits logql.Limits, + limits Limits, ) queryrange.Middleware { noshards := !hasShards(confs) @@ -45,10 +48,15 @@ func NewQueryShardMiddleware( }) return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { - return queryrange.MergeMiddlewares( - queryrange.InstrumentMiddleware("shardingware", middlewareMetrics), - mapperware, - ).Wrap(next) + return &shardSplitter{ + limits: limits, + shardingware: queryrange.MergeMiddlewares( + queryrange.InstrumentMiddleware("shardingware", middlewareMetrics), + mapperware, + ).Wrap(next), + now: time.Now, + next: queryrange.InstrumentMiddleware("sharding-bypass", middlewareMetrics).Wrap(next), + } }) } @@ -156,15 +164,22 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra // This is used to send nonsharded requests to the ingesters in order to not overload them. 
// TODO(owen-d): export in cortex so we don't duplicate code type shardSplitter struct { - MinShardingLookback time.Duration // delimiter for splitting sharded vs non-sharded queries - shardingware queryrange.Handler // handler for sharded queries - next queryrange.Handler // handler for non-sharded queries - now func() time.Time // injectable time.Now + limits Limits // delimiter for splitting sharded vs non-sharded queries + shardingware queryrange.Handler // handler for sharded queries + next queryrange.Handler // handler for non-sharded queries + now func() time.Time // injectable time.Now } func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - cutoff := splitter.now().Add(-splitter.MinShardingLookback) - + userid, err := tenant.TenantID(ctx) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + minShardingLookback := splitter.limits.MinShardingLookback(userid) + if minShardingLookback == 0 { + return splitter.shardingware.Do(ctx, r) + } + cutoff := splitter.now().Add(-minShardingLookback) // Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried). if !cutoff.After(util.TimeFromMillis(r.GetEnd())) { return splitter.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 5346dba79598..8c6db6bfb23f 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -99,21 +99,27 @@ func Test_shardSplitter(t *testing.T) { lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding. 
shouldShard: false, }, + { + desc: "default", + lookback: 0, + shouldShard: true, + }, } { t.Run(tc.desc, func(t *testing.T) { var didShard bool - splitter := &shardSplitter{ shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { didShard = true return mockHandler(lokiResps[0], nil).Do(ctx, req) }), - next: mockHandler(lokiResps[1], nil), - now: func() time.Time { return end }, - MinShardingLookback: tc.lookback, + next: mockHandler(lokiResps[1], nil), + now: func() time.Time { return end }, + limits: fakeLimits{ + minShardingLookback: tc.lookback, + }, } - resp, err := splitter.Do(context.Background(), req) + resp, err := splitter.Do(user.InjectOrgID(context.Background(), "1"), req) require.Nil(t, err) require.Equal(t, tc.shouldShard, didShard) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index a9bc1a9e58e6..4e045b2f8304 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -91,7 +91,6 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. 
func TestMetricsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() @@ -101,7 +100,7 @@ func TestMetricsTripperware(t *testing.T) { lreq := &LokiRequest{ Query: `rate({app="foo"} |= "foo"[1m])`, Limit: 1000, - Step: 30000, //30sec + Step: 30000, // 30sec StartTs: testTime.Add(-6 * time.Hour), EndTs: testTime, Direction: logproto.FORWARD, @@ -155,7 +154,6 @@ func TestMetricsTripperware(t *testing.T) { } func TestLogFilterTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() @@ -182,7 +180,7 @@ func TestLogFilterTripperware(t *testing.T) { err = user.InjectOrgIDIntoHTTPRequest(ctx, req) require.NoError(t, err) - //testing limit + // testing limit count, h := promqlResult(streams) rt.setHandler(h) _, err = tpw(rt).RoundTrip(req) @@ -202,8 +200,45 @@ func TestLogFilterTripperware(t *testing.T) { require.Error(t, err) } -func TestSeriesTripperware(t *testing.T) { +func TestInstantQueryTripperware(t *testing.T) { + testShardingConfig := testConfig + testShardingConfig.ShardedQueries = true + tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + lreq := &LokiInstantRequest{ + Query: `sum by (job) (bytes_rate({cluster="dev-us-central-0"}[15m]))`, + Limit: 1000, + Direction: logproto.FORWARD, + Path: "/loki/api/v1/query", + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := LokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + count, h := 
promqlResult(vector) + rt.setHandler(h) + resp, err := tpw(rt).RoundTrip(req) + require.Equal(t, 1, *count) + require.NoError(t, err) + + lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq) + require.NoError(t, err) + require.IsType(t, &LokiPromResponse{}, lokiResponse) +} + +func TestSeriesTripperware(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() @@ -245,7 +280,6 @@ func TestSeriesTripperware(t *testing.T) { } func TestLabelsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() @@ -495,6 +529,7 @@ type fakeLimits struct { maxEntriesLimitPerQuery int maxSeries int splits map[string]time.Duration + minShardingLookback time.Duration } func (f fakeLimits) QuerySplitDuration(key string) time.Duration { @@ -531,6 +566,10 @@ func (f fakeLimits) MaxQueryLookback(string) time.Duration { return 0 } +func (f fakeLimits) MinShardingLookback(string) time.Duration { + return f.minShardingLookback +} + func counter() (*int, http.Handler) { count := 0 var lock sync.Mutex diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 357cf0bc189a..6f6a27c1ecb4 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -57,7 +57,8 @@ type Limits struct { MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. - QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"` // Ruler defaults and limits. 
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` @@ -115,6 +116,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests") + _ = l.MinShardingLookback.Set("0s") + f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit the sharding time range. Queries with time range that fall between now and now minus the sharding lookback are not sharded. 0 to disable.") + _ = l.MaxCacheFreshness.Set("1m") f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") @@ -311,6 +315,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery } +// MinShardingLookback returns the tenant specific min sharding lookback (e.g. from when we should start sharding). +func (o *Overrides) MinShardingLookback(userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).MinShardingLookback) +} + // QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend. func (o *Overrides) QuerySplitDuration(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)