From 3f3d10afccae08f27234d8c8040ab2701397af38 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 26 Jul 2021 15:14:53 +0200 Subject: [PATCH] Add min_sharding_lookback limits to the frontends (#4047) * Add min_sharding_lookback limits to the frontends Signed-off-by: Cyril Tovena * improve doc. Signed-off-by: Cyril Tovena * lint Signed-off-by: Cyril Tovena --- docs/sources/configuration/_index.md | 6 ++++ pkg/querier/queryrange/limits.go | 4 +++ pkg/querier/queryrange/querysharding.go | 37 ++++++++++++++------ pkg/querier/queryrange/querysharding_test.go | 16 ++++++--- pkg/querier/queryrange/roundtrip_test.go | 6 +++- pkg/validation/limits.go | 11 +++++- 6 files changed, 62 insertions(+), 18 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 4879abb50075..d66efe5e6d98 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -321,6 +321,12 @@ The queryrange_config configures the query splitting and caching in the Loki que # CLI flag: -querier.split-queries-by-interval [split_queries_by_interval: <duration> | default = 0s] +# Limit queries that can be sharded. +# Queries with a time range that falls between now and now minus the sharding lookback are not sharded. +# Default value is 0s (disabled), meaning queries of any time range are sharded. +# CLI flag: -frontend.min-sharding-lookback +[min_sharding_lookback: <duration> | default = 0s] + # Deprecated: Split queries by day and execute in parallel. Use -querier.split-queries-by-interval instead. 
# CLI flag: -querier.split-queries-by-day [split_queries_by_day: | default = false] diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index afb52c29a591..f50930bddf4c 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -13,6 +13,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/logql" ) const ( @@ -22,9 +24,11 @@ const ( // Limits extends the cortex limits interface with support for per tenant splitby parameters type Limits interface { queryrange.Limits + logql.Limits QuerySplitDuration(string) time.Duration MaxQuerySeries(string) int MaxEntriesLimitPerQuery(string) int + MinShardingLookback(string) time.Duration } type limits struct { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 7295ecb80e81..62db7be693f7 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -3,15 +3,18 @@ package queryrange import ( "context" "fmt" + "net/http" "time" "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logql" @@ -25,7 +28,7 @@ func NewQueryShardMiddleware( confs queryrange.ShardingConfigs, middlewareMetrics *queryrange.InstrumentMiddlewareMetrics, shardingMetrics *logql.ShardingMetrics, - limits logql.Limits, + limits Limits, ) queryrange.Middleware { noshards := !hasShards(confs) @@ -44,10 +47,15 @@ func NewQueryShardMiddleware( }) return queryrange.MiddlewareFunc(func(next 
queryrange.Handler) queryrange.Handler { - return queryrange.MergeMiddlewares( - queryrange.InstrumentMiddleware("shardingware", middlewareMetrics), - mapperware, - ).Wrap(next) + return &shardSplitter{ + limits: limits, + shardingware: queryrange.MergeMiddlewares( + queryrange.InstrumentMiddleware("shardingware", middlewareMetrics), + mapperware, + ).Wrap(next), + now: time.Now, + next: queryrange.InstrumentMiddleware("sharding-bypass", middlewareMetrics).Wrap(next), + } }) } @@ -173,15 +181,22 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra // This is used to send nonsharded requests to the ingesters in order to not overload them. // TODO(owen-d): export in cortex so we don't duplicate code type shardSplitter struct { - MinShardingLookback time.Duration // delimiter for splitting sharded vs non-sharded queries - shardingware queryrange.Handler // handler for sharded queries - next queryrange.Handler // handler for non-sharded queries - now func() time.Time // injectable time.Now + limits Limits // delimiter for splitting sharded vs non-sharded queries + shardingware queryrange.Handler // handler for sharded queries + next queryrange.Handler // handler for non-sharded queries + now func() time.Time // injectable time.Now } func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - cutoff := splitter.now().Add(-splitter.MinShardingLookback) - + userid, err := tenant.TenantID(ctx) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + minShardingLookback := splitter.limits.MinShardingLookback(userid) + if minShardingLookback == 0 { + return splitter.shardingware.Do(ctx, r) + } + cutoff := splitter.now().Add(-minShardingLookback) // Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried). 
if !cutoff.After(util.TimeFromMillis(r.GetEnd())) { return splitter.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 72463d6c643b..7980e02f610b 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -100,21 +100,27 @@ func Test_shardSplitter(t *testing.T) { lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding. shouldShard: false, }, + { + desc: "default", + lookback: 0, + shouldShard: true, + }, } { t.Run(tc.desc, func(t *testing.T) { var didShard bool - splitter := &shardSplitter{ shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { didShard = true return mockHandler(lokiResps[0], nil).Do(ctx, req) }), - next: mockHandler(lokiResps[1], nil), - now: func() time.Time { return end }, - MinShardingLookback: tc.lookback, + next: mockHandler(lokiResps[1], nil), + now: func() time.Time { return end }, + limits: fakeLimits{ + minShardingLookback: tc.lookback, + }, } - resp, err := splitter.Do(context.Background(), req) + resp, err := splitter.Do(user.InjectOrgID(context.Background(), "1"), req) require.Nil(t, err) require.Equal(t, tc.shouldShard, didShard) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index c49f74de1340..8ac6796acb6f 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -219,7 +219,6 @@ func TestLogFilterTripperware(t *testing.T) { } func TestInstantQueryTripperware(t *testing.T) { - testShardingConfig := testConfig testShardingConfig.ShardedQueries = true tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil) @@ -552,6 +551,7 @@ type fakeLimits struct { maxEntriesLimitPerQuery int maxSeries int splits map[string]time.Duration + 
minShardingLookback time.Duration } func (f fakeLimits) QuerySplitDuration(key string) time.Duration { @@ -588,6 +588,10 @@ func (f fakeLimits) MaxQueryLookback(string) time.Duration { return 0 } +func (f fakeLimits) MinShardingLookback(string) time.Duration { + return f.minShardingLookback +} + func counter() (*int, http.Handler) { count := 0 var lock sync.Mutex diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 357cf0bc189a..6f6a27c1ecb4 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -57,7 +57,8 @@ type Limits struct { MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. - QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"` // Ruler defaults and limits. RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` @@ -115,6 +116,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests") + _ = l.MinShardingLookback.Set("0s") + f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit the sharding time range.Queries with time range that fall between now and now minus the sharding lookback are not sharded. 
0 to disable.") + _ = l.MaxCacheFreshness.Set("1m") f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") @@ -311,6 +315,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery } +// MinShardingLookback returns the tenant specific min sharding lookback (i.e. from when we should start sharding). +func (o *Overrides) MinShardingLookback(userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).MinShardingLookback) +} + // QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend. func (o *Overrides) QuerySplitDuration(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)