Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: special case the return values from a sharded first/last_over_time query #13578

Merged
merged 7 commits into from
Jul 25, 2024
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
Expand Down
60 changes: 59 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestMappingEquivalence(t *testing.T) {
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
t.Run(tc.query+"_range", func(t *testing.T) {
params, err := NewLiteralParams(
tc.query,
start,
Expand Down Expand Up @@ -125,6 +125,46 @@ func TestMappingEquivalence(t *testing.T) {
require.Equal(t, res.Data, shardedRes.Data)
}
})
t.Run(tc.query+"_instant", func(t *testing.T) {
// for an instant query we set the start and end to the same timestamp
// plus set step and interval to 0
params, err := NewLiteralParams(
tc.query,
time.Unix(0, int64(rounds+1)),
time.Unix(0, int64(rounds+1)),
0,
0,
logproto.FORWARD,
uint32(limit),
nil,
nil,
)
require.NoError(t, err)
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: mapped,
})

res, err := qry.Exec(ctx)
require.NoError(t, err)

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

if tc.approximate {
approximatelyEqualsVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector)) //, tc.realtiveError)
} else {
require.Equal(t, res.Data, shardedRes.Data)
}
})
}
}

Expand Down Expand Up @@ -579,6 +619,24 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
}
}

// approximatelyEqualsVector ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) {
require.Len(t, bs, len(as))

for i := 0; i < len(as); i++ {
a := as[i]
b := bs[i]
require.Equal(t, a.Metric, b.Metric)

aSample := a.F
aSample = math.Round(aSample*1e6) / 1e6
bSample := b.F
bSample = math.Round(bSample*1e6) / 1e6
require.Equalf(t, aSample, bSample, "metric %s differs from %s at %d", a.Metric, b.Metric, i)
}
}

func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) {
require.Len(t, actual, len(expected))

Expand Down
68 changes: 44 additions & 24 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
mfl := false
if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
mfl = true
}
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
case ProbabilisticQuantileVector:
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
Expand All @@ -381,9 +385,31 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, errors.New("unexpected empty result")
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
seriesIndex := map[uint64]*promql.Series{}
func vectorsToSeries(vec promql.Vector, sm map[uint64]promql.Series) {
for _, p := range vec {
var (
series promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = sm[hash]
if !ok {
series = promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, 1),
}
sm[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
sm[hash] = series
}
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) {
vec := promql.Vector{}
if next {
vec = r.SampleVector()
Expand All @@ -393,8 +419,21 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
if len(vec) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
seriesIndex := map[uint64]promql.Series{}

if GetRangeType(q.params) == InstantType {
// an instant query sharded first/last_over_time can return a single vector
if mergeFirstLast {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does TestMappingEquivalence_Instant need a test for the false case (existing behavior) here? Or is that adequately covered elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the tests here execute both the sharded and unsharded (aka not mergeFirstLast) versions of the queries, and then compare the results

vectorsToSeries(vec, seriesIndex)
series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, s)
}
result := promql.Matrix(series)
sort.Sort(result)
return result, stepEvaluator.Error()
}

sortByValue, err := Sortable(q.params)
if err != nil {
return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err)
Expand All @@ -412,26 +451,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

for next {
vec = r.SampleVector()
for _, p := range vec {
var (
series *promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = seriesIndex[hash]
if !ok {
series = &promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, stepCount),
}
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
}
vectorsToSeries(vec, seriesIndex)
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
Expand All @@ -444,7 +464,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, *s)
series = append(series, s)
}
result := promql.Matrix(series)
sort.Sort(result)
Expand Down
7 changes: 5 additions & 2 deletions pkg/logql/first_last_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type mergeOverTimeStepEvaluator struct {

// Next returns the first or last element within one step of each matrix.
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
var vec promql.Vector
vec := promql.Vector{}

e.ts = e.ts.Add(e.step)
if e.ts.After(e.end) {
Expand All @@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
// Merge other results
for i, m := range e.matrices {
for j, series := range m {

if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}
Expand Down Expand Up @@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {

// inRange returns true if t is in step range of ts.
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
// special case instant queries
if e.step.Milliseconds() == 0 {
return true
}
return (ts-e.step.Milliseconds()) <= t && t < ts
}

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
if err != nil {
return nil, err
}

results = append(results, res)
}

Expand Down
Loading