Skip to content

Commit

Permalink
fix: Introduce feature flag for [last|first]_over_time sharding. (#13067)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies committed May 29, 2024
1 parent 670cd89 commit 6e45550
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 32 deletions.
3 changes: 2 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3889,7 +3889,8 @@ results_cache:
[parallelise_shardable_queries: <boolean> | default = true]
# A comma-separated list of LogQL vector and range aggregations that should be
# sharded
# sharded. Possible values 'quantile_over_time', 'last_over_time',
# 'first_over_time'.
# CLI flag: -querier.shard-aggregations
[shard_aggregations: <string> | default = ""]
Expand Down
55 changes: 26 additions & 29 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,40 @@ func TestMappingEquivalence(t *testing.T) {
for _, tc := range []struct {
query string
approximate bool
shardAgg []string
}{
{`1`, false},
{`1 + 1`, false},
{`{a="1"}`, false},
{`{a="1"} |= "number: 10"`, false},
{`rate({a=~".+"}[1s])`, false},
{`sum by (a) (rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"}[1s]))`, false},
{`max without (a) (rate({a=~".+"}[1s]))`, false},
{`count(rate({a=~".+"}[1s]))`, false},
{`avg(rate({a=~".+"}[1s]))`, true},
{`avg(rate({a=~".+"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false},
{`sum(max(rate({a=~".+"}[1s])))`, false},
{`max(count(rate({a=~".+"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true},
{`1`, false, nil},
{`1 + 1`, false, nil},
{`{a="1"}`, false, nil},
{`{a="1"} |= "number: 10"`, false, nil},
{`rate({a=~".+"}[1s])`, false, nil},
{`sum by (a) (rate({a=~".+"}[1s]))`, false, nil},
{`sum(rate({a=~".+"}[1s]))`, false, nil},
{`max without (a) (rate({a=~".+"}[1s]))`, false, nil},
{`count(rate({a=~".+"}[1s]))`, false, nil},
{`avg(rate({a=~".+"}[1s]))`, true, nil},
{`avg(rate({a=~".+"}[1s])) by (a)`, true, nil},
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false, nil},
{`sum(max(rate({a=~".+"}[1s])))`, false, nil},
{`max(count(rate({a=~".+"}[1s])))`, false, nil},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false, nil},
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{
`
(quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
and
avg by (a) (rate({a=~".+"}[1s]))
`,
false,
nil,
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down Expand Up @@ -102,12 +104,7 @@ func TestMappingEquivalence(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
mapper := NewShardMapper(strategy, nilShardMetrics, []string{})
// TODO (callum) refactor this test so that we won't need to set every
// possible sharding config option to true when we have multiple in the future
if tc.approximate {
mapper.quantileOverTimeSharding = true
}
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

Expand Down
23 changes: 22 additions & 1 deletion pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,39 @@ import (
)

// Names of the aggregations whose sharding is opt-in. NewShardMapper compares
// each entry of its shardAggregation argument against these values, and the
// -querier.shard-aggregations flag help lists them as the possible values.
const (
// ShardLastOverTime is the name that enables sharding of last_over_time.
ShardLastOverTime = "last_over_time"
// ShardFirstOverTime is the name that enables sharding of first_over_time.
ShardFirstOverTime = "first_over_time"
// ShardQuantileOverTime is the name that enables sharding of quantile_over_time.
ShardQuantileOverTime = "quantile_over_time"
)

// ShardMapper holds the sharding strategy, the mapper metrics and one opt-in
// flag per aggregation whose sharding is feature-gated. Build it with
// NewShardMapper, which derives the flags from a list of aggregation names.
type ShardMapper struct {
// shards is the strategy passed to NewShardMapper; mapRangeAggregationExpr
// calls its Resolver method when it falls back to a no-op mapping.
shards ShardingStrategy
// metrics is the *MapperMetrics passed to NewShardMapper.
metrics *MapperMetrics
// quantileOverTimeSharding is set when ShardQuantileOverTime was requested.
quantileOverTimeSharding bool
// lastOverTimeSharding is set when ShardLastOverTime was requested; while
// false, mapRangeAggregationExpr returns a no-op for OpRangeTypeLast.
lastOverTimeSharding bool
// firstOverTimeSharding is set when ShardFirstOverTime was requested; while
// false, mapRangeAggregationExpr returns a no-op for OpRangeTypeFirst.
firstOverTimeSharding bool
}

func NewShardMapper(strategy ShardingStrategy, metrics *MapperMetrics, shardAggregation []string) ShardMapper {
quantileOverTimeSharding := false
lastOverTimeSharding := false
firstOverTimeSharding := false
for _, a := range shardAggregation {
if a == ShardQuantileOverTime {
switch a {
case ShardQuantileOverTime:
quantileOverTimeSharding = true
case ShardLastOverTime:
lastOverTimeSharding = true
case ShardFirstOverTime:
firstOverTimeSharding = true
}
}
return ShardMapper{
shards: strategy,
metrics: metrics,
quantileOverTimeSharding: quantileOverTimeSharding,
firstOverTimeSharding: firstOverTimeSharding,
lastOverTimeSharding: lastOverTimeSharding,
}
}

Expand Down Expand Up @@ -472,6 +485,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
}, bytesPerShard, nil

case syntax.OpRangeTypeFirst:
if !m.firstOverTimeSharding {
return noOp(expr, m.shards.Resolver())
}

potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
Expand Down Expand Up @@ -499,6 +516,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
downstreams: downstreams,
}, bytesPerShard, nil
case syntax.OpRangeTypeLast:
if !m.lastOverTimeSharding {
return noOp(expr, m.shards.Resolver())
}

potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/queryrangebase/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

cfg.ShardAggregations = []string{}
f.Var(&cfg.ShardAggregations, "querier.shard-aggregations",
"A comma-separated list of LogQL vector and range aggregations that should be sharded")
"A comma-separated list of LogQL vector and range aggregations that should be sharded. Possible values 'quantile_over_time', 'last_over_time', 'first_over_time'.")

cfg.ResultsCacheConfig.RegisterFlags(f)
}
Expand Down

0 comments on commit 6e45550

Please sign in to comment.