From b1e6b77baebdf6eceada9361b50f7fd2703cd303 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Mon, 24 Jan 2022 11:16:42 +0530 Subject: [PATCH 1/2] step align start and end time of the original query while splitting it --- pkg/querier/queryrange/split_by_interval.go | 22 +++++++---- .../queryrange/split_by_interval_test.go | 38 +++++++++++++++++-- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 1542781f077d..2c41de058a2a 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -5,6 +5,7 @@ import ( "net/http" "time" + "github.com/cortexproject/cortex/pkg/util" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" @@ -328,6 +329,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer lokiReq := r.(*LokiRequest) // step is >= configured split interval, let us just split the query interval by step + // Note: start and end time of the metric queries with step >= split interval will not be step aligned. if lokiReq.Step >= interval.Milliseconds() { forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) { reqs = append(reqs, &LokiRequest{ @@ -344,12 +346,20 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer return reqs, nil } - // nextIntervalBoundary always moves ahead in a multiple of steps but the time it returns would not be step aligned. - // To have step aligned intervals for better cache-ability of results, let us step align the start time which make all the split intervals step aligned. + // step align start and end time of the query. Start time is rounded down and end time is rounded up. + stepNs := r.GetStep() * 1e6 startNs := lokiReq.StartTs.UnixNano() - start := time.Unix(0, startNs-startNs%(r.GetStep()*1e6)) + start := time.Unix(0, startNs-startNs%stepNs) - for start := start; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { + endNs := lokiReq.EndTs.UnixNano() + if mod := endNs % stepNs; mod != 0 { + endNs += stepNs - mod + } + end := time.Unix(0, endNs) + + lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest) + + for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { end := nextIntervalBoundary(start, r.GetStep(), interval) if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs { end = lokiReq.EndTs @@ -365,10 +375,6 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer }) } - if len(reqs) != 0 { - // change the start time to original time - reqs[0] = reqs[0].WithStartEnd(lokiReq.GetStart(), reqs[0].GetEnd()) - } return reqs, nil } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index ef65a01d12e3..1d135b693229 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -376,14 +376,44 @@ func Test_splitMetricQuery(t *testing.T) { }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), - EndTs: time.Unix(3*3*3600, 0), + EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s Step: 17 * seconds, Query: `rate({app="foo"}[1m])`, }, }, interval: 3 * time.Hour, }, - // start time not aligned with step + // end time already step aligned + { + input: &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + expected: []queryrangebase.Request{ + &LokiRequest{ + StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s + EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + &LokiRequest{ + StartTs: time.Unix((3*3600)+12, 0), + EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + &LokiRequest{ + StartTs: time.Unix(2*3*3600+7, 0), + EndTs: time.Unix(3*3*3600+2, 0), + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + }, + interval: 3 * time.Hour, + }, + // start & end time not aligned with step { input: &LokiRequest{ StartTs: time.Unix(2*3600, 0), @@ -393,7 +423,7 @@ func Test_splitMetricQuery(t *testing.T) { }, expected: []queryrangebase.Request{ &LokiRequest{ - StartTs: time.Unix(2*3600, 0), + StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s Step: 17 * seconds, Query: `rate({app="foo"}[1m])`, @@ -406,7 +436,7 @@ func Test_splitMetricQuery(t *testing.T) { }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), - EndTs: time.Unix(3*3*3600, 0), + EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s Step: 17 * seconds, Query: `rate({app="foo"}[1m])`, }, From 8b9ba413f205f35b0685f40efd0030c6b5c7cdbb Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Mon, 24 Jan 2022 16:36:11 +0530 Subject: [PATCH 2/2] also step align queries with step > split interval --- pkg/querier/queryrange/split_by_interval.go | 28 +++++++++---------- .../queryrange/split_by_interval_test.go | 6 ++-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 2c41de058a2a..a22ae8189600 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -328,8 +328,21 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer } lokiReq := r.(*LokiRequest) + + // step align start and end time of the query. Start time is rounded down and end time is rounded up. + stepNs := r.GetStep() * 1e6 + startNs := lokiReq.StartTs.UnixNano() + start := time.Unix(0, startNs-startNs%stepNs) + + endNs := lokiReq.EndTs.UnixNano() + if mod := endNs % stepNs; mod != 0 { + endNs += stepNs - mod + } + end := time.Unix(0, endNs) + + lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest) + // step is >= configured split interval, let us just split the query interval by step - // Note: start and end time of the metric queries with step >= split interval will not be step aligned. if lokiReq.Step >= interval.Milliseconds() { forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) { reqs = append(reqs, &LokiRequest{ @@ -346,19 +359,6 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer return reqs, nil } - // step align start and end time of the query. Start time is rounded down and end time is rounded up. - stepNs := r.GetStep() * 1e6 - startNs := lokiReq.StartTs.UnixNano() - start := time.Unix(0, startNs-startNs%stepNs) - - endNs := lokiReq.EndTs.UnixNano() - if mod := endNs % stepNs; mod != 0 { - endNs += stepNs - mod - } - end := time.Unix(0, endNs) - - lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest) - for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { end := nextIntervalBoundary(start, r.GetStep(), interval) if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs { diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 1d135b693229..f1c37dfa4383 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -479,7 +479,7 @@ func Test_splitMetricQuery(t *testing.T) { }, &LokiRequest{ StartTs: time.Unix(24*3600, 0), - EndTs: time.Unix(25*3600, 0), + EndTs: time.Unix(30*3600, 0), Step: 6 * 3600 * seconds, Query: `rate({app="foo"}[1m])`, }, @@ -488,7 +488,7 @@ func Test_splitMetricQuery(t *testing.T) { }, { input: &LokiRequest{ - StartTs: time.Unix(0, 0), + StartTs: time.Unix(1*3600, 0), EndTs: time.Unix(3*3600, 0), Step: 6 * 3600 * seconds, Query: `rate({app="foo"}[1m])`, @@ -496,7 +496,7 @@ func Test_splitMetricQuery(t *testing.T) { expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), - EndTs: time.Unix(3*3600, 0), + EndTs: time.Unix(6*3600, 0), Step: 6 * 3600 * seconds, Query: `rate({app="foo"}[1m])`, },