From a0c75987a24d0adc520c60dd5d85df4c34009548 Mon Sep 17 00:00:00 2001 From: benclive Date: Thu, 29 Aug 2024 16:29:50 +0100 Subject: [PATCH] fix: Propagate headers/warnings/stats from quantile downstreams (#13881) --- pkg/logql/accumulator.go | 47 +++++++++++++++++++++++++++++++++-- pkg/logql/accumulator_test.go | 23 ++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/pkg/logql/accumulator.go b/pkg/logql/accumulator.go index 446433f9a9144..434af93cb3c28 100644 --- a/pkg/logql/accumulator.go +++ b/pkg/logql/accumulator.go @@ -41,12 +41,19 @@ func (a *BufferedAccumulator) Result() []logqlmodel.Result { type QuantileSketchAccumulator struct { matrix ProbabilisticQuantileMatrix + + stats stats.Result // for accumulating statistics from downstream requests + headers map[string][]string // for accumulating headers from downstream requests + warnings map[string]struct{} // for accumulating warnings from downstream requests} } // newQuantileSketchAccumulator returns an accumulator for sharded // probabilistic quantile queries that merges results as they come in. func newQuantileSketchAccumulator() *QuantileSketchAccumulator { - return &QuantileSketchAccumulator{} + return &QuantileSketchAccumulator{ + headers: make(map[string][]string), + warnings: make(map[string]struct{}), + } } func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel.Result, _ int) error { @@ -57,6 +64,21 @@ func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel if !ok { return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) } + + // TODO(owen-d/ewelch): Shard counts should be set by the querier + // so we don't have to do it in tricky ways in multiple places. + // See pkg/logql/downstream.go:DownstreamEvaluator.Downstream + // for another example. + if res.Statistics.Summary.Shards == 0 { + res.Statistics.Summary.Shards = 1 + } + a.stats.Merge(res.Statistics) + metadata.ExtendHeaders(a.headers, res.Headers) + + for _, w := range res.Warnings { + a.warnings[w] = struct{}{} + } + if a.matrix == nil { a.matrix = data return nil @@ -68,7 +90,28 @@ func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel } func (a *QuantileSketchAccumulator) Result() []logqlmodel.Result { - return []logqlmodel.Result{{Data: a.matrix}} + headers := make([]*definitions.PrometheusResponseHeader, 0, len(a.headers)) + for name, vals := range a.headers { + headers = append( + headers, + &definitions.PrometheusResponseHeader{ + Name: name, + Values: vals, + }, + ) + } + + warnings := maps.Keys(a.warnings) + sort.Strings(warnings) + + return []logqlmodel.Result{ + { + Data: a.matrix, + Headers: headers, + Warnings: warnings, + Statistics: a.stats, + }, + } } // heap impl for keeping only the top n results across m streams diff --git a/pkg/logql/accumulator_test.go b/pkg/logql/accumulator_test.go index 0975ea4789d2e..d7d379cb87a4e 100644 --- a/pkg/logql/accumulator_test.go +++ b/pkg/logql/accumulator_test.go @@ -13,6 +13,8 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/sketch" "github.com/grafana/loki/v3/pkg/logqlmodel" + "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase/definitions" ) func TestAccumulatedStreams(t *testing.T) { @@ -149,6 +151,22 @@ func TestDownstreamAccumulatorMultiMerge(t *testing.T) { } } +func TestQuantileSketchDownstreamAccumulatorSimple(t *testing.T) { + acc := newQuantileSketchAccumulator() + downstreamResult := newQuantileSketchResults()[0] + + require.Nil(t, acc.Accumulate(context.Background(), downstreamResult, 0)) + + res := acc.Result()[0] + got, ok := res.Data.(ProbabilisticQuantileMatrix) + require.Equal(t, true, ok) + require.Equal(t, 10, len(got), "correct number of vectors") + + require.Equal(t, res.Headers[0].Name, "HeaderA") + require.Equal(t, res.Warnings, []string{"warning"}) + require.Equal(t, int64(33), res.Statistics.Summary.Shards) +} + func BenchmarkAccumulator(b *testing.B) { // dummy params. Only need to populate direction & limit @@ -218,6 +236,9 @@ func newStreamResults() []logqlmodel.Result { func newQuantileSketchResults() []logqlmodel.Result { results := make([]logqlmodel.Result, 100) + statistics := stats.Result{ + Summary: stats.Summary{Shards: 33}, + } for r := range results { vectors := make([]ProbabilisticQuantileVector, 10) @@ -231,7 +252,7 @@ func newQuantileSketchResults() []logqlmodel.Result { } } } - results[r] = logqlmodel.Result{Data: ProbabilisticQuantileMatrix(vectors)} + results[r] = logqlmodel.Result{Data: ProbabilisticQuantileMatrix(vectors), Headers: []*definitions.PrometheusResponseHeader{{Name: "HeaderA", Values: []string{"ValueA"}}}, Warnings: []string{"warning"}, Statistics: statistics} } return results