From b2aec4a4f3c4c688b0569dc318ba250160c629d2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 29 Mar 2021 03:46:18 -0400 Subject: [PATCH] Fixes a bug in MatrixStepper when sharding queries. (#3550) * Fixes a bug in MatrixStepper when sharding queries. Since we split correctly metric queries, this bug has show itself. Basically we were not correctly stepping through time. We should always start from start, add the step until the start is after the end. For more read: https://www.robustperception.io/step-and-query_range Fixes #3541 Signed-off-by: Cyril Tovena * Fixes tests. Signed-off-by: Cyril Tovena * Fixes tests. for real. Signed-off-by: Cyril Tovena * Not easy to get those test working. Signed-off-by: Cyril Tovena (cherry picked from commit b11d2effe29d0ee497def7eef84634310563c602) --- pkg/logcli/query/query.go | 8 +------ pkg/logql/matrix.go | 2 +- pkg/logql/matrix_test.go | 48 +++++++++++++++++++++++++++++++++++---- pkg/logql/test_utils.go | 6 ++--- 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index a3403c4b0298..efba440b86f5 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -58,7 +58,6 @@ type Query struct { // DoQuery executes the query and prints out the results func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) { - if q.LocalConfig != "" { if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil { log.Fatalf("Query failed: %+v", err) @@ -149,7 +148,6 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) } } - } func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { @@ -172,7 +170,6 @@ func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, las // DoLocalQuery executes the query against the local store using a Loki configuration file. func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string) error { - var conf loki.Config conf.RegisterFlags(flag.CommandLine) if q.LocalConfig == "" { @@ -255,7 +252,7 @@ func (q *Query) SetInstant(time time.Time) { } func (q *Query) isInstant() bool { - return q.Start == q.End + return q.Start == q.End && q.Step == 0 } func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { @@ -369,7 +366,6 @@ func (q *Query) printMatrix(matrix loghttp.Matrix) { // it gives us more flexibility with regard to output types in the future. initially we are supporting just formatted json but eventually // we might add output options such as render to an image file on disk bytes, err := json.MarshalIndent(matrix, "", " ") - if err != nil { log.Fatalf("Error marshalling matrix: %v", err) } @@ -379,7 +375,6 @@ func (q *Query) printMatrix(matrix loghttp.Matrix) { func (q *Query) printVector(vector loghttp.Vector) { bytes, err := json.MarshalIndent(vector, "", " ") - if err != nil { log.Fatalf("Error marshalling vector: %v", err) } @@ -389,7 +384,6 @@ func (q *Query) printVector(vector loghttp.Vector) { func (q *Query) printScalar(scalar loghttp.Scalar) { bytes, err := json.MarshalIndent(scalar, "", " ") - if err != nil { log.Fatalf("Error marshalling scalar: %v", err) } diff --git a/pkg/logql/matrix.go b/pkg/logql/matrix.go index 4a1e40678148..e2feaeb6bb46 100644 --- a/pkg/logql/matrix.go +++ b/pkg/logql/matrix.go @@ -33,7 +33,7 @@ func NewMatrixStepper(start, end time.Time, step time.Duration, m promql.Matrix) func (m *MatrixStepper) Next() (bool, int64, promql.Vector) { m.ts = m.ts.Add(m.step) - if !m.ts.Before(m.end) { + if m.ts.After(m.end) { return false, 0, nil } diff --git a/pkg/logql/matrix_test.go b/pkg/logql/matrix_test.go index ddd0389c5c35..56a3d2a6d33b 100644 --- a/pkg/logql/matrix_test.go +++ b/pkg/logql/matrix_test.go @@ -20,7 +20,7 @@ func TestMatrixStepper(t *testing.T) { promql.Series{ Metric: labels.Labels{{Name: "foo", Value: "bar"}}, Points: []promql.Point{ - {T: start.UnixNano() / int64(step), V: 0}, + {T: start.UnixNano(), V: 0}, {T: start.Add(step).UnixNano() / int64(time.Millisecond), V: 1}, {T: start.Add(2*step).UnixNano() / int64(time.Millisecond), V: 2}, {T: start.Add(3*step).UnixNano() / int64(time.Millisecond), V: 3}, @@ -42,11 +42,11 @@ func TestMatrixStepper(t *testing.T) { expected := []promql.Vector{ { promql.Sample{ - Point: promql.Point{T: start.UnixNano() / int64(step), V: 0}, + Point: promql.Point{T: start.UnixNano(), V: 0}, Metric: labels.Labels{{Name: "foo", Value: "bar"}}, }, promql.Sample{ - Point: promql.Point{T: start.UnixNano() / int64(step), V: 0}, + Point: promql.Point{T: start.UnixNano(), V: 0}, Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, }, }, @@ -100,9 +100,19 @@ func TestMatrixStepper(t *testing.T) { Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, }, }, + { + promql.Sample{ + Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 0}, + Metric: labels.Labels{{Name: "foo", Value: "bar"}}, + }, + promql.Sample{ + Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 0}, + Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, + }, + }, } - for i := 0; i < int(end.Sub(start)/step); i++ { + for i := 0; i <= int(end.Sub(start)/step); i++ { ok, ts, vec := s.Next() require.Equal(t, ok, true) require.Equal(t, start.Add(step*time.Duration(i)).UnixNano()/int64(time.Millisecond), ts) @@ -113,3 +123,33 @@ func TestMatrixStepper(t *testing.T) { require.Equal(t, ok, false) } + +func Test_SingleStepMatrix(t *testing.T) { + var ( + start = time.Unix(0, 0) + end = time.Unix(0, 0) + step = time.Second + ) + + m := promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{ + {T: start.UnixNano(), V: 10}, + }, + }, + } + + s := NewMatrixStepper(start, end, step, m) + + ok, ts, vec := s.Next() + require.True(t, ok) + require.Equal(t, start.UnixNano(), ts) + require.Equal(t, promql.Vector{promql.Sample{ + Point: promql.Point{T: start.UnixNano(), V: 10}, + Metric: labels.Labels{}, + }}, vec) + + ok, _, _ = s.Next() + require.False(t, ok) +} diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index c15970bcf726..cd610d33b0dc 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -201,7 +201,7 @@ outer: return iter.NewTimeRangedSampleIterator( iter.NewMultiSeriesIterator(ctx, filtered), req.Start.UnixNano(), - req.End.UnixNano(), + req.End.UnixNano()+1, ), nil } @@ -232,7 +232,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu results = append(results, res) } return results, nil - } // create nStreams of nEntries with labelNames each where each label value @@ -256,7 +255,7 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream Value: fmt.Sprintf("%d", shard), }) } - for j := 0; j < nEntries; j++ { + for j := 0; j <= nEntries; j++ { stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Unix(0, int64(j*int(time.Second))), Line: fmt.Sprintf("line number: %d", j), @@ -267,7 +266,6 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream streams = append(streams, stream) } return streams - } func mustParseLabels(s string) labels.Labels {