diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 851c2ea718a5..5ee72a4addf4 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -48,6 +48,18 @@ func Test_codec_DecodeRequest(t *testing.T) { StartTs: start, EndTs: end, }, false}, + {"ok", func() (*http.Request, error) { + return http.NewRequest(http.MethodGet, + fmt.Sprintf(`/query_range?start=%d&end=%d&query={foo="bar"}&step=86400&limit=200&direction=FORWARD`, start.UnixNano(), end.UnixNano()), nil) + }, &LokiRequest{ + Query: `{foo="bar"}`, + Limit: 200, + Step: 86400000, // step is expected in ms. + Direction: logproto.FORWARD, + Path: "/query_range", + StartTs: start, + EndTs: end, + }, false}, {"series", func() (*http.Request, error) { return http.NewRequest(http.MethodGet, fmt.Sprintf(`/series?start=%d&end=%d&match={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil) @@ -95,7 +107,8 @@ func Test_codec_DecodeResponse(t *testing.T) { {"bad json", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(""))}, nil, nil, true}, {"not success", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(`{"status":"fail"}`))}, nil, nil, true}, {"unknown", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(`{"status":"success"}`))}, nil, nil, true}, - {"matrix", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(matrixString))}, nil, + { + "matrix", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(matrixString))}, nil, &LokiPromResponse{ Response: &queryrange.PrometheusResponse{ Status: loghttp.QueryStatusSuccess, @@ -105,8 +118,10 @@ func Test_codec_DecodeResponse(t *testing.T) { }, }, Statistics: statsResult, - }, false}, - {"streams v1", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))}, + }, false, + }, + { + "streams v1", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))}, &LokiRequest{Direction: logproto.FORWARD, Limit: 100, Path: "/loki/api/v1/query_range"}, &LokiResponse{ Status: loghttp.QueryStatusSuccess, @@ -118,8 +133,10 @@ func Test_codec_DecodeResponse(t *testing.T) { Result: logStreams, }, Statistics: statsResult, - }, false}, - {"streams legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))}, + }, false, + }, + { + "streams legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))}, &LokiRequest{Direction: logproto.FORWARD, Limit: 100, Path: "/api/prom/query_range"}, &LokiResponse{ Status: loghttp.QueryStatusSuccess, @@ -131,21 +148,26 @@ func Test_codec_DecodeResponse(t *testing.T) { Result: logStreams, }, Statistics: statsResult, - }, false}, - {"series", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(seriesString))}, + }, false, + }, + { + "series", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(seriesString))}, &LokiSeriesRequest{Path: "/loki/api/v1/series"}, &LokiSeriesResponse{ Status: "success", Version: uint32(loghttp.VersionV1), Data: seriesData, - }, false}, - {"labels legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(labelsString))}, + }, false, + }, + { + "labels legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(labelsString))}, &LokiLabelNamesRequest{Path: "/api/prom/label"}, &LokiLabelNamesResponse{ Status: "success", Version: uint32(loghttp.VersionLegacy), Data: labelsData, - }, false}, + }, false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -160,7 +182,6 @@ func Test_codec_DecodeResponse(t *testing.T) { } func Test_codec_EncodeRequest(t *testing.T) { - // we only accept LokiRequest. got, err := lokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{}) require.Error(t, err) @@ -170,7 +191,7 @@ func Test_codec_EncodeRequest(t *testing.T) { toEncode := &LokiRequest{ Query: `{foo="bar"}`, Limit: 200, - Step: 1010, + Step: 86400000, Direction: logproto.FORWARD, Path: "/query_range", StartTs: start, @@ -185,7 +206,7 @@ func Test_codec_EncodeRequest(t *testing.T) { require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query")) require.Equal(t, fmt.Sprintf("%d", 200), got.URL.Query().Get("limit")) require.Equal(t, `FORWARD`, got.URL.Query().Get("direction")) - require.Equal(t, "1.010000", got.URL.Query().Get("step")) + require.Equal(t, "86400.000000", got.URL.Query().Get("step")) // testing a full roundtrip req, err := lokiCodec.DecodeRequest(context.TODO(), got) @@ -229,7 +250,6 @@ func Test_codec_series_EncodeRequest(t *testing.T) { } func Test_codec_labels_EncodeRequest(t *testing.T) { - ctx := context.Background() toEncode := &LokiLabelNamesRequest{ Path: "/loki/api/v1/labels", @@ -252,7 +272,6 @@ func Test_codec_labels_EncodeRequest(t *testing.T) { } func Test_codec_EncodeResponse(t *testing.T) { - tests := []struct { name string res queryrange.Response @@ -270,7 +289,8 @@ func Test_codec_EncodeResponse(t *testing.T) { }, Statistics: statsResult, }, matrixString, false}, - {"loki v1", + { + "loki v1", &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.FORWARD, @@ -281,8 +301,10 @@ func Test_codec_EncodeResponse(t *testing.T) { Result: logStreams, }, Statistics: statsResult, - }, streamsString, false}, - {"loki legacy", + }, streamsString, false, + }, + { + "loki legacy", &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.FORWARD, @@ -293,25 +315,32 @@ func Test_codec_EncodeResponse(t *testing.T) { Result: logStreams, }, Statistics: statsResult, - }, streamsStringLegacy, false}, - {"loki series", + }, streamsStringLegacy, false, + }, + { + "loki series", &LokiSeriesResponse{ Status: "success", Version: uint32(loghttp.VersionV1), Data: seriesData, - }, seriesString, false}, - {"loki labels", + }, seriesString, false, + }, + { + "loki labels", &LokiLabelNamesResponse{ Status: "success", Version: uint32(loghttp.VersionV1), Data: labelsData, - }, labelsString, false}, - {"loki labels legacy", + }, labelsString, false, + }, + { + "loki labels legacy", &LokiLabelNamesResponse{ Status: "success", Version: uint32(loghttp.VersionLegacy), Data: labelsData, - }, labelsLegacyString, false}, + }, labelsLegacyString, false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -340,17 +369,19 @@ func Test_codec_MergeResponse(t *testing.T) { }{ {"empty", []queryrange.Response{}, nil, true}, {"unknown response", []queryrange.Response{&badResponse{}}, nil, true}, - {"prom", []queryrange.Response{ - &LokiPromResponse{ - Response: &queryrange.PrometheusResponse{ - Status: loghttp.QueryStatusSuccess, - Data: queryrange.PrometheusData{ - ResultType: loghttp.ResultTypeMatrix, - Result: sampleStreams, + { + "prom", + []queryrange.Response{ + &LokiPromResponse{ + Response: &queryrange.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrange.PrometheusData{ + ResultType: loghttp.ResultTypeMatrix, + Result: sampleStreams, + }, }, }, }, - }, &LokiPromResponse{ Response: &queryrange.PrometheusResponse{ Status: loghttp.QueryStatusSuccess, @@ -1032,7 +1063,6 @@ func BenchmarkResponseMerge(b *testing.B) { } }) } - } func mkResps(nResps, nStreams, nLogs int, direction logproto.Direction) (resps []*LokiResponse) { diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index e04602b2e06b..aa0b709f75fb 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -60,7 +60,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue req := ParamsToLokiRequest(qry.Params).WithShards(qry.Shards).WithQuery(qry.Expr.String()).(*LokiRequest) logger, ctx := spanlogger.New(ctx, "DownstreamHandler.instance") defer logger.Finish() - level.Debug(logger).Log("shards", fmt.Sprintf("%+v", req.Shards), "query", req.Query) + level.Debug(logger).Log("shards", fmt.Sprintf("%+v", req.Shards), "query", req.Query, "step", req.GetStep()) res, err := in.handler.Do(ctx, req) if err != nil { diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index ceaa1146f5eb..d7dd478697a0 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -62,7 +62,7 @@ func Test_seriesLimiter(t *testing.T) { cfg := testConfig cfg.SplitQueriesByInterval = time.Hour cfg.CacheResults = false - // split in 6 with 4 in // max. + // split in 7 with 2 in // max. tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { defer stopper.Stop() @@ -96,7 +96,7 @@ func Test_seriesLimiter(t *testing.T) { _, err = tpw(rt).RoundTrip(req) require.NoError(t, err) - require.Equal(t, 6, *count) + require.Equal(t, 7, *count) // 2 series should not be allowed. c := new(int) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 150ba240520b..c23526758e81 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -224,7 +224,7 @@ func NewLogFilterTripperware( ) (queryrange.Tripperware, error) { queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)} if cfg.SplitQueriesByInterval != 0 { - queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByMetrics)) + queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics)) } if cfg.ShardedQueries { @@ -270,7 +270,7 @@ func NewSeriesTripperware( queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), // The Series API needs to pull one chunk per series to extract the label set, which is much cheaper than iterating through all matching chunks. - SplitByIntervalMiddleware(WithSplitByLimits(limits, cfg.SplitQueriesByInterval), codec, splitByMetrics), + SplitByIntervalMiddleware(WithSplitByLimits(limits, cfg.SplitQueriesByInterval), codec, splitByTime, splitByMetrics), ) } if cfg.MaxRetries > 0 { @@ -301,7 +301,7 @@ func NewLabelsTripperware( queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), // Force a 24 hours split by for labels API, this will be more efficient with our static daily bucket storage. // This is because the labels API is an index-only operation. - SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByMetrics), + SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, splitByMetrics), ) } if cfg.MaxRetries > 0 { @@ -348,7 +348,7 @@ func NewMetricTripperware( queryRangeMiddleware = append( queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), - SplitByIntervalMiddleware(limits, codec, splitByMetrics), + SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), ) var c cache.Cache diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 975ea79d5578..30bfe03e8a5e 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -42,20 +42,24 @@ func NewSplitByMetrics(r prometheus.Registerer) *SplitByMetrics { } type splitByInterval struct { - next queryrange.Handler - limits Limits - merger queryrange.Merger - metrics *SplitByMetrics + next queryrange.Handler + limits Limits + merger queryrange.Merger + metrics *SplitByMetrics + splitter Splitter } +type Splitter func(req queryrange.Request, interval time.Duration) []queryrange.Request + // SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval. -func SplitByIntervalMiddleware(limits Limits, merger queryrange.Merger, metrics *SplitByMetrics) queryrange.Middleware { +func SplitByIntervalMiddleware(limits Limits, merger queryrange.Merger, splitter Splitter, metrics *SplitByMetrics) queryrange.Middleware { return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { return &splitByInterval{ - next: next, - limits: limits, - merger: merger, - metrics: metrics, + next: next, + limits: limits, + merger: merger, + metrics: metrics, + splitter: splitter, } }) } @@ -131,14 +135,12 @@ func (h *splitByInterval) Process( } } - } return responses, nil } func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next queryrange.Handler) { - for data := range ch { sp, ctx := opentracing.StartSpanFromContext(ctx, "interval") @@ -157,7 +159,6 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next } func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - userid, err := user.ExtractOrgID(ctx) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) @@ -169,7 +170,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra return h.next.Do(ctx, r) } - intervals := splitByTime(r, interval) + intervals := h.splitter(r, interval) h.metrics.splits.Observe(float64(len(intervals))) // no interval should not be processed by the frontend. @@ -179,7 +180,6 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra if sp := opentracing.SpanFromContext(ctx); sp != nil { sp.LogFields(otlog.Int("n_intervals", len(intervals))) - } var limit int64 @@ -250,7 +250,6 @@ func splitByTime(req queryrange.Request, interval time.Duration) []queryrange.Re return nil } return reqs - } func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) { @@ -262,3 +261,37 @@ func forInterval(interval time.Duration, start, end time.Time, callback func(sta callback(start, newEnd) } } + +func splitMetricByTime(r queryrange.Request, interval time.Duration) []queryrange.Request { + var reqs []queryrange.Request + lokiReq := r.(*LokiRequest) + for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { + end := nextIntervalBoundary(start, r.GetStep(), interval) + if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs { + end = lokiReq.EndTs + } + reqs = append(reqs, &LokiRequest{ + Query: lokiReq.Query, + Limit: lokiReq.Limit, + Step: lokiReq.Step, + Direction: lokiReq.Direction, + Path: lokiReq.Path, + StartTs: start, + EndTs: end, + }) + } + return reqs +} + +// Round up to the step before the next interval boundary. +func nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time { + stepNs := step * 1e6 + nsPerInterval := interval.Nanoseconds() + startOfNextInterval := ((t.UnixNano() / nsPerInterval) + 1) * nsPerInterval + // ensure that target is a multiple of steps away from the start time + target := startOfNextInterval - ((startOfNextInterval - t.UnixNano()) % stepNs) + if target == startOfNextInterval { + target -= stepNs + } + return time.Unix(0, target) +} diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 3ffbfc888c65..07a3a2dbe4cc 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime" + "strconv" "sync" "testing" "time" @@ -19,7 +20,6 @@ import ( var nilMetrics = NewSplitByMetrics(nil) func Test_splitQuery(t *testing.T) { - tests := []struct { name string req queryrange.Request @@ -102,6 +102,173 @@ func Test_splitQuery(t *testing.T) { } } +func Test_splitMetricQuery(t *testing.T) { + const seconds = 1e3 // 1e3 milliseconds per second. + + for i, tc := range []struct { + input queryrange.Request + expected []queryrange.Request + interval time.Duration + }{ + // the step is lower than the interval therefore we should split only once. + { + input: &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(0, 60*time.Minute.Nanoseconds()), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(0, 60*time.Minute.Nanoseconds()), + Step: 15 * seconds, + }, + }, + interval: 24 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(60*60, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(60*60, 0), + Step: 15 * seconds, + }, + }, + interval: 3 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(24*3600, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(24*3600, 0), + Step: 15 * seconds, + }, + }, + interval: 24 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(3*3600, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(3*3600, 0), + Step: 15 * seconds, + }, + }, + interval: 3 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(2*24*3600, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix((24*3600)-15, 0), + Step: 15 * seconds, + }, + &LokiRequest{ + StartTs: time.Unix((24 * 3600), 0), + EndTs: time.Unix((2 * 24 * 3600), 0), + Step: 15 * seconds, + }, + }, + interval: 24 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(2*3*3600, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix((3*3600)-15, 0), + Step: 15 * seconds, + }, + &LokiRequest{ + StartTs: time.Unix((3 * 3600), 0), + EndTs: time.Unix((2 * 3 * 3600), 0), + Step: 15 * seconds, + }, + }, + interval: 3 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(3*3600, 0), + EndTs: time.Unix(3*24*3600, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(3*3600, 0), + EndTs: time.Unix((24*3600)-15, 0), + Step: 15 * seconds, + }, + &LokiRequest{ + StartTs: time.Unix(24*3600, 0), + EndTs: time.Unix((2*24*3600)-15, 0), + Step: 15 * seconds, + }, + &LokiRequest{ + StartTs: time.Unix(2*24*3600, 0), + EndTs: time.Unix(3*24*3600, 0), + Step: 15 * seconds, + }, + }, + interval: 24 * time.Hour, + }, + { + input: &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix(3*3*3600, 0), + Step: 15 * seconds, + }, + expected: []queryrange.Request{ + &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix((3*3600)-15, 0), + Step: 15 * seconds, + }, + &LokiRequest{ + StartTs: time.Unix(3*3600, 0), + EndTs: time.Unix((2*3*3600)-15, 0), + Step: 15 * seconds, + }, + &LokiRequest{ + StartTs: time.Unix(2*3*3600, 0), + EndTs: time.Unix(3*3*3600, 0), + Step: 15 * seconds, + }, + }, + interval: 3 * time.Hour, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + splits := splitMetricByTime(tc.input, tc.interval) + require.Equal(t, tc.expected, splits) + }) + } +} + func Test_splitByInterval_Do(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") next := queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) { @@ -129,6 +296,7 @@ func Test_splitByInterval_Do(t *testing.T) { split := SplitByIntervalMiddleware( l, lokiCodec, + splitByTime, nilMetrics, ).Wrap(next) @@ -270,7 +438,6 @@ func Test_splitByInterval_Do(t *testing.T) { require.Equal(t, tt.want, res) }) } - } func Test_series_splitByInterval_Do(t *testing.T) { @@ -297,6 +464,7 @@ func Test_series_splitByInterval_Do(t *testing.T) { split := SplitByIntervalMiddleware( l, lokiCodec, + splitByTime, nilMetrics, ).Wrap(next) @@ -335,7 +503,6 @@ func Test_series_splitByInterval_Do(t *testing.T) { require.Equal(t, tt.want, res) }) } - } func Test_ExitEarly(t *testing.T) { @@ -378,6 +545,7 @@ func Test_ExitEarly(t *testing.T) { split := SplitByIntervalMiddleware( l, lokiCodec, + splitByTime, nilMetrics, ).Wrap(next) @@ -427,7 +595,6 @@ func Test_DoesntDeadlock(t *testing.T) { n := 10 next := queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) { - return &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: r.(*LokiRequest).Direction, @@ -457,6 +624,7 @@ func Test_DoesntDeadlock(t *testing.T) { split := SplitByIntervalMiddleware( l, lokiCodec, + splitByTime, nilMetrics, ).Wrap(next) @@ -489,5 +657,4 @@ func Test_DoesntDeadlock(t *testing.T) { // give runtime a bit of slack when catching up -- this isn't an exact science :( // Allow for 1% increase in goroutines require.LessOrEqual(t, endingGoroutines, startingGoroutines*101/100) - }