Skip to content

Commit

Permalink
fix: Propagate query stats from quantile & topk queries (#13831)
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
  • Loading branch information
benclive and cstyan authored Sep 17, 2024
1 parent 1e939aa commit 78b275b
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 141 deletions.
1 change: 1 addition & 0 deletions pkg/logql/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel

var err error
a.matrix, err = a.matrix.Merge(data)
a.stats.Merge(res.Statistics)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "start", req.GetStart(), "end", req.GetEnd(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down
24 changes: 14 additions & 10 deletions pkg/querier/queryrange/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,17 @@ func ResultToResponse(result logqlmodel.Result, params logql.Params) (queryrange
case sketch.TopKMatrix:
sk, err := data.ToProto()
return &TopKSketchesResponse{
Response: sk,
Warnings: result.Warnings,
Response: sk,
Warnings: result.Warnings,
Statistics: result.Statistics,
}, err
case logql.ProbabilisticQuantileMatrix:
r := data.ToProto()
data.Release()
return &QuantileSketchResponse{
Response: r,
Warnings: result.Warnings,
Response: r,
Warnings: result.Warnings,
Statistics: result.Statistics,
}, nil
}

Expand Down Expand Up @@ -184,19 +186,21 @@ func ResponseToResult(resp queryrangebase.Response) (logqlmodel.Result, error) {
}

return logqlmodel.Result{
Data: matrix,
Headers: resp.GetHeaders(),
Warnings: r.Warnings,
Data: matrix,
Headers: resp.GetHeaders(),
Warnings: r.Warnings,
Statistics: r.Statistics,
}, nil
case *QuantileSketchResponse:
matrix, err := logql.ProbabilisticQuantileMatrixFromProto(r.Response)
if err != nil {
return logqlmodel.Result{}, fmt.Errorf("cannot decode quantile sketch: %w", err)
}
return logqlmodel.Result{
Data: matrix,
Headers: resp.GetHeaders(),
Warnings: r.Warnings,
Data: matrix,
Headers: resp.GetHeaders(),
Warnings: r.Warnings,
Statistics: r.Statistics,
}, nil
default:
return logqlmodel.Result{}, fmt.Errorf("cannot decode (%T)", resp)
Expand Down
Loading

0 comments on commit 78b275b

Please sign in to comment.