Skip to content

Commit

Permalink
Literal Expressions in LogQL (#1677)
Browse files Browse the repository at this point in the history
* binops in ast

* bin op associativity & precedence

* binOpEvaluator work

* defers close only if constructed without error

* tests binary ops

* more binops

* updates docs

* changelog

* number literals in ast

* [wip] literalExpr parsing

* number parsing

* signed literals

* propagates evaluator close, handles healthchecks

* literal evaluator works on non commutative operations

* literalExprs cannot be used as legs of logical/set binops

* removes comment

* single literalExpr tests

* reduces binops with literals in ast construction where possible

* doc updates

* scalar datatypes in logql

* scalar serialization type

* increases safety and reduces complexity in ast evaluator

* recursive literal binop reduction parse test, improves parse errors on literal construction

* vector + literal test
  • Loading branch information
owen-d committed Feb 14, 2020
1 parent 92ec0e5 commit 414f95f
Show file tree
Hide file tree
Showing 13 changed files with 729 additions and 237 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features

* [1677](https://github.com/grafana/loki/pull/1677) **owen-d**: Introduces numeric literals in LogQL
* [1686](https://github.com/grafana/loki/pull/1686) **owen-d**: Introduces the `distributor.max-line-size` flag and associated yaml config. When enabled, lines longer than this config will not be accepted.
* [1662](https://github.com/grafana/loki/pull/1662) **owen-d**: Introduces binary operators in LogQL
* [1572](https://github.com/grafana/loki/pull/1572) **owen-d**: Introduces the `querier.query-ingesters-within` flag and associated yaml config. When enabled, queries for a time range that do not overlap this lookback interval will not be sent to the ingesters.
Expand Down
16 changes: 14 additions & 2 deletions docs/logql.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,24 @@ The following binary arithmetic operators exist in Loki:
- `%` (modulo)
- `^` (power/exponentiation)

Binary arithmetic operators are defined only between two vectors.
Binary arithmetic operators are defined between two literals (scalars), a literal and a vector, and two vectors.

Between two instant vectors, a binary arithmetic operator is applied to each entry in the left-hand side vector and its matching element in the right-hand vector. The result is propagated into the result vector with the grouping labels becoming the output label set. Entries for which no matching entry in the right-hand vector can be found are not part of the result.
Between two literals, the behavior is obvious: they evaluate to another literal that is the result of the operator applied to both scalar operands (1 + 1 = 2).

Between a vector and a literal, the operator is applied to the value of every data sample in the vector. E.g. if a time series vector is multiplied by 2, the result is another vector in which every sample value of the original vector is multiplied by 2.

Between two vectors, a binary arithmetic operator is applied to each entry in the left-hand side vector and its matching element in the right-hand vector. The result is propagated into the result vector with the grouping labels becoming the output label set. Entries for which no matching entry in the right-hand vector can be found are not part of the result.

##### Examples

Implement a health check with a simple query:

> `1 + 1`
Double the rate of a a log stream's entries:

> `sum(rate({app="foo"})) * 2`
Get proportion of warning logs to error logs for the `foo` app

> `sum(rate({app="foo", level="warn"}[1m])) / sum(rate({app="foo", level="error"}[1m]))`
Expand Down
11 changes: 11 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ResultType string
// ResultType values
const (
ResultTypeStream = "streams"
ResultTypeScalar = "scalar"
ResultTypeVector = "vector"
ResultTypeMatrix = "matrix"
)
Expand All @@ -65,6 +66,9 @@ type QueryResponseData struct {
// Type implements the promql.Value interface
func (Streams) Type() ResultType { return ResultTypeStream }

// Type implements the promql.Value interface
func (Scalar) Type() ResultType { return ResultTypeScalar }

// Type implements the promql.Value interface
func (Vector) Type() ResultType { return ResultTypeVector }

Expand Down Expand Up @@ -127,6 +131,10 @@ func (q *QueryResponseData) UnmarshalJSON(data []byte) error {
var v Vector
err = json.Unmarshal(unmarshal.Result, &v)
value = v
case ResultTypeScalar:
var v Scalar
err = json.Unmarshal(unmarshal.Result, &v)
value = v
default:
return fmt.Errorf("unknown type: %s", unmarshal.Type)
}
Expand Down Expand Up @@ -171,6 +179,9 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
return nil
}

// Scalar is a single timestamp/float with no labels
type Scalar model.Scalar

// Vector is a slice of Samples
type Vector []model.Sample

Expand Down
129 changes: 86 additions & 43 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package logql
import (
"bytes"
"context"
"errors"
"fmt"
"regexp"
"strconv"
Expand Down Expand Up @@ -228,60 +227,32 @@ const (
OpTypeCountOverTime = "count_over_time"
OpTypeRate = "rate"

// binops
// binops - logical/set
OpTypeOr = "or"
OpTypeAnd = "and"
OpTypeUnless = "unless"
OpTypeAdd = "+"
OpTypeSub = "-"
OpTypeMul = "*"
OpTypeDiv = "/"
OpTypeMod = "%"
OpTypePow = "^"

// binops - operations
OpTypeAdd = "+"
OpTypeSub = "-"
OpTypeMul = "*"
OpTypeDiv = "/"
OpTypeMod = "%"
OpTypePow = "^"
)

// IsLogicalBinOp tests whether an operation is a logical/set binary operation
func IsLogicalBinOp(op string) bool {
return op == OpTypeOr || op == OpTypeAnd || op == OpTypeUnless
}

// SampleExpr is a LogQL expression filtering logs and returning metric samples.
type SampleExpr interface {
// Selector is the LogQL selector to apply when retrieving logs.
Selector() LogSelectorExpr
Expr
}

// StepEvaluator evaluate a single step of a query.
type StepEvaluator interface {
Next() (bool, int64, promql.Vector)
// Close all resources used.
Close() error
}

type stepEvaluator struct {
fn func() (bool, int64, promql.Vector)
close func() error
}

func newStepEvaluator(fn func() (bool, int64, promql.Vector), close func() error) (StepEvaluator, error) {
if fn == nil {
return nil, errors.New("nil step evaluator fn")
}

if close == nil {
close = func() error { return nil }
}

return &stepEvaluator{
fn: fn,
close: close,
}, nil
}

func (e *stepEvaluator) Next() (bool, int64, promql.Vector) {
return e.fn()
}

func (e *stepEvaluator) Close() error {
return e.close()
}

type rangeAggregationExpr struct {
left *logRange
operation string
Expand Down Expand Up @@ -348,6 +319,7 @@ func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *groupin
if p, err = strconv.Atoi(*params); err != nil {
panic(newParseError(fmt.Sprintf("invalid parameter %s(%s,", operation, *params), 0, 0))
}

default:
if params != nil {
panic(newParseError(fmt.Sprintf("unsupported parameter for operation %s(%s,", operation, *params), 0, 0))
Expand Down Expand Up @@ -409,13 +381,84 @@ func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
rhs,
), 0, 0))
}

leftLit, lOk := left.(*literalExpr)
rightLit, rOk := right.(*literalExpr)

if IsLogicalBinOp(op) {
if lOk {
panic(newParseError(fmt.Sprintf(
"unexpected literal for left leg of logical/set binary operation (%s): %f",
op,
leftLit.value,
), 0, 0))
}

if rOk {
panic(newParseError(fmt.Sprintf(
"unexpected literal for right leg of logical/set binary operation (%s): %f",
op,
rightLit.value,
), 0, 0))
}
}

// map expr like (1+1) -> 2
if lOk && rOk {
return reduceBinOp(op, leftLit, rightLit)
}

return &binOpExpr{
SampleExpr: left,
RHS: right,
op: op,
}
}

// Reduces a binary operation expression. A binop is reducable if both of its legs are literal expressions.
// This is because literals need match all labels, which is currently difficult to encode into StepEvaluators.
// Therefore, we ensure a binop can be reduced/simplified, maintaining the invariant that it does not have two literal legs.
func reduceBinOp(op string, left, right *literalExpr) *literalExpr {
merged := (&defaultEvaluator{}).mergeBinOp(
op,
&promql.Sample{Point: promql.Point{V: left.value}},
&promql.Sample{Point: promql.Point{V: right.value}},
)
return &literalExpr{value: merged.V}
}

type literalExpr struct {
value float64
}

func mustNewLiteralExpr(s string, invert bool) *literalExpr {
n, err := strconv.ParseFloat(s, 64)
if err != nil {
panic(newParseError(fmt.Sprintf("unable to parse literal as a float: %s", err.Error()), 0, 0))
}

if invert {
n = -n
}

return &literalExpr{
value: n,
}
}

func (e *literalExpr) logQLExpr() {}

func (e *literalExpr) String() string {
return fmt.Sprintf("%f", e.value)
}

// literlExpr impls SampleExpr & LogSelectorExpr mainly to reduce the need for more complicated typings
// to facilitate sum types. We'll be type switching when evaluating them anyways
// and they will only be present in binary operation legs.
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) Filter() (Filter, error) { return nil, nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }

// helper used to impl Stringer for vector and range aggregations
// nolint:interfacer
func formatOperation(op string, grouping *grouping, params ...string) string {
Expand Down
50 changes: 43 additions & 7 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,6 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
defer cancel()

qs := q.String()
// This is a legacy query used for health checking. Not the best practice, but it works.
if qs == "1+1" {
if GetRangeType(q) == InstantType {
return promql.Vector{}, nil
}
return promql.Matrix{}, nil
}

expr, err := ParseExpr(qs)
if err != nil {
Expand All @@ -211,6 +204,9 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {

// evalSample evaluate a sampleExpr
func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (promql.Value, error) {
if lit, ok := expr.(*literalExpr); ok {
return ng.evalLiteral(ctx, lit, q)
}

stepEvaluator, err := ng.evaluator.Evaluator(ctx, expr, q)
if err != nil {
Expand All @@ -225,7 +221,9 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
}

for next {

for _, p := range vec {
var (
series *promql.Series
Expand Down Expand Up @@ -257,6 +255,44 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
return result, nil
}

func (ng *engine) evalLiteral(_ context.Context, expr *literalExpr, q *query) (promql.Value, error) {
s := promql.Scalar{
T: q.Start().UnixNano() / int64(time.Millisecond),
V: expr.value,
}

if GetRangeType(q) == InstantType {
return s, nil
}

return PopulateMatrixFromScalar(s, q.LiteralParams), nil

}

func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.Matrix {
var (
start = params.Start()
end = params.End()
step = params.Step()
series = promql.Series{
Points: make(
[]promql.Point,
0,
// allocate enough space for all needed entries
int(params.End().Sub(params.Start())/params.Step())+1,
),
}
)

for ts := start; !ts.After(end); ts = ts.Add(step) {
series.Points = append(series.Points, promql.Point{
T: ts.UnixNano() / int64(time.Millisecond),
V: data.V,
})
}
return promql.Matrix{series}
}

func readStreams(i iter.EntryIterator, size uint32) (Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
Expand Down
Loading

0 comments on commit 414f95f

Please sign in to comment.