Skip to content

Commit

Permalink
Add min_sharding_lookback limits to the frontends (#4047)
Browse files Browse the repository at this point in the history
* Add min_sharding_lookback limits to the frontends

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* improve doc.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jul 26, 2021
1 parent 551d682 commit 3f3d10a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 18 deletions.
6 changes: 6 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ The queryrange_config configures the query splitting and caching in the Loki que
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]
# Limit queries that can be sharded.
# Queries with a time range that falls between now and now minus the sharding lookback are not sharded.
# The default value is 0s (disabled), meaning queries of all time ranges are sharded.
# CLI flag: -frontend.min-sharding-lookback
[min_sharding_lookback: <duration> | default = 0s]
# Deprecated: Split queries by day and execute in parallel. Use -querier.split-queries-by-interval instead.
# CLI flag: -querier.split-queries-by-day
[split_queries_by_day: <boolean> | default = false]
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logql"
)

const (
Expand All @@ -22,9 +24,11 @@ const (
// Limits extends the cortex limits interface with support for per tenant splitby parameters.
type Limits interface {
	queryrange.Limits
	logql.Limits
	// QuerySplitDuration returns the interval used to split the given tenant's
	// queries in the frontend — presumably 0 disables splitting; TODO confirm
	// against the -querier.split-queries-by-interval flag handling.
	QuerySplitDuration(string) time.Duration
	// MaxQuerySeries returns the per-tenant cap on the number of series a
	// query may return.
	MaxQuerySeries(string) int
	// MaxEntriesLimitPerQuery returns the per-tenant cap on log entries
	// returned by a single query.
	MaxEntriesLimitPerQuery(string) int
	// MinShardingLookback returns the per-tenant window, measured back from
	// now, within which queries are NOT sharded (they would hit the
	// ingesters). 0 means all time ranges are sharded.
	MinShardingLookback(string) time.Duration
}

type limits struct {
Expand Down
37 changes: 26 additions & 11 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package queryrange
import (
"context"
"fmt"
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
Expand All @@ -25,7 +28,7 @@ func NewQueryShardMiddleware(
confs queryrange.ShardingConfigs,
middlewareMetrics *queryrange.InstrumentMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
limits logql.Limits,
limits Limits,
) queryrange.Middleware {

noshards := !hasShards(confs)
Expand All @@ -44,10 +47,15 @@ func NewQueryShardMiddleware(
})

return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return queryrange.MergeMiddlewares(
queryrange.InstrumentMiddleware("shardingware", middlewareMetrics),
mapperware,
).Wrap(next)
return &shardSplitter{
limits: limits,
shardingware: queryrange.MergeMiddlewares(
queryrange.InstrumentMiddleware("shardingware", middlewareMetrics),
mapperware,
).Wrap(next),
now: time.Now,
next: queryrange.InstrumentMiddleware("sharding-bypass", middlewareMetrics).Wrap(next),
}
})
}

Expand Down Expand Up @@ -173,15 +181,22 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
// This is used to send nonsharded requests to the ingesters in order to not overload them.
// TODO(owen-d): export in cortex so we don't duplicate code
type shardSplitter struct {
MinShardingLookback time.Duration // delimiter for splitting sharded vs non-sharded queries
shardingware queryrange.Handler // handler for sharded queries
next queryrange.Handler // handler for non-sharded queries
now func() time.Time // injectable time.Now
limits Limits // delimiter for splitting sharded vs non-sharded queries
shardingware queryrange.Handler // handler for sharded queries
next queryrange.Handler // handler for non-sharded queries
now func() time.Time // injectable time.Now
}

func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
cutoff := splitter.now().Add(-splitter.MinShardingLookback)

userid, err := tenant.TenantID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
minShardingLookback := splitter.limits.MinShardingLookback(userid)
if minShardingLookback == 0 {
return splitter.shardingware.Do(ctx, r)
}
cutoff := splitter.now().Add(-minShardingLookback)
// Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried).
if !cutoff.After(util.TimeFromMillis(r.GetEnd())) {
return splitter.next.Do(ctx, r)
Expand Down
16 changes: 11 additions & 5 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,27 @@ func Test_shardSplitter(t *testing.T) {
lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding.
shouldShard: false,
},
{
desc: "default",
lookback: 0,
shouldShard: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
var didShard bool

splitter := &shardSplitter{
shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
didShard = true
return mockHandler(lokiResps[0], nil).Do(ctx, req)
}),
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
MinShardingLookback: tc.lookback,
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
limits: fakeLimits{
minShardingLookback: tc.lookback,
},
}

resp, err := splitter.Do(context.Background(), req)
resp, err := splitter.Do(user.InjectOrgID(context.Background(), "1"), req)
require.Nil(t, err)

require.Equal(t, tc.shouldShard, didShard)
Expand Down
6 changes: 5 additions & 1 deletion pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ func TestLogFilterTripperware(t *testing.T) {
}

func TestInstantQueryTripperware(t *testing.T) {

testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil)
Expand Down Expand Up @@ -552,6 +551,7 @@ type fakeLimits struct {
maxEntriesLimitPerQuery int
maxSeries int
splits map[string]time.Duration
minShardingLookback time.Duration
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand Down Expand Up @@ -588,6 +588,10 @@ func (f fakeLimits) MaxQueryLookback(string) time.Duration {
return 0
}

// MinShardingLookback returns the fixed lookback configured on this fake;
// the tenant ID argument is ignored.
func (f fakeLimits) MinShardingLookback(string) time.Duration {
	return f.minShardingLookback
}

func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex
Expand Down
11 changes: 10 additions & 1 deletion pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Limits struct {
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"`

// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`

// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
Expand Down Expand Up @@ -115,6 +116,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests")

_ = l.MinShardingLookback.Set("0s")
f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit the sharding time range.Queries with time range that fall between now and now minus the sharding lookback are not sharded. 0 to disable.")

_ = l.MaxCacheFreshness.Set("1m")
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

Expand Down Expand Up @@ -311,6 +315,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery
}

// MinShardingLookback returns the tenant-specific minimum sharding lookback,
// i.e. the window back from now within which queries are not sharded.
// 0 disables the bypass, so queries of all time ranges are sharded.
func (o *Overrides) MinShardingLookback(userID string) time.Duration {
	return time.Duration(o.getOverridesForUser(userID).MinShardingLookback)
}

// QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend.
func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
Expand Down

0 comments on commit 3f3d10a

Please sign in to comment.