diff --git a/CHANGELOG.md b/CHANGELOG.md index 4423537567..7b03aa7870 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6103](https://github.com/thanos-io/thanos/pull/6103) Mixins(Rule): Fix query for long rule evaluations. - [#6121](https://github.com/thanos-io/thanos/pull/6121) Receive: Deduplicate metamonitoring queries. - [#6137](https://github.com/thanos-io/thanos/pull/6137) Downsample: Repair of non-empty XOR chunks during 1h downsampling. +- [#6125](https://github.com/thanos-io/thanos/pull/6125) Query Frontend: Fix vertical shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk` and `bottomk` functions. ### Changed diff --git a/internal/cortex/querier/queryrange/query_range.go b/internal/cortex/querier/queryrange/query_range.go index b6b414cdcd..00ad43177e 100644 --- a/internal/cortex/querier/queryrange/query_range.go +++ b/internal/cortex/querier/queryrange/query_range.go @@ -72,7 +72,7 @@ type Codec interface { // Merger is used by middlewares making multiple requests to merge back all responses into a single one. type Merger interface { // MergeResponse merges responses from multiple requests into a single Response - MergeResponse(...Response) (Response, error) + MergeResponse(Request, ...Response) (Response, error) } // Request represents a query range request that can be process by middlewares. @@ -192,7 +192,7 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse { } } -func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) { +func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) { if len(responses) == 0 { return NewEmptyPrometheusResponse(), nil } diff --git a/internal/cortex/querier/queryrange/query_range_test.go b/internal/cortex/querier/queryrange/query_range_test.go index 74ba190ea3..424e45b619 100644 --- a/internal/cortex/querier/queryrange/query_range_test.go +++ b/internal/cortex/querier/queryrange/query_range_test.go @@ -653,7 +653,7 @@ func TestMergeAPIResponses(t *testing.T) { }, }} { t.Run(tc.name, func(t *testing.T) { - output, err := PrometheusCodec.MergeResponse(tc.input...) + output, err := PrometheusCodec.MergeResponse(nil, tc.input...) require.NoError(t, err) require.Equal(t, tc.expected, output) }) diff --git a/internal/cortex/querier/queryrange/results_cache.go b/internal/cortex/querier/queryrange/results_cache.go index b718a19432..5577f6b00c 100644 --- a/internal/cortex/querier/queryrange/results_cache.go +++ b/internal/cortex/querier/queryrange/results_cache.go @@ -404,7 +404,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent return nil, nil, err } if len(requests) == 0 { - response, err := s.merger.MergeResponse(responses...) + response, err := s.merger.MergeResponse(r, responses...) // No downstream requests so no need to write back to the cache. return response, nil, err } @@ -466,7 +466,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent if err != nil { return nil, nil, err } - merged, err := s.merger.MergeResponse(accumulator.Response, currentRes) + merged, err := s.merger.MergeResponse(r, accumulator.Response, currentRes) if err != nil { return nil, nil, err } @@ -478,7 +478,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent return nil, nil, err } - response, err := s.merger.MergeResponse(responses...) + response, err := s.merger.MergeResponse(r, responses...) return response, mergedExtents, err } diff --git a/internal/cortex/querier/queryrange/split_by_interval.go b/internal/cortex/querier/queryrange/split_by_interval.go index 7b97c68b3f..27b0139fc4 100644 --- a/internal/cortex/querier/queryrange/split_by_interval.go +++ b/internal/cortex/querier/queryrange/split_by_interval.go @@ -62,7 +62,7 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) { resps = append(resps, reqResp.Response) } - response, err := s.merger.MergeResponse(resps...) + response, err := s.merger.MergeResponse(r, resps...) if err != nil { return nil, err } diff --git a/internal/cortex/querier/queryrange/split_by_interval_test.go b/internal/cortex/querier/queryrange/split_by_interval_test.go index 22999c9c43..953efb7f56 100644 --- a/internal/cortex/querier/queryrange/split_by_interval_test.go +++ b/internal/cortex/querier/queryrange/split_by_interval_test.go @@ -267,7 +267,7 @@ func TestSplitQuery(t *testing.T) { } func TestSplitByDay(t *testing.T) { - mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse) + mergedResponse, err := PrometheusCodec.MergeResponse(nil, parsedResponse, parsedResponse) require.NoError(t, err) mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse) diff --git a/pkg/queryfrontend/downsampled.go b/pkg/queryfrontend/downsampled.go index 729a8760ea..fcaa46104e 100644 --- a/pkg/queryfrontend/downsampled.go +++ b/pkg/queryfrontend/downsampled.go @@ -85,7 +85,7 @@ forLoop: break forLoop } } - response, err := d.merger.MergeResponse(resps...) + response, err := d.merger.MergeResponse(req, resps...) if err != nil { return nil, err } diff --git a/pkg/queryfrontend/labels_codec.go b/pkg/queryfrontend/labels_codec.go index aeea03431e..c56d8b52fc 100644 --- a/pkg/queryfrontend/labels_codec.go +++ b/pkg/queryfrontend/labels_codec.go @@ -48,7 +48,7 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du } // MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order. -func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) { +func (c labelsCodec) MergeResponse(_ queryrange.Request, responses ...queryrange.Response) (queryrange.Response, error) { if len(responses) == 0 { // Empty response for label_names, label_values and series API. return &ThanosLabelsResponse{ diff --git a/pkg/queryfrontend/labels_codec_test.go b/pkg/queryfrontend/labels_codec_test.go index bab0b772f5..c7ff233aa5 100644 --- a/pkg/queryfrontend/labels_codec_test.go +++ b/pkg/queryfrontend/labels_codec_test.go @@ -513,7 +513,7 @@ func TestLabelsCodec_MergeResponse(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // Default partial response value doesn't matter when encoding requests. codec := NewThanosLabelsCodec(false, time.Hour*2) - r, err := codec.MergeResponse(tc.responses...) + r, err := codec.MergeResponse(nil, tc.responses...) if tc.expectedError != nil { testutil.Equals(t, err, tc.expectedError) } else { @@ -677,7 +677,7 @@ func benchmarkMergeResponses(b *testing.B, size int) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = codec.MergeResponse(queryResSeries...) + _, _ = codec.MergeResponse(nil, queryResSeries...) } }) @@ -686,7 +686,7 @@ func benchmarkMergeResponses(b *testing.B, size int) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = codec.MergeResponse(queryResLabel...) + _, _ = codec.MergeResponse(nil, queryResLabel...) } }) diff --git a/pkg/queryfrontend/queryinstant_codec.go b/pkg/queryfrontend/queryinstant_codec.go index 2f1b47088b..27d01beffb 100644 --- a/pkg/queryfrontend/queryinstant_codec.go +++ b/pkg/queryfrontend/queryinstant_codec.go @@ -19,6 +19,8 @@ import ( "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" + "github.com/prometheus/prometheus/promql/parser" + promqlparser "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/internal/cortex/cortexpb" "github.com/thanos-io/thanos/internal/cortex/querier/queryrange" cortexutil "github.com/thanos-io/thanos/internal/cortex/util" @@ -41,7 +43,7 @@ func NewThanosQueryInstantCodec(partialResponse bool) *queryInstantCodec { // MergeResponse merges multiple responses into a single response. For instant query // only vector and matrix responses will be merged because other types of queries // are not shardable like number literal, string literal, scalar, etc. -func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) { +func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...queryrange.Response) (queryrange.Response, error) { if len(responses) == 0 { return queryrange.NewEmptyPrometheusInstantQueryResponse(), nil } else if len(responses) == 1 { @@ -68,13 +70,17 @@ func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (quer }, } default: + v, err := vectorMerge(req, promResponses) + if err != nil { + return nil, err + } res = &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, Data: queryrange.PrometheusInstantQueryData{ ResultType: model.ValVector.String(), Result: queryrange.PrometheusInstantQueryResult{ Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: vectorMerge(promResponses), + Vector: v, }, }, Stats: queryrange.StatsMerge(responses), @@ -228,7 +234,7 @@ func (c queryInstantCodec) EncodeResponse(ctx context.Context, res queryrange.Re return &resp, nil } -func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, _ queryrange.Request) (queryrange.Response, error) { +func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) { if r.StatusCode/100 != 2 { body, _ := io.ReadAll(r.Body) return nil, httpgrpc.Errorf(r.StatusCode, string(body)) @@ -254,8 +260,13 @@ func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, return &resp, nil } -func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Vector { +func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQueryResponse) (*queryrange.Vector, error) { output := map[string]*queryrange.Sample{} + metrics := []string{} // Used to preserve the order for topk and bottomk. + sortPlan, err := sortPlanForQuery(req.GetQuery()) + if err != nil { + return nil, err + } for _, resp := range resps { if resp == nil { continue @@ -273,6 +284,7 @@ func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange metric := cortexpb.FromLabelAdaptersToLabels(sample.Labels).String() if existingSample, ok := output[metric]; !ok { output[metric] = s + metrics = append(metrics, metric) // Preserve the order of metric. } else if existingSample.GetSample().TimestampMs < s.GetSample().TimestampMs { // Choose the latest sample if we see overlap. output[metric] = s @@ -280,25 +292,108 @@ func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange } } + result := &queryrange.Vector{ + Samples: make([]*queryrange.Sample, 0, len(output)), + } + if len(output) == 0 { - return &queryrange.Vector{ - Samples: make([]*queryrange.Sample, 0), + return result, nil + } + + if sortPlan == mergeOnly { + for _, k := range metrics { + result.Samples = append(result.Samples, output[k]) } + return result, nil } - keys := make([]string, 0, len(output)) - for key := range output { - keys = append(keys, key) + type pair struct { + metric string + s *queryrange.Sample } - sort.Strings(keys) - result := &queryrange.Vector{ - Samples: make([]*queryrange.Sample, 0, len(output)), + samples := make([]*pair, 0, len(output)) + for k, v := range output { + samples = append(samples, &pair{ + metric: k, + s: v, + }) } - for _, key := range keys { - result.Samples = append(result.Samples, output[key]) + + sort.Slice(samples, func(i, j int) bool { + // Order is determined by vector + switch sortPlan { + case sortByValuesAsc: + return samples[i].s.Sample.Value < samples[j].s.Sample.Value + case sortByValuesDesc: + return samples[i].s.Sample.Value > samples[j].s.Sample.Value + } + return samples[i].metric < samples[j].metric + }) + + for _, p := range samples { + result.Samples = append(result.Samples, p.s) } - return result + return result, nil +} + +type sortPlan int + +const ( + mergeOnly sortPlan = 0 + sortByValuesAsc sortPlan = 1 + sortByValuesDesc sortPlan = 2 + sortByLabels sortPlan = 3 +) + +func sortPlanForQuery(q string) (sortPlan, error) { + expr, err := promqlparser.ParseExpr(q) + if err != nil { + return 0, err + } + // Check if the root expression is topk or bottomk + if aggr, ok := expr.(*parser.AggregateExpr); ok { + if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK { + return mergeOnly, nil + } + } + checkForSort := func(expr promqlparser.Expr) (sortAsc, sortDesc bool) { + if n, ok := expr.(*promqlparser.Call); ok { + if n.Func != nil { + if n.Func.Name == "sort" { + sortAsc = true + } + if n.Func.Name == "sort_desc" { + sortDesc = true + } + } + } + return sortAsc, sortDesc + } + // Check the root expression for sort + if sortAsc, sortDesc := checkForSort(expr); sortAsc || sortDesc { + if sortAsc { + return sortByValuesAsc, nil + } + return sortByValuesDesc, nil + } + + // If the root expression is a binary expression, check the LHS and RHS for sort + if bin, ok := expr.(*parser.BinaryExpr); ok { + if sortAsc, sortDesc := checkForSort(bin.LHS); sortAsc || sortDesc { + if sortAsc { + return sortByValuesAsc, nil + } + return sortByValuesDesc, nil + } + if sortAsc, sortDesc := checkForSort(bin.RHS); sortAsc || sortDesc { + if sortAsc { + return sortByValuesAsc, nil + } + return sortByValuesDesc, nil + } + } + return sortByLabels, nil } func matrixMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Matrix { diff --git a/pkg/queryfrontend/queryinstant_codec_test.go b/pkg/queryfrontend/queryinstant_codec_test.go index 6e9beeabed..8f67928fca 100644 --- a/pkg/queryfrontend/queryinstant_codec_test.go +++ b/pkg/queryfrontend/queryinstant_codec_test.go @@ -261,14 +261,19 @@ func TestQueryInstantCodec_EncodeRequest(t *testing.T) { func TestMergeResponse(t *testing.T) { codec := NewThanosQueryInstantCodec(false) + defaultReq := &queryrange.PrometheusRequest{ + Query: "sum(up)", + } for _, tc := range []struct { name string + req *queryrange.PrometheusRequest resps []queryrange.Response expectedResp queryrange.Response expectedErr error }{ { name: "empty response", + req: defaultReq, resps: []queryrange.Response{}, expectedResp: &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -282,6 +287,55 @@ func TestMergeResponse(t *testing.T) { }, { name: "one response", + req: defaultReq, + resps: []queryrange.Response{ + &queryrange.PrometheusInstantQueryResponse{ + Status: queryrange.StatusSuccess, + Data: queryrange.PrometheusInstantQueryData{ + ResultType: model.ValVector.String(), + Result: queryrange.PrometheusInstantQueryResult{ + Result: &queryrange.PrometheusInstantQueryResult_Vector{ + Vector: &queryrange.Vector{ + Samples: []*queryrange.Sample{ + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + })), + }, + }, + }, + }, + }, + }, + }, + }, + expectedResp: &queryrange.PrometheusInstantQueryResponse{ + Status: queryrange.StatusSuccess, + Data: queryrange.PrometheusInstantQueryData{ + ResultType: model.ValVector.String(), + Result: queryrange.PrometheusInstantQueryResult{ + Result: &queryrange.PrometheusInstantQueryResult_Vector{ + Vector: &queryrange.Vector{ + Samples: []*queryrange.Sample{ + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + })), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "merge two responses with sort", + req: &queryrange.PrometheusRequest{ + Query: "1 + sort(topk(1, up))", + }, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -295,6 +349,28 @@ func TestMergeResponse(t *testing.T) { Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ "__name__": "up", + "job": "foo", + })), + }, + }, + }, + }, + }, + }, + }, + &queryrange.PrometheusInstantQueryResponse{ + Status: queryrange.StatusSuccess, + Data: queryrange.PrometheusInstantQueryData{ + ResultType: model.ValVector.String(), + Result: queryrange.PrometheusInstantQueryResult{ + Result: &queryrange.PrometheusInstantQueryResult_Vector{ + Vector: &queryrange.Vector{ + Samples: []*queryrange.Sample{ + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 2}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + "job": "bar", })), }, }, @@ -316,6 +392,92 @@ func TestMergeResponse(t *testing.T) { Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ "__name__": "up", + "job": "foo", + })), + }, + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 2}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + "job": "bar", + })), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "merge two responses with topk", + req: &queryrange.PrometheusRequest{ + Query: "topk(10, sort(up)) by (job)", + }, + resps: []queryrange.Response{ + &queryrange.PrometheusInstantQueryResponse{ + Status: queryrange.StatusSuccess, + Data: queryrange.PrometheusInstantQueryData{ + ResultType: model.ValVector.String(), + Result: queryrange.PrometheusInstantQueryResult{ + Result: &queryrange.PrometheusInstantQueryResult_Vector{ + Vector: &queryrange.Vector{ + Samples: []*queryrange.Sample{ + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + "job": "foo", + })), + }, + }, + }, + }, + }, + }, + }, + &queryrange.PrometheusInstantQueryResponse{ + Status: queryrange.StatusSuccess, + Data: queryrange.PrometheusInstantQueryData{ + ResultType: model.ValVector.String(), + Result: queryrange.PrometheusInstantQueryResult{ + Result: &queryrange.PrometheusInstantQueryResult_Vector{ + Vector: &queryrange.Vector{ + Samples: []*queryrange.Sample{ + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 2}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + "job": "bar", + })), + }, + }, + }, + }, + }, + }, + }, + }, + expectedResp: &queryrange.PrometheusInstantQueryResponse{ + Status: queryrange.StatusSuccess, + Data: queryrange.PrometheusInstantQueryData{ + ResultType: model.ValVector.String(), + Result: queryrange.PrometheusInstantQueryResult{ + Result: &queryrange.PrometheusInstantQueryResult_Vector{ + Vector: &queryrange.Vector{ + Samples: []*queryrange.Sample{ + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + "job": "foo", + })), + }, + { + Sample: cortexpb.Sample{TimestampMs: 0, Value: 2}, + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ + "__name__": "up", + "job": "bar", })), }, }, @@ -327,6 +489,7 @@ func TestMergeResponse(t *testing.T) { }, { name: "merge two responses", + req: defaultReq, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -402,6 +565,7 @@ func TestMergeResponse(t *testing.T) { }, { name: "merge multiple responses with same label sets, won't happen if sharding is enabled on downstream querier", + req: defaultReq, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -470,6 +634,7 @@ func TestMergeResponse(t *testing.T) { }, { name: "responses don't contain vector, return empty vector", + req: defaultReq, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -516,6 +681,7 @@ func TestMergeResponse(t *testing.T) { }, { name: "merge two matrix responses with non-duplicate samples", + req: defaultReq, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -605,6 +771,7 @@ func TestMergeResponse(t *testing.T) { }, { name: "merge two matrix responses with duplicate samples", + req: defaultReq, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ Status: queryrange.StatusSuccess, @@ -694,7 +861,7 @@ func TestMergeResponse(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - resp, err := codec.MergeResponse(tc.resps...) + resp, err := codec.MergeResponse(tc.req, tc.resps...) testutil.Equals(t, err, tc.expectedErr) testutil.Equals(t, resp, tc.expectedResp) }) @@ -1044,3 +1211,69 @@ func TestDecodeResponse(t *testing.T) { testutil.Equals(t, tc.expectedResponse, gotResponse) } } + +func Test_sortPlanForQuery(t *testing.T) { + tc := []struct { + query string + expectedPlan sortPlan + err bool + }{ + { + query: "invalid(10, up)", + expectedPlan: mergeOnly, + err: true, + }, + { + query: "topk(10, up)", + expectedPlan: mergeOnly, + err: false, + }, + { + query: "bottomk(10, up)", + expectedPlan: mergeOnly, + err: false, + }, + { + query: "1 + topk(10, up)", + expectedPlan: sortByLabels, + err: false, + }, + { + query: "1 + sort_desc(sum by (job) (up) )", + expectedPlan: sortByValuesDesc, + err: false, + }, + { + query: "sort(topk by (job) (10, up))", + expectedPlan: sortByValuesAsc, + err: false, + }, + { + query: "topk(5, up) by (job) + sort_desc(up)", + expectedPlan: sortByValuesDesc, + err: false, + }, + { + query: "sort(up) + topk(5, up) by (job)", + expectedPlan: sortByValuesAsc, + err: false, + }, + { + query: "sum(up) by (job)", + expectedPlan: sortByLabels, + err: false, + }, + } + + for _, tc := range tc { + t.Run(tc.query, func(t *testing.T) { + p, err := sortPlanForQuery(tc.query) + if tc.err { + testutil.NotOk(t, err) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedPlan, p) + } + }) + } +} diff --git a/pkg/queryfrontend/shard_query.go b/pkg/queryfrontend/shard_query.go index 67f7cd56f5..941a925849 100644 --- a/pkg/queryfrontend/shard_query.go +++ b/pkg/queryfrontend/shard_query.go @@ -74,7 +74,7 @@ func (s querySharder) Do(ctx context.Context, r queryrange.Request) (queryrange. resps = append(resps, reqResp.Response) } - response, err := s.merger.MergeResponse(resps...) + response, err := s.merger.MergeResponse(r, resps...) if err != nil { return nil, err } diff --git a/pkg/queryfrontend/split_by_interval.go b/pkg/queryfrontend/split_by_interval.go index 9944fcfdaa..9d79512303 100644 --- a/pkg/queryfrontend/split_by_interval.go +++ b/pkg/queryfrontend/split_by_interval.go @@ -62,7 +62,7 @@ func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryran resps = append(resps, reqResp.Response) } - response, err := s.merger.MergeResponse(resps...) + response, err := s.merger.MergeResponse(r, resps...) if err != nil { return nil, err }