From ba0a46b1e9fe28ec08f00f4a8cff411a00aab1e3 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 18 Jul 2024 16:14:23 -0700 Subject: [PATCH 1/6] special case the return values from a sharded first/last_over_time query Signed-off-by: Callum Styan --- pkg/logql/downstream.go | 1 - pkg/logql/downstream_test.go | 69 +++++++++++++++++++++++++++++++ pkg/logql/engine.go | 66 +++++++++++++++++++---------- pkg/logql/first_last_over_time.go | 5 ++- pkg/logql/test_utils.go | 1 - 5 files changed, 117 insertions(+), 25 deletions(-) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index f28eaddde3f1..615ddd0f3f47 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type()) } } - return NewMergeLastOverTimeStepEvaluator(params, xs), nil default: return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index ae313ea1fc48..d1eac6b35d89 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -230,6 +230,75 @@ func TestMappingEquivalenceSketches(t *testing.T) { } } +func TestMappingEquivalence_Instant(t *testing.T) { + var ( + shards = 3 + nStreams = 60 + rounds = 20 + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true) + end = time.Unix(0, int64(time.Second*time.Duration(rounds))) + interval = time.Duration(0) + limit = 100 + ) + + for _, tc := range []struct { + query string + approximate bool + shardAgg []string + }{ + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}}, + } { + q := NewMockQuerier( + shards, + streams, + ) + + opts := EngineOpts{} + regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) + sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) + + t.Run(tc.query, func(t *testing.T) { + params, err := NewLiteralParams( + tc.query, + end, + end, + 0, + interval, + logproto.FORWARD, + uint32(limit), + nil, + nil, + ) + require.NoError(t, err) + + qry := regular.Query(params) + ctx := user.InjectOrgID(context.Background(), "fake") + + strategy := NewPowerOfTwoStrategy(ConstantShards(shards)) + mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg) + _, _, mapped, err := mapper.Parse(params.GetExpression()) + require.NoError(t, err) + + shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{Params: params, ExpressionOverride: mapped}) + + res, err := qry.Exec(ctx) + require.NoError(t, err) + + shardedRes, err := shardedQry.Exec(ctx) + require.NoError(t, err) + + if tc.approximate { + approximatelyEquals(t, res.Data.(promql.Matrix), shardedRes.Data.(promql.Matrix)) + } else { + require.Equal(t, res.Data, shardedRes.Data) + } + }) + } +} + func TestShardCounter(t *testing.T) { var ( shards = 3 diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 0a26520b673c..14fd8249a3e8 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ case SampleVector: maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) } maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) - return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries) + mfl := false + if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) { + mfl = true + } + return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl) case ProbabilisticQuantileVector: return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params) default: @@ -381,9 +385,36 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, errors.New("unexpected empty result") } -func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { +func vectorsToSeries(vec promql.Vector) ([]promql.Series, map[uint64]*promql.Series) { seriesIndex := map[uint64]*promql.Series{} + for _, p := range vec { + var ( + series *promql.Series + hash = p.Metric.Hash() + ok bool + ) + + series, ok = seriesIndex[hash] + if !ok { + series = &promql.Series{ + Metric: p.Metric, + Floats: make([]promql.FPoint, 0, 1), + } + seriesIndex[hash] = series + } + series.Floats = append(series.Floats, promql.FPoint{ + T: p.T, + F: p.F, + }) + } + series := make([]promql.Series, 0, len(seriesIndex)) + for _, s := range seriesIndex { + series = append(series, *s) + } + return series, seriesIndex +} +func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) { vec := promql.Vector{} if next { vec = r.SampleVector() @@ -395,6 +426,14 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval } if GetRangeType(q.params) == InstantType { + // an instant query sharded first/last_over_time can return a single vector + if mergeFirstLast { + series, _ := vectorsToSeries(vec) + result := promql.Matrix(series) + sort.Sort(result) + return result, stepEvaluator.Error() + } + sortByValue, err := Sortable(q.params) if err != nil { return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err) @@ -410,28 +449,11 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval stepCount = 1 } + seriesIndex := map[uint64]*promql.Series{} for next { vec = r.SampleVector() - for _, p := range vec { - var ( - series *promql.Series - hash = p.Metric.Hash() - ok bool - ) - - series, ok = seriesIndex[hash] - if !ok { - series = &promql.Series{ - Metric: p.Metric, - Floats: make([]promql.FPoint, 0, stepCount), - } - seriesIndex[hash] = series - } - series.Floats = append(series.Floats, promql.FPoint{ - T: p.T, - F: p.F, - }) - } + _, seriesIndex = vectorsToSeries(vec) + // as we slowly build the full query for each steps, make sure we don't go over the limit of unique series. if len(seriesIndex) > maxSeries { return nil, logqlmodel.NewSeriesLimitError(maxSeries) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 6d0329cacf8d..352b03ee2517 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { // Merge other results for i, m := range e.matrices { for j, series := range m { - if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { continue } @@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) { // inRange returns true if t is in step range of ts. func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { + // special case instant queries + if e.step.Milliseconds() == 0 { + return true + } return (ts-e.step.Milliseconds()) <= t && t < ts } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 7c9d3233f429..61a60e44f80f 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu if err != nil { return nil, err } - results = append(results, res) } From 63def4856d8ed7530ccf56eae3a1c4b84a17634a Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 24 Jul 2024 13:38:59 -0700 Subject: [PATCH 2/6] new test doesn't need to test any approximate query types Signed-off-by: Callum Styan --- pkg/logql/downstream_test.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index d1eac6b35d89..d2bab3f23091 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -242,14 +242,13 @@ func TestMappingEquivalence_Instant(t *testing.T) { ) for _, tc := range []struct { - query string - approximate bool - shardAgg []string + query string + shardAgg []string }{ - {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}}, - {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}}, - {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}}, - {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, []string{ShardFirstOverTime}}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, []string{ShardFirstOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, []string{ShardLastOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, []string{ShardLastOverTime}}, } { q := NewMockQuerier( shards, @@ -290,11 +289,7 @@ func TestMappingEquivalence_Instant(t *testing.T) { shardedRes, err := shardedQry.Exec(ctx) require.NoError(t, err) - if tc.approximate { - approximatelyEquals(t, res.Data.(promql.Matrix), shardedRes.Data.(promql.Matrix)) - } else { - require.Equal(t, res.Data, shardedRes.Data) - } + require.Equal(t, res.Data, shardedRes.Data) }) } } From d1d72edda2580d8bcb56ecd8c450d2c407ad2368 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 24 Jul 2024 15:39:49 -0700 Subject: [PATCH 3/6] add more test cases + simplify vectorsToSeries Signed-off-by: Callum Styan --- pkg/logql/downstream_test.go | 60 +++++++++++++++++++++++++++++++++++- pkg/logql/engine.go | 30 +++++++++--------- 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index d2bab3f23091..c5bd04e1dd9c 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -89,7 +89,7 @@ func TestMappingEquivalence(t *testing.T) { regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) - t.Run(tc.query, func(t *testing.T) { + t.Run(tc.query+"_range", func(t *testing.T) { params, err := NewLiteralParams( tc.query, start, @@ -125,6 +125,46 @@ func TestMappingEquivalence(t *testing.T) { require.Equal(t, res.Data, shardedRes.Data) } }) + t.Run(tc.query+"_instant", func(t *testing.T) { + // for an instant query we set the start and end to the same timestamp + // plus set step and interval to 0 + params, err := NewLiteralParams( + tc.query, + time.Unix(0, int64(rounds+1)), + time.Unix(0, int64(rounds+1)), + 0, + 0, + logproto.FORWARD, + uint32(limit), + nil, + nil, + ) + require.NoError(t, err) + qry := regular.Query(params) + ctx := user.InjectOrgID(context.Background(), "fake") + + strategy := NewPowerOfTwoStrategy(ConstantShards(shards)) + mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg) + _, _, mapped, err := mapper.Parse(params.GetExpression()) + require.NoError(t, err) + + shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: mapped, + }) + + res, err := qry.Exec(ctx) + require.NoError(t, err) + + shardedRes, err := shardedQry.Exec(ctx) + require.NoError(t, err) + + if tc.approximate { + approximatelyEqualsVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector)) //, tc.realtiveError) + } else { + require.Equal(t, res.Data, shardedRes.Data) + } + }) } } @@ -643,6 +683,24 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) { } } +// approximatelyEquals ensures two responses are approximately equal, +// up to 6 decimals precision per sample +func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) { + require.Len(t, bs, len(as)) + + for i := 0; i < len(as); i++ { + a := as[i] + b := bs[i] + require.Equal(t, a.Metric, b.Metric) + + aSample := a.F + aSample = math.Round(aSample*1e6) / 1e6 + bSample := b.F + bSample = math.Round(bSample*1e6) / 1e6 + require.Equalf(t, a, b, "metric %s differs from %s at %d", a.Metric, b.Metric, i) + } +} + func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) { require.Len(t, actual, len(expected)) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 14fd8249a3e8..294fe81457de 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -385,33 +385,28 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, errors.New("unexpected empty result") } -func vectorsToSeries(vec promql.Vector) ([]promql.Series, map[uint64]*promql.Series) { - seriesIndex := map[uint64]*promql.Series{} +func vectorsToSeries(vec promql.Vector, sm map[uint64]promql.Series) { for _, p := range vec { var ( - series *promql.Series + series promql.Series hash = p.Metric.Hash() ok bool ) - series, ok = seriesIndex[hash] + series, ok = sm[hash] if !ok { - series = &promql.Series{ + series = promql.Series{ Metric: p.Metric, Floats: make([]promql.FPoint, 0, 1), } - seriesIndex[hash] = series + sm[hash] = series } series.Floats = append(series.Floats, promql.FPoint{ T: p.T, F: p.F, }) + sm[hash] = series } - series := make([]promql.Series, 0, len(seriesIndex)) - for _, s := range seriesIndex { - series = append(series, *s) - } - return series, seriesIndex } func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) { @@ -424,11 +419,16 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval if len(vec) > maxSeries { return nil, logqlmodel.NewSeriesLimitError(maxSeries) } + seriesIndex := map[uint64]promql.Series{} if GetRangeType(q.params) == InstantType { // an instant query sharded first/last_over_time can return a single vector if mergeFirstLast { - series, _ := vectorsToSeries(vec) + vectorsToSeries(vec, seriesIndex) + series := make([]promql.Series, 0, len(seriesIndex)) + for _, s := range seriesIndex { + series = append(series, s) + } result := promql.Matrix(series) sort.Sort(result) return result, stepEvaluator.Error() @@ -449,11 +449,9 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval stepCount = 1 } - seriesIndex := map[uint64]*promql.Series{} for next { vec = r.SampleVector() - _, seriesIndex = vectorsToSeries(vec) - + vectorsToSeries(vec, seriesIndex) // as we slowly build the full query for each steps, make sure we don't go over the limit of unique series. if len(seriesIndex) > maxSeries { return nil, logqlmodel.NewSeriesLimitError(maxSeries) @@ -466,7 +464,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval series := make([]promql.Series, 0, len(seriesIndex)) for _, s := range seriesIndex { - series = append(series, *s) + series = append(series, s) } result := promql.Matrix(series) sort.Sort(result) From 666b40ad2aab8eb90b718d607891750399bb4da4 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 24 Jul 2024 15:44:40 -0700 Subject: [PATCH 4/6] ensure that an empty result is promql.Vector{} not promql.Vector(nil) Signed-off-by: Callum Styan --- pkg/logql/first_last_over_time.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 352b03ee2517..4b6bbde55173 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -131,7 +131,7 @@ type mergeOverTimeStepEvaluator struct { // Next returns the first or last element within one step of each matrix. func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { - var vec promql.Vector + vec := promql.Vector{} e.ts = e.ts.Add(e.step) if e.ts.After(e.end) { From 67945388c95539fee842c3985489dcfddd9bc0ed Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 24 Jul 2024 15:47:01 -0700 Subject: [PATCH 5/6] remove unnecessary test since we made a sub-test for instant queries in TestMappingEquivalence Signed-off-by: Callum Styan --- pkg/logql/downstream_test.go | 64 ------------------------------------ 1 file changed, 64 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index c5bd04e1dd9c..ef8d12987f42 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -270,70 +270,6 @@ func TestMappingEquivalenceSketches(t *testing.T) { } } -func TestMappingEquivalence_Instant(t *testing.T) { - var ( - shards = 3 - nStreams = 60 - rounds = 20 - streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true) - end = time.Unix(0, int64(time.Second*time.Duration(rounds))) - interval = time.Duration(0) - limit = 100 - ) - - for _, tc := range []struct { - query string - shardAgg []string - }{ - {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, []string{ShardFirstOverTime}}, - {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, []string{ShardFirstOverTime}}, - {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, []string{ShardLastOverTime}}, - {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, []string{ShardLastOverTime}}, - } { - q := NewMockQuerier( - shards, - streams, - ) - - opts := EngineOpts{} - regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) - sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) - - t.Run(tc.query, func(t *testing.T) { - params, err := NewLiteralParams( - tc.query, - end, - end, - 0, - interval, - logproto.FORWARD, - uint32(limit), - nil, - nil, - ) - require.NoError(t, err) - - qry := regular.Query(params) - ctx := user.InjectOrgID(context.Background(), "fake") - - strategy := NewPowerOfTwoStrategy(ConstantShards(shards)) - mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg) - _, _, mapped, err := mapper.Parse(params.GetExpression()) - require.NoError(t, err) - - shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{Params: params, ExpressionOverride: mapped}) - - res, err := qry.Exec(ctx) - require.NoError(t, err) - - shardedRes, err := shardedQry.Exec(ctx) - require.NoError(t, err) - - require.Equal(t, res.Data, shardedRes.Data) - }) - } -} - func TestShardCounter(t *testing.T) { var ( shards = 3 From b46e6695a1275193c82d304b089bd070ab1a2a26 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 24 Jul 2024 16:52:10 -0700 Subject: [PATCH 6/6] fix test Signed-off-by: Callum Styan --- pkg/logql/downstream_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index ef8d12987f42..b1082f3d6ad7 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -619,7 +619,7 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) { } } -// approximatelyEquals ensures two responses are approximately equal, +// approximatelyEqualsVector ensures two responses are approximately equal, // up to 6 decimals precision per sample func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) { require.Len(t, bs, len(as)) @@ -633,7 +633,7 @@ func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) { aSample = math.Round(aSample*1e6) / 1e6 bSample := b.F bSample = math.Round(bSample*1e6) / 1e6 - require.Equalf(t, a, b, "metric %s differs from %s at %d", a.Metric, b.Metric, i) + require.Equalf(t, aSample, bSample, "metric %s differs from %s at %d", a.Metric, b.Metric, i) } }