Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logqlv2 pushes groups down to edge #2786

Merged
merged 28 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fb789b8
Do not mark the position if the file is removed, doing so only result…
slim-bean Sep 29, 2020
e5b63c1
updates alerting docs with new cortex tool loki linting support (#2697)
owen-d Sep 29, 2020
129c1ea
[Promtail] enables configuring psp in helm chart (#2659)
Sep 29, 2020
746d16c
fluent-bit plugin support tls (#2568)
zjj2wry Sep 29, 2020
61c4b08
Improve lz4 compression (#2614)
cyriltovena Sep 30, 2020
35f7a3d
shows cortextool lint command for loki in alerting docs (#2705)
owen-d Sep 30, 2020
3dced22
docs: use repetitive numbering (#2699)
sandangel Sep 30, 2020
63a91aa
Bypass sharding middleware when a query can't be sharded. (#2709)
cyriltovena Oct 1, 2020
f6eb8d6
Fix timestamp parser for short year format (#2708)
Falco20019 Oct 1, 2020
6000f3c
Fixes the frontend logs to include org_id.
cyriltovena Oct 6, 2020
2f217c2
Update docs for redis (#2728)
vishesh92 Oct 7, 2020
f1cfee8
hack: clean getStore (#2726)
dvrkps Oct 7, 2020
fed4834
Logstash cpu usage fix (#2607)
adityacs Oct 12, 2020
4f8f4ce
Doc: fix broken links in production/README.md (#2702)
huikang Oct 12, 2020
37b3e40
add "verify-config" flag that load and verify configuration and exits…
dlemel8 Oct 12, 2020
db5e8d6
removes r/w pools from block/chunk types (#2711)
owen-d Oct 12, 2020
dbb0409
better tenant logging in ruler memstore (#2741)
owen-d Oct 12, 2020
e222d36
fixes path in prom rules api docs (#2750)
owen-d Oct 12, 2020
7202714
fix: Remove depricated `entry_parser` from scrapeconfig (#2752)
kavirajk Oct 14, 2020
06a60bd
Revendor Cortex (#2755)
owen-d Oct 14, 2020
0d2e2cc
fix: typo in upgrade.md (#2762)
PabloCastellano Oct 15, 2020
30abeaa
Fixes frontend handler.
cyriltovena Oct 15, 2020
6801c6e
Fixes old test.
cyriltovena Oct 15, 2020
14c4e9e
Fix go1.15 local failing test.
cyriltovena Oct 15, 2020
c772dfd
Pushes down grouping into range aggregation to reduce labels at edges.
cyriltovena Oct 19, 2020
a1d612e
Merge remote-tracking branch 'upstream/master' into logqlv2-pushes-gr…
cyriltovena Oct 21, 2020
2de051f
Reset wrong merge.
cyriltovena Oct 21, 2020
638b139
Fixes tests.
cyriltovena Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"hash"
"hash/crc32"
"io"
"sort"
"time"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -646,6 +647,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l
}
seriesRes := make([]logproto.Series, 0, len(series))
for _, s := range series {
sort.Sort(s)
seriesRes = append(seriesRes, *s)
}
return iter.NewMultiSeriesIterator(ctx, seriesRes)
Expand Down
5 changes: 3 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)

Expand Down Expand Up @@ -128,7 +129,7 @@ func TestBlock(t *testing.T) {
require.NoError(t, it.Close())
require.Equal(t, len(cases), idx)

sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, logql.ExtractCount)
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false))
idx = 0
for sampleIt.Next() {
s := sampleIt.Sample()
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestSerialization(t *testing.T) {
}
require.NoError(t, it.Error())

sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, logql.ExtractCount)
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false))
for i := 0; i < numSamples; i++ {
require.True(t, sampleIt.Next(), i)

Expand Down
4 changes: 4 additions & 0 deletions pkg/logproto/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ type Streams []Stream
func (xs Streams) Len() int { return len(xs) }
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }

func (s Series) Len() int { return len(s.Samples) }
func (s Series) Swap(i, j int) { s.Samples[i], s.Samples[j] = s.Samples[j], s.Samples[i] }
func (s Series) Less(i, j int) bool { return s.Samples[i].Timestamp < s.Samples[j].Timestamp }
6 changes: 5 additions & 1 deletion pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type SampleExtractor = log.SampleExtractor

var (
NoopPipeline = log.NoopPipeline
ExtractCount = log.CountExtractor.ToSampleExtractor()
)

// PipelineExpr is an expression defining a log pipeline.
Expand Down Expand Up @@ -716,6 +715,11 @@ func (e *vectorAggregationExpr) Selector() LogSelectorExpr {
}

func (e *vectorAggregationExpr) Extractor() (log.SampleExtractor, error) {
// inject in the range vector extractor the outer groups to improve performance.
// This is only possible if the operation is a sum. Anything else needs all labels.
if r, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum {
return r.extractor(e.grouping, true)
}
return e.left.Extractor()
}

Expand Down
31 changes: 15 additions & 16 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math"
"strings"

// "math"
"testing"
"time"

Expand Down Expand Up @@ -163,7 +162,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{newSeries(testSize, factor(5, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.4}, Metric: labels.Labels{}},
Expand All @@ -175,7 +174,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(10, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "bar"}}},
Expand All @@ -191,7 +190,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (namespace,app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m])) `}},
},
promql.Vector{
promql.Sample{
Expand Down Expand Up @@ -432,8 +431,8 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="bar"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without (app) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without (app) (count_over_time({app="bar"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0}, Metric: labels.Labels{}},
Expand All @@ -449,8 +448,8 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{newSeries(testSize, identity, `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="bar"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without(app) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without(app) (count_over_time({app="bar"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 60}, Metric: labels.Labels{}},
Expand Down Expand Up @@ -688,7 +687,7 @@ func TestEngine_RangeQuery(t *testing.T) {
{newSeries(testSize, factor(5, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand All @@ -703,7 +702,7 @@ func TestEngine_RangeQuery(t *testing.T) {
{newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand All @@ -727,7 +726,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (namespace,cluster, app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand Down Expand Up @@ -759,7 +758,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (cluster, namespace, app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand Down Expand Up @@ -791,7 +790,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (namespace, app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand Down Expand Up @@ -1224,7 +1223,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand Down Expand Up @@ -1252,7 +1251,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand Down Expand Up @@ -1280,7 +1279,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
Expand Down
41 changes: 37 additions & 4 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,23 @@ func GetRangeType(q Params) QueryRangeType {

// Evaluator is an interface for iterating over data at different nodes in the AST
type Evaluator interface {
SampleEvaluator
EntryEvaluator
}

type SampleEvaluator interface {
// StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible
// StepEvaluator implementations which can be composed.
StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error)
StepEvaluator(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error)
}

type SampleEvaluatorFunc func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error)

func (s SampleEvaluatorFunc) StepEvaluator(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
return s(ctx, nextEvaluator, expr, p)
}

type EntryEvaluator interface {
// Iterator returns the iter.EntryIterator for a given LogSelectorExpr
Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error)
}
Expand Down Expand Up @@ -151,12 +165,31 @@ func (ev *DefaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr,

func (ev *DefaultEvaluator) StepEvaluator(
ctx context.Context,
nextEv Evaluator,
nextEv SampleEvaluator,
expr SampleExpr,
q Params,
) (StepEvaluator, error) {
switch e := expr.(type) {
case *vectorAggregationExpr:
if rangExpr, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum {
// if range expression is wrapped with a vector expression
// we should send the vector expression for allowing reducing labels at the source.
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval),
End: q.End(),
Selector: e.String(), // intentionally send the the vector for reducing labels.
Shards: q.Shards(),
},
})
if err != nil {
return nil, err
}
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q)
})

}
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
Expand All @@ -180,7 +213,7 @@ func (ev *DefaultEvaluator) StepEvaluator(

func vectorAggEvaluator(
ctx context.Context,
ev Evaluator,
ev SampleEvaluator,
expr *vectorAggregationExpr,
q Params,
) (StepEvaluator, error) {
Expand Down Expand Up @@ -458,7 +491,7 @@ func (r rangeVectorEvaluator) Error() error {
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
ctx context.Context,
ev Evaluator,
ev SampleEvaluator,
expr *binOpExpr,
q Params,
) (StepEvaluator, error) {
Expand Down
34 changes: 25 additions & 9 deletions pkg/logql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,30 @@ import (
const unsupportedErr = "unsupported range vector aggregation operation: %s"

func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
return r.extractor(nil, false)
}

func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtractor, error) {
if err := r.validate(); err != nil {
return nil, err
}
var groups []string
var without bool

// fallback to parents grouping
if gr != nil {
groups = gr.groups
without = gr.without
}

// range aggregation grouping takes priority
if r.grouping != nil {
groups = r.grouping.groups
without = r.grouping.without
}

sort.Strings(groups)

var stages []log.Stage
if p, ok := r.left.left.(*pipelineExpr); ok {
// if the expression is a pipeline then take all stages into account first.
Expand All @@ -28,30 +49,25 @@ func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
// unwrap...means we want to extract metrics from labels.
if r.left.unwrap != nil {
var convOp string
var groups []string
var without bool
switch r.left.unwrap.operation {
case OpConvDuration, OpConvDurationSeconds:
convOp = log.ConvertDuration
default:
convOp = log.ConvertFloat
}
if r.grouping != nil {
groups = r.grouping.groups
without = r.grouping.without
}

return log.LabelExtractorWithStages(
r.left.unwrap.identifier,
convOp, groups, without, stages,
convOp, groups, without, all, stages,
log.ReduceAndLabelFilter(r.left.unwrap.postFilters),
)
}
// otherwise we extract metrics from the log line.
switch r.operation {
case OpRangeTypeRate, OpRangeTypeCount:
return log.LineExtractorWithStages(log.CountExtractor, stages)
return log.LineExtractorWithStages(log.CountExtractor, stages, groups, without, all)
case OpRangeTypeBytes, OpRangeTypeBytesRate:
return log.LineExtractorWithStages(log.BytesExtractor, stages)
return log.LineExtractorWithStages(log.BytesExtractor, stages, groups, without, all)
default:
return nil, fmt.Errorf(unsupportedErr, r.operation)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,13 @@ Outer:

return res
}

func (b *LabelsBuilder) WithoutLabels(names ...string) labels.Labels {
// naive implementation for now.
return b.Labels().WithoutLabels(names...)
}

func (b *LabelsBuilder) WithLabels(names ...string) labels.Labels {
// naive implementation for now.
return b.Labels().WithLabels(names...)
}
Loading