Skip to content

Commit

Permalink
Add absent_over_time (#3053)
Browse files Browse the repository at this point in the history
* Add absent_over_time

This adds absent_over_time which is super useful when alerting, it can be combine with any logql selector including parsers and filter to returns a timeseries with 1  when logs go missing.

Fixes #2893

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Adds comment.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Dec 8, 2020
1 parent 3f0800d commit 6d85c7c
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 164 deletions.
3 changes: 2 additions & 1 deletion docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,9 @@ Supported function for operating over unwrapped ranges are:
- `stdvar_over_time(unwrapped-range)`: the population standard variance of the values in the specified interval.
- `stddev_over_time(unwrapped-range)`: the population standard deviation of the values in the specified interval.
- `quantile_over_time(scalar,unwrapped-range)`: the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
- `absent_over_time(unwrapped-range)`: returns an empty vector if the range vector passed to it has any elements and a 1-element vector with the value 1 if the range vector passed to it has no elements. (`absent_over_time` is useful for alerting on when no time series and logs stream exist for label combination for a certain amount of time.)

Except for `sum_over_time` and `rate` unwrapped range aggregations support grouping.
Except for `sum_over_time`,`absent_over_time` and `rate`, unwrapped range aggregations support grouping.

```logql
<aggr-op>([parameter,] <unwrapped-range>) [without|by (<label list>)]
Expand Down
5 changes: 3 additions & 2 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ const (
OpRangeTypeStdvar = "stdvar_over_time"
OpRangeTypeStddev = "stddev_over_time"
OpRangeTypeQuantile = "quantile_over_time"
OpRangeTypeAbsent = "absent_over_time"

// binops - logical/set
OpTypeOr = "or"
Expand Down Expand Up @@ -611,14 +612,14 @@ func (e rangeAggregationExpr) validate() error {
}
if e.left.unwrap != nil {
switch e.operation {
case OpRangeTypeRate, OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile:
case OpRangeTypeRate, OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeAbsent:
return nil
default:
return fmt.Errorf("invalid aggregation %s with unwrap", e.operation)
}
}
switch e.operation {
case OpRangeTypeBytes, OpRangeTypeBytesRate, OpRangeTypeCount, OpRangeTypeRate:
case OpRangeTypeBytes, OpRangeTypeBytesRate, OpRangeTypeCount, OpRangeTypeRate, OpRangeTypeAbsent:
return nil
default:
return fmt.Errorf("invalid aggregation %s without unwrap", e.operation)
Expand Down
2 changes: 2 additions & 0 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_SampleExpr_String(t *testing.T) {
t.Parallel()
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
Expand Down Expand Up @@ -101,6 +102,7 @@ func Test_SampleExpr_String(t *testing.T) {
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
)`,
`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms | unwrap latency | __error__!~".*" | foo >5[5m])`,
`absent_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms | unwrap latency | __error__!~".*" | foo >5[5m])`,
`sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m]
Expand Down
35 changes: 35 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 5 * 60 * 1000, V: 30}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`absent_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
{newSeries(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 300 = 30 total
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(5*60, 0), Selector: `absent_over_time({app="foo"}|~".+bar"[5m])`}},
},
promql.Vector{},
},
{
`absent_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{},
[]SelectSampleParams{},
promql.Vector{promql.Sample{Point: promql.Point{T: 5 * 60 * 1000, V: 1}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), logproto.FORWARD, 100,
[][]logproto.Series{
Expand Down Expand Up @@ -914,6 +930,22 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`absent_over_time(({app="foo"} |~".+bar")[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
{newSeries(1, constant(50), `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `absent_over_time({app="foo"}|~".+bar"[1m])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{
{T: 120000, V: 1}, {T: 150000, V: 1}, {T: 180000, V: 1}},
},
},
},
{
`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
Expand Down Expand Up @@ -1935,6 +1967,9 @@ func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParam
}
}
recordID := paramsID(p)
if len(q.series) == 0 {
return iter.NoopIterator, nil
}
series, ok := q.series[recordID]
if !ok {
return nil, fmt.Errorf("no series found for id: %s has: %+v", recordID, q.series)
Expand Down
92 changes: 85 additions & 7 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,21 @@ func rangeAggEvaluator(
if err != nil {
return nil, err
}
iter := newRangeVectorIterator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
)
if expr.operation == OpRangeTypeAbsent {
return &absentRangeVectorEvaluator{
iter: iter,
lbs: absentLabels(expr),
}, nil
}
return &rangeVectorEvaluator{
iter: newRangeVectorIterator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
),
agg: agg,
iter: iter,
agg: agg,
}, nil
}

Expand Down Expand Up @@ -462,6 +469,50 @@ func (r rangeVectorEvaluator) Error() error {
return r.iter.Error()
}

type absentRangeVectorEvaluator struct {
iter RangeVectorIterator
lbs labels.Labels

err error
}

func (r *absentRangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
next := r.iter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := r.iter.At(one)
for _, s := range vec {
// Errors are not allowed in metrics.
if s.Metric.Has(log.ErrorLabel) {
r.err = newPipelineErr(s.Metric)
return false, 0, promql.Vector{}
}
}
if len(vec) > 0 {
return next, ts, promql.Vector{}
}
// values are missing.
return next, ts, promql.Vector{
promql.Sample{
Point: promql.Point{
T: ts,
V: 1.,
},
Metric: r.lbs,
},
}
}

func (r absentRangeVectorEvaluator) Close() error { return r.iter.Close() }

func (r absentRangeVectorEvaluator) Error() error {
if r.err != nil {
return r.err
}
return r.iter.Error()
}

// binOpExpr explicitly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
Expand Down Expand Up @@ -948,3 +999,30 @@ func labelReplaceEvaluator(
return next, ts, vec
}, nextEvaluator.Close, nextEvaluator.Error)
}

// This is to replace missing timeseries during absent_over_time aggregation.
func absentLabels(expr SampleExpr) labels.Labels {
m := labels.Labels{}

lm := expr.Selector().Matchers()
if len(lm) == 0 {
return m
}

empty := []string{}
for _, ma := range lm {
if ma.Name == labels.MetricName {
continue
}
if ma.Type == labels.MatchEqual && !m.Has(ma.Name) {
m = labels.NewBuilder(m).Set(ma.Name, ma.Value).Labels()
} else {
empty = append(empty, ma.Name)
}
}

for _, v := range empty {
m = labels.NewBuilder(m).Del(v).Labels()
}
return m
}
3 changes: 2 additions & 1 deletion pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
LABEL_REPLACE
ABSENT_OVER_TIME LABEL_REPLACE

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -340,6 +340,7 @@ rangeOp:
| STDVAR_OVER_TIME { $$ = OpRangeTypeStdvar }
| STDDEV_OVER_TIME { $$ = OpRangeTypeStddev }
| QUANTILE_OVER_TIME { $$ = OpRangeTypeQuantile }
| ABSENT_OVER_TIME { $$ = OpRangeTypeAbsent }
;


Expand Down
Loading

0 comments on commit 6d85c7c

Please sign in to comment.