diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index a8a4a7df720e..aa045ce1f977 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3889,7 +3889,8 @@ results_cache: [parallelise_shardable_queries: | default = true] # A comma-separated list of LogQL vector and range aggregations that should be -# sharded +# sharded. Possible values 'quantile_over_time', 'last_over_time', +# 'first_over_time'. # CLI flag: -querier.shard-aggregations [shard_aggregations: | default = ""] diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 0777822dbbb6..948aef03876b 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -37,26 +37,27 @@ func TestMappingEquivalence(t *testing.T) { for _, tc := range []struct { query string approximate bool + shardAgg []string }{ - {`1`, false}, - {`1 + 1`, false}, - {`{a="1"}`, false}, - {`{a="1"} |= "number: 10"`, false}, - {`rate({a=~".+"}[1s])`, false}, - {`sum by (a) (rate({a=~".+"}[1s]))`, false}, - {`sum(rate({a=~".+"}[1s]))`, false}, - {`max without (a) (rate({a=~".+"}[1s]))`, false}, - {`count(rate({a=~".+"}[1s]))`, false}, - {`avg(rate({a=~".+"}[1s]))`, true}, - {`avg(rate({a=~".+"}[1s])) by (a)`, true}, - {`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false}, - {`sum(max(rate({a=~".+"}[1s])))`, false}, - {`max(count(rate({a=~".+"}[1s])))`, false}, - {`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false}, - {`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false}, - {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, - {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true}, - {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true}, + {`1`, false, nil}, + {`1 + 1`, false, nil}, + {`{a="1"}`, false, nil}, + {`{a="1"} |= "number: 10"`, false, nil}, + {`rate({a=~".+"}[1s])`, false, nil}, + {`sum by (a) (rate({a=~".+"}[1s]))`, false, nil}, + {`sum(rate({a=~".+"}[1s]))`, false, nil}, + {`max without (a) (rate({a=~".+"}[1s]))`, false, nil}, + {`count(rate({a=~".+"}[1s]))`, false, nil}, + {`avg(rate({a=~".+"}[1s]))`, true, nil}, + {`avg(rate({a=~".+"}[1s])) by (a)`, true, nil}, + {`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false, nil}, + {`sum(max(rate({a=~".+"}[1s])))`, false, nil}, + {`max(count(rate({a=~".+"}[1s])))`, false, nil}, + {`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false, nil}, + {`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil}, + {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil}, + {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil}, + {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}}, { ` (quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1) @@ -64,11 +65,12 @@ func TestMappingEquivalence(t *testing.T) { avg by (a) (rate({a=~".+"}[1s])) `, false, + nil, }, - {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, - {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false}, - {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, - {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}}, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. @@ -102,12 +104,7 @@ func TestMappingEquivalence(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "fake") strategy := NewPowerOfTwoStrategy(ConstantShards(shards)) - mapper := NewShardMapper(strategy, nilShardMetrics, []string{}) - // TODO (callum) refactor this test so that we won't need to set every - // possible sharding config option to true when we have multiple in the future - if tc.approximate { - mapper.quantileOverTimeSharding = true - } + mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg) _, _, mapped, err := mapper.Parse(params.GetExpression()) require.NoError(t, err) diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 67b35b809df1..e55b01504537 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -13,6 +13,8 @@ import ( ) const ( + ShardLastOverTime = "last_over_time" + ShardFirstOverTime = "first_over_time" ShardQuantileOverTime = "quantile_over_time" ) @@ -20,19 +22,30 @@ type ShardMapper struct { shards ShardingStrategy metrics *MapperMetrics quantileOverTimeSharding bool + lastOverTimeSharding bool + firstOverTimeSharding bool } func NewShardMapper(strategy ShardingStrategy, metrics *MapperMetrics, shardAggregation []string) ShardMapper { quantileOverTimeSharding := false + lastOverTimeSharding := false + firstOverTimeSharding := false for _, a := range shardAggregation { - if a == ShardQuantileOverTime { + switch a { + case ShardQuantileOverTime: quantileOverTimeSharding = true + case ShardLastOverTime: + lastOverTimeSharding = true + case ShardFirstOverTime: + firstOverTimeSharding = true } } return ShardMapper{ shards: strategy, metrics: metrics, quantileOverTimeSharding: quantileOverTimeSharding, + firstOverTimeSharding: firstOverTimeSharding, + lastOverTimeSharding: lastOverTimeSharding, } } @@ -472,6 +485,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, }, bytesPerShard, nil case syntax.OpRangeTypeFirst: + if !m.firstOverTimeSharding { + return noOp(expr, m.shards.Resolver()) + } + potentialConflict := syntax.ReducesLabels(expr) if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) { return m.mapSampleExpr(expr, r) @@ -499,6 +516,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, downstreams: downstreams, }, bytesPerShard, nil case syntax.OpRangeTypeLast: + if !m.lastOverTimeSharding { + return noOp(expr, m.shards.Resolver()) + } + potentialConflict := syntax.ReducesLabels(expr) if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) { return m.mapSampleExpr(expr, r) diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index 1e0fe625f24d..d8b666f6888c 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -52,7 +52,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ShardAggregations = []string{} f.Var(&cfg.ShardAggregations, "querier.shard-aggregations", - "A comma-separated list of LogQL vector and range aggregations that should be sharded") + "A comma-separated list of LogQL vector and range aggregations that should be sharded. Possible values 'quantile_over_time', 'last_over_time', 'first_over_time'.") cfg.ResultsCacheConfig.RegisterFlags(f) }