diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 4cb021a50ae5..ccbfa2e8ee2d 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -160,7 +160,10 @@ func (codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http streams := make([]*logproto.Stream, len(proto.Data.Result)) for i, stream := range proto.Data.Result { - streams[i] = &stream + streams[i] = &logproto.Stream{ + Labels: stream.Labels, + Entries: stream.Entries, + } } var buf bytes.Buffer if loghttp.Version(proto.Version) == loghttp.VersionLegacy { @@ -248,7 +251,11 @@ func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, dire for key := range groups { keys = append(keys, key) } - sort.Strings(keys) + if direction == logproto.BACKWARD { + sort.Sort(sort.Reverse(sort.StringSlice(keys))) + } else { + sort.Strings(keys) + } // escape hatch, can just return all the streams if total <= int(limit) { diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 977fa4b8d342..0d0a3bf9c3c8 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -307,15 +307,6 @@ func Test_codec_MergeResponse(t *testing.T) { Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ - { - Labels: `{foo="bar", level="debug"}`, - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 16), Line: "16"}, - {Timestamp: time.Unix(0, 15), Line: "15"}, - {Timestamp: time.Unix(0, 6), Line: "6"}, - {Timestamp: time.Unix(0, 5), Line: "5"}, - }, - }, { Labels: `{foo="bar", level="error"}`, Entries: []logproto.Entry{ @@ -326,6 +317,15 @@ func Test_codec_MergeResponse(t *testing.T) { {Timestamp: time.Unix(0, 1), Line: "1"}, }, }, + { + Labels: `{foo="bar", level="debug"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 16), Line: "16"}, + {Timestamp: time.Unix(0, 15), Line: "15"}, + {Timestamp: time.Unix(0, 6), Line: "6"}, + {Timestamp: time.Unix(0, 5), Line: "5"}, + }, + }, }, }, }, @@ -395,17 +395,17 @@ func Test_codec_MergeResponse(t *testing.T) { ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ { - Labels: `{foo="bar", level="debug"}`, + Labels: `{foo="bar", level="error"}`, Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 16), Line: "16"}, - {Timestamp: time.Unix(0, 15), Line: "15"}, + {Timestamp: time.Unix(0, 10), Line: "10"}, + {Timestamp: time.Unix(0, 9), Line: "9"}, }, }, { - Labels: `{foo="bar", level="error"}`, + Labels: `{foo="bar", level="debug"}`, Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 10), Line: "10"}, - {Timestamp: time.Unix(0, 9), Line: "9"}, + {Timestamp: time.Unix(0, 16), Line: "16"}, + {Timestamp: time.Unix(0, 15), Line: "15"}, }, }, }, @@ -669,11 +669,19 @@ var ( "values":[ [ "123456789012345", "super line" ] ] + }, + { + "stream": { + "test": "test2" + }, + "values":[ + [ "123456789012346", "super line2" ] + ] } ] } }` - streamsStringLegacy = `{"streams":[{"labels":"{test=\"test\"}","entries":[{"ts":"1970-01-02T10:17:36.789012345Z","line":"super line"}]}]}` + streamsStringLegacy = `{"streams":[{"labels":"{test=\"test\"}","entries":[{"ts":"1970-01-02T10:17:36.789012345Z","line":"super line"}]},{"labels":"{test=\"test2\"}","entries":[{"ts":"1970-01-02T10:17:36.789012346Z","line":"super line2"}]}]}` logStreams = []logproto.Stream{ { Labels: `{test="test"}`, @@ -684,6 +692,15 @@ var ( }, }, }, + { + Labels: `{test="test2"}`, + Entries: []logproto.Entry{ + { + Line: "super line2", + Timestamp: time.Unix(0, 123456789012346).UTC(), + }, + }, + }, } )