From 5939d5e9d8639b1d1898e8fa70ecb473a3ab22f7 Mon Sep 17 00:00:00 2001
From: Travis Patterson
Date: Mon, 18 Apr 2022 09:01:45 -0600
Subject: [PATCH] Query filtering in the ingester and storage (#5629)

* Add timestamp to pipeline and extractor interfaces
* implement filter extractor and pipeline
* implement storage filtering
* implement ingester filtering
* test cleanup
* Refactor pipeline setup to util
* linter
* enable filtering based on mode
* review feedback
* review comments
* review feedback
* no need to include label filters in the test pipeline
* review feedback
---
 pkg/chunkenc/memchunk.go                      |   8 +-
 pkg/chunkenc/memchunk_test.go                 |   8 +-
 pkg/chunkenc/unordered.go                     |   4 +-
 pkg/ingester/instance.go                      |  13 +
 pkg/ingester/instance_test.go                 | 193 ++++++++----
 pkg/ingester/tailer.go                        |   2 +-
 pkg/logcli/client/file.go                     |   5 +-
 pkg/logql/log/metrics_extraction.go           |  88 +++++-
 pkg/logql/log/metrics_extraction_test.go      |  85 +++++-
 pkg/logql/log/parser_hints_test.go            |   2 +-
 pkg/logql/log/pipeline.go                     | 112 ++++++-
 pkg/logql/log/pipeline_test.go                | 104 ++++++-
 pkg/logql/syntax/ast_test.go                  |   6 +-
 pkg/logql/syntax/parser_test.go               |   2 +-
 pkg/logql/test_utils.go                       |   4 +-
 pkg/loki/modules.go                           |   7 +-
 pkg/storage/batch_test.go                     |   2 +-
 pkg/storage/store.go                          |  25 +-
 pkg/storage/store_test.go                     | 274 ++++++++++++++++--
 .../stores/shipper/compactor/deletion/mode.go |   9 +
 pkg/storage/util_test.go                      |   6 +-
 pkg/util/deletion/deletion.go                 |  58 ++++
 22 files changed, 858 insertions(+), 159 deletions(-)
 create mode 100644 pkg/util/deletion/deletion.go

diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index a9399dae5269..e29e1f25df72 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -1006,7 +1006,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
 			return
 		}
 		stats.AddHeadChunkBytes(int64(len(e.s)))
-		newLine, parsedLbs, ok := pipeline.ProcessString(e.s)
+		newLine, parsedLbs, ok := pipeline.ProcessString(e.t, e.s)
 		if !ok {
 			return
 		}
@@ -1056,7 +1056,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
 
 	for _, e := range hb.entries {
 		stats.AddHeadChunkBytes(int64(len(e.s)))
-		value, parsedLabels, ok := extractor.ProcessString(e.s)
+		value, parsedLabels, ok := extractor.ProcessString(e.t, e.s)
 		if !ok {
 			continue
 		}
@@ -1263,7 +1263,7 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe
 
 func (e *entryBufferedIterator) Next() bool {
 	for e.bufferedIterator.Next() {
-		newLine, lbs, ok := e.pipeline.Process(e.currLine)
+		newLine, lbs, ok := e.pipeline.Process(e.currTs, e.currLine)
 		if !ok {
 			continue
 		}
@@ -1294,7 +1294,7 @@ type sampleBufferedIterator struct {
 
 func (e *sampleBufferedIterator) Next() bool {
 	for e.bufferedIterator.Next() {
-		val, labels, ok := e.extractor.Process(e.currLine)
+		val, labels, ok := e.extractor.Process(e.currTs, e.currLine)
 		if !ok {
 			continue
 		}
diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go
index f6c51b4dc31c..d94259cc2ea4 100644
--- a/pkg/chunkenc/memchunk_test.go
+++ b/pkg/chunkenc/memchunk_test.go
@@ -676,9 +676,11 @@ func BenchmarkWrite(b *testing.B) {
 
 type nomatchPipeline struct{}
 
-func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
-func (nomatchPipeline) Process(line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false }
-func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, bool) {
+func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
+func (nomatchPipeline)
Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) { + return line, nil, false +} +func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) { return line, nil, false } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 73cabd12662f..0a520007ffe6 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -227,7 +227,7 @@ func (hb *unorderedHeadBlock) Iterator( mint, maxt, func(ts int64, line string) error { - newLine, parsedLbs, ok := pipeline.ProcessString(line) + newLine, parsedLbs, ok := pipeline.ProcessString(ts, line) if !ok { return nil } @@ -275,7 +275,7 @@ func (hb *unorderedHeadBlock) SampleIterator( mint, maxt, func(ts int64, line string) error { - value, parsedLabels, ok := extractor.ProcessString(line) + value, parsedLabels, ok := extractor.ProcessString(ts, line) if !ok { return nil } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 554a33bcd488..1c43d9f40d7d 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/deletion" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/math" "github.com/grafana/loki/pkg/validation" @@ -317,11 +318,17 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E if err != nil { return nil, err } + pipeline, err := expr.Pipeline() if err != nil { return nil, err } + pipeline, err = deletion.SetupPipeline(req, pipeline) + if err != nil { + return nil, err + } + stats := stats.FromContext(ctx) var iters []iter.EntryIterator @@ -355,11 +362,17 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams if err != nil { return nil, err } + extractor, err := expr.Extractor() if err != nil { return nil, err } + extractor, err = deletion.SetupExtractor(req, extractor) + if err != nil { + return nil, err + } + stats := stats.FromContext(ctx) var iters []iter.SampleIterator diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index e8093103550b..97b79bea25db 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -466,45 +466,16 @@ func Benchmark_OnceSwitch(b *testing.B) { } func Test_Iterator(t *testing.T) { - ingesterConfig := defaultIngesterTestConfig(t) - defaultLimits := defaultLimitsTestConfig() - overrides, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) - ctx := context.TODO() - direction := logproto.BACKWARD - limit := uint32(2) + instance := defaultInstance(t) - // insert data. - for i := 0; i < 10; i++ { - // nolint - stream := "dispatcher" - if i%2 == 0 { - stream = "worker" - } - require.NoError(t, - instance.Push(ctx, &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, - }, - }, - }, - }), - ) - } - - // prepare iterators. 
- it, err := instance.Query(ctx, + it, err := instance.Query(context.TODO(), logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Selector: `{job="3"} | logfmt`, - Limit: limit, + Limit: uint32(2), Start: time.Unix(0, 0), End: time.Unix(0, 100000000), - Direction: direction, + Direction: logproto.BACKWARD, }, }, ) @@ -513,14 +484,14 @@ func Test_Iterator(t *testing.T) { // assert the order is preserved. var res *logproto.QueryResponse require.NoError(t, - sendBatches(ctx, it, + sendBatches(context.TODO(), it, fakeQueryServer( func(qr *logproto.QueryResponse) error { res = qr return nil }, ), - limit), + uint32(2)), ) require.Equal(t, 2, len(res.Streams)) // each entry translated into a unique stream @@ -546,24 +517,142 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool { } func Test_ChunkFilter(t *testing.T) { + instance := defaultInstance(t) + instance.chunkFilter = &testFilter{} + + it, err := instance.Query(context.TODO(), + logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: `{job="3"}`, + Limit: uint32(2), + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Direction: logproto.BACKWARD, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + for it.Next() { + require.NoError(t, it.Error()) + lbs, err := syntax.ParseLabels(it.Labels()) + require.NoError(t, err) + require.NotEqual(t, "dispatcher", lbs.Get("log_stream")) + } +} + +func Test_QueryWithDelete(t *testing.T) { + instance := defaultInstance(t) + + it, err := instance.Query(context.TODO(), + logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: `{job="3"}`, + Limit: uint32(2), + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Direction: logproto.BACKWARD, + Deletes: []*logproto.Delete{ + { + Selector: `{log_stream="worker"}`, + Start: 0, + End: 10, + }, + { + Selector: `{log_stream="dispatcher"}`, + Start: 0, + End: 5, + }, + { + Selector: `{log_stream="dispatcher"} |= "9"`, + Start: 0, + End: 10, + }, + }, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + var logs []string + for it.Next() { + logs = append(logs, it.Entry().Line) + } + + require.Equal(t, logs, []string{`msg="dispatcher_7"`}) +} + +func Test_QuerySampleWithDelete(t *testing.T) { + instance := defaultInstance(t) + + it, err := instance.QuerySample(context.TODO(), + logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: `count_over_time({job="3"}[5m])`, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Deletes: []*logproto.Delete{ + { + Selector: `{log_stream="worker"}`, + Start: 0, + End: 10, + }, + { + Selector: `{log_stream="dispatcher"}`, + Start: 0, + End: 5, + }, + { + Selector: `{log_stream="dispatcher"} |= "9"`, + Start: 0, + End: 10, + }, + }, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + var samples []float64 + for it.Next() { + samples = append(samples, it.Sample().Value) + } + + require.Equal(t, samples, []float64{1.}) +} + +func defaultInstance(t *testing.T) *instance { ingesterConfig := defaultIngesterTestConfig(t) defaultLimits := defaultLimitsTestConfig() overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) instance := newInstance( - &ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{}) - ctx := context.TODO() - direction := logproto.BACKWARD - limit := uint32(2) + &ingesterConfig, + "fake", + NewLimiter(overrides, NilMetrics, 
&ringCountMock{count: 1}, 1), + loki_runtime.DefaultTenantConfigs(), + noopWAL{}, + NilMetrics, + nil, + nil, + ) + insertData(t, instance) + + return instance +} - // insert data. +func insertData(t *testing.T, instance *instance) { for i := 0; i < 10; i++ { + // nolint stream := "dispatcher" if i%2 == 0 { stream = "worker" } require.NoError(t, - instance.Push(ctx, &logproto.PushRequest{ + instance.Push(context.TODO(), &logproto.PushRequest{ Streams: []logproto.Stream{ { Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), @@ -575,28 +664,6 @@ func Test_ChunkFilter(t *testing.T) { }), ) } - - // prepare iterators. - it, err := instance.Query(ctx, - logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Selector: `{job="3"}`, - Limit: limit, - Start: time.Unix(0, 0), - End: time.Unix(0, 100000000), - Direction: direction, - }, - }, - ) - require.NoError(t, err) - defer it.Close() - - for it.Next() { - require.NoError(t, it.Error()) - lbs, err := syntax.ParseLabels(it.Labels()) - require.NoError(t, err) - require.NotEqual(t, "dispatcher", lbs.Get("log_stream")) - } } type fakeQueryServer func(*logproto.QueryResponse) error diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 3a8fde55aac5..4d034d7acb37 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -147,7 +147,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := sp.ProcessString(e.Line) + newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line) if !ok { continue } diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index f8493d78dbaf..94621f6aefba 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -233,7 +233,8 @@ func newFileIterator( streams := map[uint64]*logproto.Stream{} processLine := func(line string) { - parsedLine, parsedLabels, ok := pipeline.ProcessString(line) + ts := time.Now() + parsedLine, parsedLabels, ok := pipeline.ProcessString(ts.UnixNano(), line) if !ok { return } @@ -248,7 +249,7 @@ func newFileIterator( } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: time.Now(), + Timestamp: ts, Line: parsedLine, }) } diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 9faed48034d3..f1f61ca72bbc 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -34,8 +34,8 @@ type SampleExtractor interface { // A StreamSampleExtractor never mutate the received line. type StreamSampleExtractor interface { BaseLabels() LabelsResult - Process(line []byte) (float64, LabelsResult, bool) - ProcessString(line string) (float64, LabelsResult, bool) + Process(ts int64, line []byte) (float64, LabelsResult, bool) + ProcessString(ts int64, line string) (float64, LabelsResult, bool) } type lineSampleExtractor struct { @@ -80,7 +80,7 @@ type streamLineSampleExtractor struct { builder *LabelsBuilder } -func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) Process(_ int64, line []byte) (float64, LabelsResult, bool) { // short circuit. 
if l.Stage == NoopStage { return l.LineExtractor(line), l.builder.GroupedLabels(), true @@ -93,9 +93,9 @@ func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, return l.LineExtractor(line), l.builder.GroupedLabels(), true } -func (l *streamLineSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(unsafeGetBytes(line)) + return l.Process(ts, unsafeGetBytes(line)) } func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } @@ -168,7 +168,7 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra return res } -func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) Process(_ int64, line []byte) (float64, LabelsResult, bool) { // Apply the pipeline first. l.builder.Reset() line, ok := l.preStage.Process(line, l.builder) @@ -194,13 +194,85 @@ func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult return v, l.builder.GroupedLabels(), true } -func (l *streamLabelSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(unsafeGetBytes(line)) + return l.Process(ts, unsafeGetBytes(line)) } func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } +// NewFilteringSampleExtractor creates a sample extractor where entries from +// the underlying log stream are filtered by pipeline filters before being +// passed to extract samples. Filters are always upstream of the extractor. 
+func NewFilteringSampleExtractor(f []PipelineFilter, e SampleExtractor) SampleExtractor { + return &filteringSampleExtractor{ + filters: f, + extractor: e, + } +} + +type filteringSampleExtractor struct { + filters []PipelineFilter + extractor SampleExtractor +} + +func (p *filteringSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + var streamFilters []streamFilter + for _, f := range p.filters { + if allMatch(f.Matchers, labels) { + streamFilters = append(streamFilters, streamFilter{ + start: f.Start, + end: f.End, + pipeline: f.Pipeline.ForStream(labels), + }) + } + } + + return &filteringStreamExtractor{ + filters: streamFilters, + extractor: p.extractor.ForStream(labels), + } +} + +type filteringStreamExtractor struct { + filters []streamFilter + extractor StreamSampleExtractor +} + +func (sp *filteringStreamExtractor) BaseLabels() LabelsResult { + return sp.extractor.BaseLabels() +} + +func (sp *filteringStreamExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.Process(ts, line) + if skip { //When the filter matches, don't run the next step + return 0, nil, false + } + } + + return sp.extractor.Process(ts, line) +} + +func (sp *filteringStreamExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.ProcessString(ts, line) + if skip { //When the filter matches, don't run the next step + return 0, nil, false + } + } + + return sp.extractor.ProcessString(ts, line) +} + func convertFloat(v string) (float64, error) { return strconv.ParseFloat(v, 64) } diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index de1736d3cb2b..0ef32f087e50 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -114,12 +114,12 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sort.Sort(tt.in) - outval, outlbs, ok := tt.ex.ForStream(tt.in).Process([]byte("")) + outval, outlbs, ok := tt.ex.ForStream(tt.in).Process(0, []byte("")) require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) - outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString("") + outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString(0, "") require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) @@ -130,7 +130,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { func Test_Extract_ExpectedLabels(t *testing.T) { ex := mustSampleExtractor(LabelExtractorWithStages("duration", ConvertDuration, []string{"foo"}, false, false, []Stage{NewJSONParser()}, NoopStage)) - f, lbs, ok := ex.ForStream(labels.Labels{{Name: "bar", Value: "foo"}}).ProcessString(`{"duration":"20ms","foo":"json"}`) + f, lbs, ok := ex.ForStream(labels.Labels{{Name: "bar", Value: "foo"}}).ProcessString(0, `{"duration":"20ms","foo":"json"}`) require.True(t, ok) require.Equal(t, (20 * time.Millisecond).Seconds(), f) require.Equal(t, labels.Labels{{Name: "foo", Value: "json"}}, lbs.Labels()) @@ -146,33 +146,98 @@ func mustSampleExtractor(ex SampleExtractor, err error) SampleExtractor { func TestNewLineSampleExtractor(t *testing.T) { se, err := NewLineSampleExtractor(CountExtractor, nil, nil, false, false) 
require.NoError(t, err) + lbs := labels.Labels{ {Name: "namespace", Value: "dev"}, {Name: "cluster", Value: "us-central1"}, } sort.Sort(lbs) + sse := se.ForStream(lbs) - f, l, ok := sse.Process([]byte(`foo`)) + f, l, ok := sse.Process(0, []byte(`foo`)) require.True(t, ok) require.Equal(t, 1., f) assertLabelResult(t, lbs, l) - f, l, ok = sse.ProcessString(`foo`) + f, l, ok = sse.ProcessString(0, `foo`) require.True(t, ok) require.Equal(t, 1., f) assertLabelResult(t, lbs, l) - filter, err := NewFilter("foo", labels.MatchEqual) + stage := mustFilter(NewFilter("foo", labels.MatchEqual)).ToStage() + se, err = NewLineSampleExtractor(BytesExtractor, []Stage{stage}, []string{"namespace"}, false, false) require.NoError(t, err) - se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter.ToStage()}, []string{"namespace"}, false, false) - require.NoError(t, err) sse = se.ForStream(lbs) - f, l, ok = sse.Process([]byte(`foo`)) + f, l, ok = sse.Process(0, []byte(`foo`)) require.True(t, ok) require.Equal(t, 3., f) assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "dev"}}, l) + sse = se.ForStream(lbs) - _, _, ok = sse.Process([]byte(`nope`)) + _, _, ok = sse.Process(0, []byte(`nope`)) require.False(t, ok) } + +func TestFilteringSampleExtractor(t *testing.T) { + se := NewFilteringSampleExtractor([]PipelineFilter{ + newPipelineFilter(2, 4, labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, "e"), + newPipelineFilter(3, 5, labels.Labels{{Name: "baz", Value: "foo"}}, "e"), + }, newStubExtractor()) + + tt := []struct { + name string + ts int64 + line string + labels labels.Labels + ok bool + }{ + {"it is after the timerange", 6, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it is before the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match the filter", 3, "all good", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match all the selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}}, true}, + {"it doesn't match any selectors", 3, "line", labels.Labels{{Name: "beep", Value: "boop"}}, true}, + {"it matches all selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, false}, + {"it tries all the filters", 5, "line", labels.Labels{{Name: "baz", Value: "foo"}}, false}, + } + + for _, test := range tt { + t.Run(test.name, func(t *testing.T) { + _, _, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line)) + require.Equal(t, test.ok, ok) + + _, _, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line) + require.Equal(t, test.ok, ok) + }) + } +} + +func newStubExtractor() *stubExtractor { + return &stubExtractor{ + sp: &stubStreamExtractor{}, + } +} + +// A stub always returns the same data +type stubExtractor struct { + sp *stubStreamExtractor +} + +func (p *stubExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + return p.sp +} + +// A stub always returns the same data +type stubStreamExtractor struct{} + +func (p *stubStreamExtractor) BaseLabels() LabelsResult { + return nil +} + +func (p *stubStreamExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { + return 0, nil, true +} + +func (p *stubStreamExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { + return 0, nil, true +} diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index 5c13cb304ced..c3d3d136e1f4 100644 --- a/pkg/logql/log/parser_hints_test.go +++ 
b/pkg/logql/log/parser_hints_test.go @@ -221,7 +221,7 @@ func Test_ParserHints(t *testing.T) { ex, err := expr.Extractor() require.NoError(t, err) - v, lbsRes, ok := ex.ForStream(lbs).Process(append([]byte{}, tt.line...)) + v, lbsRes, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...)) var lbsResString string if lbsRes != nil { lbsResString = lbsRes.String() diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 8430bac68a01..a7f8e6453dc4 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -19,8 +19,8 @@ type Pipeline interface { // A StreamPipeline never mutate the received line. type StreamPipeline interface { BaseLabels() LabelsResult - Process(line []byte) (resultLine []byte, resultLabels LabelsResult, skip bool) - ProcessString(line string) (resultLine string, resultLabels LabelsResult, skip bool) + Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, skip bool) + ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, skip bool) } // Stage is a single step of a Pipeline. @@ -52,11 +52,11 @@ type noopStreamPipeline struct { LabelsResult } -func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { +func (n noopStreamPipeline) Process(_ int64, line []byte) ([]byte, LabelsResult, bool) { return line, n.LabelsResult, true } -func (n noopStreamPipeline) ProcessString(line string) (string, LabelsResult, bool) { +func (n noopStreamPipeline) ProcessString(_ int64, line string) (string, LabelsResult, bool) { return line, n.LabelsResult, true } @@ -135,7 +135,7 @@ func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline { return res } -func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { +func (p *streamPipeline) Process(_ int64, line []byte) ([]byte, LabelsResult, bool) { var ok bool p.builder.Reset() for _, s := range p.stages { @@ -147,10 +147,10 @@ func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { return line, p.builder.LabelsResult(), true } -func (p *streamPipeline) ProcessString(line string) (string, LabelsResult, bool) { +func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { // Stages only read from the line. lb := unsafeGetBytes(line) - lb, lr, ok := p.Process(lb) + lb, lr, ok := p.Process(ts, lb) // either the line is unchanged and we can just send back the same string. // or we created a new buffer for it in which case it is still safe to avoid the string(byte) copy. return unsafeGetString(lb), lr, ok @@ -158,6 +158,104 @@ func (p *streamPipeline) ProcessString(line string) (string, LabelsResult, bool) func (p *streamPipeline) BaseLabels() LabelsResult { return p.builder.currentResult } +// PipelineFilter contains a set of matchers and a pipeline that, when matched, +// causes an entry from a log stream to be skipped. Matching entries must also +// fall between 'start' and 'end', inclusive +type PipelineFilter struct { + Start int64 + End int64 + Matchers []*labels.Matcher + Pipeline Pipeline +} + +// NewFilteringPipeline creates a pipeline where entries from the underlying +// log stream are filtered by pipeline filters before being passed to the +// pipeline representing the queried data. 
Filters are always upstream of the +// pipeline +func NewFilteringPipeline(f []PipelineFilter, p Pipeline) Pipeline { + return &filteringPipeline{ + filters: f, + pipeline: p, + } +} + +type filteringPipeline struct { + filters []PipelineFilter + pipeline Pipeline +} + +func (p *filteringPipeline) ForStream(labels labels.Labels) StreamPipeline { + var streamFilters []streamFilter + for _, f := range p.filters { + if allMatch(f.Matchers, labels) { + streamFilters = append(streamFilters, streamFilter{ + start: f.Start, + end: f.End, + pipeline: f.Pipeline.ForStream(labels), + }) + } + } + + return &filteringStreamPipeline{ + filters: streamFilters, + pipeline: p.pipeline.ForStream(labels), + } +} + +func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { + for _, m := range matchers { + if !m.Matches(labels.Get(m.Name)) { + return false + } + } + return true +} + +type streamFilter struct { + start int64 + end int64 + pipeline StreamPipeline +} + +type filteringStreamPipeline struct { + filters []streamFilter + pipeline StreamPipeline +} + +func (sp *filteringStreamPipeline) BaseLabels() LabelsResult { + return sp.pipeline.BaseLabels() +} + +func (sp *filteringStreamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.Process(ts, line) + if skip { // When the filter matches, don't run the next step + return nil, nil, false + } + } + + return sp.pipeline.Process(ts, line) +} + +func (sp *filteringStreamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.ProcessString(ts, line) + if skip { // When the filter matches, don't run the next step + return "", nil, false + } + } + + return sp.pipeline.ProcessString(ts, line) +} + // ReduceStages reduces multiple stages into one. 
func ReduceStages(stages []Stage) Stage { if len(stages) == 0 { diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 2f31ab21035b..912e03ad5081 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -12,12 +12,12 @@ import ( func TestNoopPipeline(t *testing.T) { lbs := labels.Labels{{Name: "foo", Value: "bar"}} - l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process([]byte("")) + l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process(0, []byte("")) require.Equal(t, []byte(""), l) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) - ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString("") + ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString(0, "") require.Equal(t, "", ls) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) @@ -29,27 +29,103 @@ func TestPipeline(t *testing.T) { NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), newMustLineFormatter("lbs {{.foo}}"), }) - l, lbr, ok := p.ForStream(lbs).Process([]byte("line")) + l, lbr, ok := p.ForStream(lbs).Process(0, []byte("line")) require.Equal(t, []byte("lbs bar"), l) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) - ls, lbr, ok := p.ForStream(lbs).ProcessString("line") + ls, lbr, ok := p.ForStream(lbs).ProcessString(0, "line") require.Equal(t, "lbs bar", ls) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) - l, lbr, ok = p.ForStream(labels.Labels{}).Process([]byte("line")) + l, lbr, ok = p.ForStream(labels.Labels{}).Process(0, []byte("line")) require.Equal(t, []byte(nil), l) require.Equal(t, nil, lbr) require.Equal(t, false, ok) - ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString("line") + ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString(0, "line") require.Equal(t, "", ls) require.Equal(t, nil, lbr) require.Equal(t, false, ok) } +func TestFilteringPipeline(t *testing.T) { + p := NewFilteringPipeline([]PipelineFilter{ + newPipelineFilter(2, 4, labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, "e"), + newPipelineFilter(3, 5, labels.Labels{{Name: "baz", Value: "foo"}}, "e"), + }, newStubPipeline()) + + tt := []struct { + name string + ts int64 + line string + inputStreamLabels labels.Labels + ok bool + }{ + {"it is before the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it is after the timerange", 6, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match the filter", 3, "all good", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match all the selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}}, true}, + {"it doesn't match any selectors", 3, "line", labels.Labels{{Name: "beep", Value: "boop"}}, true}, + {"it matches all selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, false}, + {"it tries all the filters", 5, "line", labels.Labels{{Name: "baz", Value: "foo"}}, false}, + } + + for _, test := range tt { + t.Run(test.name, func(t *testing.T) { + _, _, ok := p.ForStream(test.inputStreamLabels).Process(test.ts, []byte(test.line)) + require.Equal(t, test.ok, ok) + + _, _, ok = p.ForStream(test.inputStreamLabels).ProcessString(test.ts, test.line) + require.Equal(t, test.ok, ok) + }) + } +} + +//nolint:unparam +func newPipelineFilter(start, end int64, lbls labels.Labels, filter string) PipelineFilter { + var stages []Stage + var matchers 
[]*labels.Matcher + for _, l := range lbls { + m := labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value) + matchers = append(matchers, m) + } + stages = append(stages, mustFilter(NewFilter(filter, labels.MatchEqual)).ToStage()) + + return PipelineFilter{start, end, matchers, NewPipeline(stages)} +} + +func newStubPipeline() *stubPipeline { + return &stubPipeline{ + sp: &stubStreamPipeline{}, + } +} + +// A stub always returns the same data +type stubPipeline struct { + sp *stubStreamPipeline +} + +func (p *stubPipeline) ForStream(labels labels.Labels) StreamPipeline { + return p.sp +} + +// A stub always returns the same data +type stubStreamPipeline struct{} + +func (p *stubStreamPipeline) BaseLabels() LabelsResult { + return nil +} + +func (p *stubStreamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { + return nil, nil, true +} + +func (p *stubStreamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { + return "", nil, true +} + var ( resOK bool resLine []byte @@ -92,13 +168,13 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("pipeline bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = sp.Process(line) + resLine, resLbs, resOK = sp.Process(0, line) } }) b.Run("pipeline string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resLineString, resLbs, resOK = sp.ProcessString(lineString) + resLineString, resLbs, resOK = sp.ProcessString(0, lineString) } }) @@ -108,13 +184,13 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("line extractor bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.Process(line) + resSample, resLbs, resOK = ex.Process(0, line) } }) b.Run("line extractor string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.ProcessString(lineString) + resSample, resLbs, resOK = ex.ProcessString(0, lineString) } }) @@ -125,13 +201,13 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("label extractor bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.Process(line) + resSample, resLbs, resOK = ex.Process(0, line) } }) b.Run("label extractor string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.ProcessString(lineString) + resSample, resLbs, resOK = ex.ProcessString(0, lineString) } }) } @@ -164,7 +240,7 @@ func jsonBenchmark(b *testing.B, parser Stage) { b.ResetTimer() sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = sp.Process(line) + resLine, resLbs, resOK = sp.Process(0, line) if !resOK { b.Fatalf("resulting line not ok: %s\n", line) @@ -187,7 +263,7 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) { b.ResetTimer() sp := p.ForStream(labels.Labels{}) for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = sp.Process(line) + resLine, resLbs, resOK = sp.Process(0, line) if !resOK { b.Fatalf("resulting line not ok: %s\n", line) diff --git a/pkg/logql/syntax/ast_test.go b/pkg/logql/syntax/ast_test.go index de3b252bd45e..9fc9a33c288f 100644 --- a/pkg/logql/syntax/ast_test.go +++ b/pkg/logql/syntax/ast_test.go @@ -193,7 +193,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) - _, _, ok := p.ForStream(labelBar).Process([]byte("bleepbloop")) + _, _, ok := p.ForStream(labelBar).Process(0, []byte("bleepbloop")) require.True(t, ok) }) @@ -287,7 +287,7 @@ func Test_FilterMatcher(t *testing.T) { } else { sp 
:= p.ForStream(labelBar) for _, lc := range tt.lines { - _, _, ok := sp.Process([]byte(lc.l)) + _, _, ok := sp.Process(0, []byte(lc.l)) assert.Equalf(t, lc.e, ok, "query for line '%s' was %v and not %v", lc.l, ok, lc.e) } } @@ -392,7 +392,7 @@ func BenchmarkContainsFilter(b *testing.B) { sp := p.ForStream(labelBar) for i := 0; i < b.N; i++ { for _, line := range lines { - sp.Process(line) + sp.Process(0, line) } } }) diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index 8fa89526abbc..8747c0e16557 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3040,7 +3040,7 @@ func Test_PipelineCombined(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) sp := p.ForStream(labels.Labels{}) - line, lbs, ok := sp.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)) + line, lbs, ok := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)) require.True(t, ok) require.Equal( t, diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 983e989acd0e..e40f12ed0cdc 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -101,7 +101,7 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea for _, stream := range in { for _, e := range stream.Entries { sp := pipeline.ForStream(mustParseLabels(stream.Labels)) - if l, out, ok := sp.Process([]byte(e.Line)); ok { + if l, out, ok := sp.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok { var s *logproto.Stream var found bool s, found = resByStream[out.String()] @@ -129,7 +129,7 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri for _, stream := range in { for _, e := range stream.Entries { exs := ex.ForStream(mustParseLabels(stream.Labels)) - if f, lbs, ok := exs.Process([]byte(e.Line)); ok { + if f, lbs, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok { var s *logproto.Series var found bool s, found = resBySeries[lbs.String()] diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index faab9b3a95cf..cb9a80644ea9 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -875,8 +875,13 @@ func (t *Loki) initUsageReport() (services.Service, error) { } func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { + filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode) + if err != nil { + return nil, err + } + deleteStore := deletion.NewNoOpDeleteRequestsStore() - if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { + if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled { indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer) if err != nil { return nil, err diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 0506889ab67a..fa68aaee3f1d 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1587,7 +1587,7 @@ func TestBuildHeapIterator(t *testing.T) { t.Errorf("buildHeapIterator error = %v", err) return } - req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil) + req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil) streams, _, err := iter.ReadBatch(it, req.Limit) _ = it.Close() if err != nil { diff --git 
a/pkg/storage/store.go b/pkg/storage/store.go index 417976335403..926a535cabb6 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/deletion" ) var ( @@ -383,6 +384,10 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return nil, err } + if len(lazyChunks) == 0 { + return iter.NoopIterator, nil + } + expr, err := req.LogSelector() if err != nil { return nil, err @@ -393,9 +398,11 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return nil, err } - if len(lazyChunks) == 0 { - return iter.NoopIterator, nil + pipeline, err = deletion.SetupPipeline(req, pipeline) + if err != nil { + return nil, err } + var chunkFilterer chunk.Filterer if s.chunkFilterer != nil { chunkFilterer = s.chunkFilterer.ForRequest(ctx) @@ -410,6 +417,15 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) return nil, err } + lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + if err != nil { + return nil, err + } + + if len(lazyChunks) == 0 { + return iter.NoopIterator, nil + } + expr, err := req.Expr() if err != nil { return nil, err @@ -420,14 +436,11 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) return nil, err } - lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + extractor, err = deletion.SetupExtractor(req, extractor) if err != nil { return nil, err } - if len(lazyChunks) == 0 { - return iter.NoopIterator, nil - } var chunkFilterer chunk.Filterer if s.chunkFilterer != nil { chunkFilterer = s.chunkFilterer.ForRequest(ctx) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 4fe3a0c04145..3e698d54c620 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -104,7 +104,7 @@ func Benchmark_store_SelectSample(b *testing.B) { b.Run(test, func(b *testing.B) { for i := 0; i < b.N; i++ { iter, err := chunkStore.SelectSamples(ctx, logql.SelectSampleParams{ - SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano())), + SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), nil), }) if err != nil { b.Fatal(err) @@ -222,7 +222,7 @@ func Test_store_SelectLogs(t *testing.T) { }{ { "all", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -231,7 +231,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from, Line: "1", }, - { Timestamp: from.Add(time.Millisecond), Line: "2", @@ -244,7 +243,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from.Add(3 * time.Millisecond), Line: "4", }, - { Timestamp: from.Add(4 * time.Millisecond), Line: "5", @@ -262,7 +260,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from, Line: "1", }, - { Timestamp: from.Add(time.Millisecond), Line: "2", @@ -275,7 +272,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from.Add(3 * time.Millisecond), Line: "4", }, - { Timestamp: from.Add(4 * time.Millisecond), Line: "5", @@ -290,7 +286,7 @@ func Test_store_SelectLogs(t *testing.T) { }, { "filter regex", - newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, 
from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, from.Add(6*time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -314,7 +310,7 @@ func Test_store_SelectLogs(t *testing.T) { }, { "filter matcher", - newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -323,7 +319,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from, Line: "1", }, - { Timestamp: from.Add(time.Millisecond), Line: "2", @@ -336,7 +331,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from.Add(3 * time.Millisecond), Line: "4", }, - { Timestamp: from.Add(4 * time.Millisecond), Line: "5", @@ -351,7 +345,7 @@ func Test_store_SelectLogs(t *testing.T) { }, { "filter time", - newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -373,7 +367,115 @@ func Test_store_SelectLogs(t *testing.T) { }, }, }, + { + "delete covers whole time range", + newQuery( + "{foo=~\"ba.*\"}", + from, + from.Add(6*time.Millisecond), + nil, + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "6"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Stream{ + { + Labels: "{foo=\"bazz\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }, + }, + }, + { + "delete covers partial time range", + newQuery( + "{foo=~\"ba.*\"}", + from, + from.Add(6*time.Millisecond), + nil, + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "2"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Stream{ + { + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }, + { + Labels: "{foo=\"bazz\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }, + }, + }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &store{ @@ -409,7 +511,7 @@ func Test_store_SelectSample(t *testing.T) { }{ { "all", - newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond)), + newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -419,7 +521,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("1"), Value: 1., }, - { Timestamp: 
from.Add(time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("2"), @@ -435,7 +536,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("4"), Value: 1., }, - { Timestamp: from.Add(4 * time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("5"), @@ -456,7 +556,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("1"), Value: 1., }, - { Timestamp: from.Add(time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("2"), @@ -472,7 +571,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("4"), Value: 1., }, - { Timestamp: from.Add(4 * time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("5"), @@ -489,7 +587,7 @@ func Test_store_SelectSample(t *testing.T) { }, { "filter regex", - newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond)), + newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -515,7 +613,7 @@ func Test_store_SelectSample(t *testing.T) { }, { "filter matcher", - newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond)), + newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -525,7 +623,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("1"), Value: 1., }, - { Timestamp: from.Add(time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("2"), @@ -558,7 +655,7 @@ func Test_store_SelectSample(t *testing.T) { }, { "filter time", - newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond)), + newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -582,7 +679,127 @@ func Test_store_SelectSample(t *testing.T) { }, }, }, + { + "delete covers whole time range", + newSampleQuery( + "count_over_time({foo=~\"ba.*\"}[5m])", + from, + from.Add(6*time.Millisecond), + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "6"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Series{ + { + Labels: "{foo=\"bazz\"}", + Samples: []logproto.Sample{ + { + Timestamp: from.UnixNano(), + Hash: xxhash.Sum64String("1"), + Value: 1., + }, + + { + Timestamp: from.Add(time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("2"), + Value: 1., + }, + { + Timestamp: from.Add(2 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("3"), + Value: 1., + }, + { + Timestamp: from.Add(3 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("4"), + Value: 1., + }, + + { + Timestamp: from.Add(4 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("5"), + Value: 1., + }, + }, + }, + }, + }, + { + "delete covers partial time range", + newSampleQuery( + "count_over_time({foo=~\"ba.*\"}[5m])", + from, + from.Add(6*time.Millisecond), + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "2"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Series{ + { + Labels: "{foo=\"bar\"}", + Samples: []logproto.Sample{ 
+ { + Timestamp: from.Add(4 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("5"), + Value: 1., + }, + { + Timestamp: from.Add(5 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("6"), + Value: 1., + }, + }, + }, + { + Labels: "{foo=\"bazz\"}", + Samples: []logproto.Sample{ + { + Timestamp: from.UnixNano(), + Hash: xxhash.Sum64String("1"), + Value: 1., + }, + { + Timestamp: from.Add(2 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("3"), + Value: 1., + }, + { + Timestamp: from.Add(3 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("4"), + Value: 1., + }, + { + Timestamp: from.Add(4 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("5"), + Value: 1., + }, + { + Timestamp: from.Add(5 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("6"), + Value: 1., + }, + }, + }, + }, + }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &store{ @@ -630,7 +847,7 @@ func Test_ChunkFilterer(t *testing.T) { } s.SetChunkFilterer(&fakeChunkFilterer{}) ctx = user.InjectOrgID(context.Background(), "test-user") - it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour))}) + it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil)}) if err != nil { t.Errorf("store.SelectSamples() error = %v", err) return @@ -641,7 +858,7 @@ func Test_ChunkFilterer(t *testing.T) { require.NotEqual(t, "bazz", v) } - logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)}) + logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)}) if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -651,7 +868,7 @@ func Test_ChunkFilterer(t *testing.T) { v := mustParseLabels(it.Labels())["foo"] require.NotEqual(t, "bazz", v) } - ids, err := s.Series(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)}) + ids, err := s.Series(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)}) require.NoError(t, err) for _, id := range ids { v := id.Labels["foo"] @@ -668,7 +885,7 @@ func Test_store_GetSeries(t *testing.T) { }{ { "all", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, {Labels: mustParseLabels("{foo=\"bazz\"}")}, @@ -677,7 +894,7 @@ func Test_store_GetSeries(t *testing.T) { }, { "all-single-batch", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, {Labels: mustParseLabels("{foo=\"bazz\"}")}, @@ -686,7 +903,7 @@ func Test_store_GetSeries(t *testing.T) { }, { "regexp filter (post chunk fetching)", - newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, }, @@ -694,7 +911,7 @@ func Test_store_GetSeries(t *testing.T) { }, { "filter matcher", - newQuery("{foo=\"bar\"}", from, 
from.Add(6*time.Millisecond), nil), + newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, }, @@ -729,7 +946,7 @@ func Test_store_decodeReq_Matchers(t *testing.T) { }{ { "unsharded", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []*labels.Matcher{ labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "logs"), @@ -742,6 +959,7 @@ func Test_store_decodeReq_Matchers(t *testing.T) { []astmapper.ShardAnnotation{ {Shard: 1, Of: 2}, }, + nil, ), []*labels.Matcher{ labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"), diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode.go b/pkg/storage/stores/shipper/compactor/deletion/mode.go index 9a6afe1f8ab3..ca0dd624f8b9 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/mode.go +++ b/pkg/storage/stores/shipper/compactor/deletion/mode.go @@ -48,3 +48,12 @@ func ParseMode(in string) (Mode, error) { } return 0, errUnknownMode } + +func FilteringEnabled(in string) (bool, error) { + deleteMode, err := ParseMode(in) + if err != nil { + return false, err + } + + return deleteMode == FilterOnly || deleteMode == FilterAndDelete, nil +} diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 511726ee6550..f23238f46fc1 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -124,13 +124,14 @@ func newMatchers(matchers string) []*labels.Matcher { return res } -func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnotation) *logproto.QueryRequest { +func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnotation, deletes []*logproto.Delete) *logproto.QueryRequest { req := &logproto.QueryRequest{ Selector: query, Start: start, Limit: 1000, End: end, Direction: logproto.FORWARD, + Deletes: deletes, } for _, shard := range shards { req.Shards = append(req.Shards, shard.String()) @@ -138,11 +139,12 @@ func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnota return req } -func newSampleQuery(query string, start, end time.Time) *logproto.SampleQueryRequest { +func newSampleQuery(query string, start, end time.Time, deletes []*logproto.Delete) *logproto.SampleQueryRequest { req := &logproto.SampleQueryRequest{ Selector: query, Start: start, End: end, + Deletes: deletes, } return req } diff --git a/pkg/util/deletion/deletion.go b/pkg/util/deletion/deletion.go new file mode 100644 index 000000000000..e90b6a4c2f07 --- /dev/null +++ b/pkg/util/deletion/deletion.go @@ -0,0 +1,58 @@ +package deletion + +import ( + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/logql/syntax" +) + +func SetupPipeline(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, error) { + if len(req.Deletes) == 0 { + return p, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return log.NewFilteringPipeline(filters, p), nil +} + +func SetupExtractor(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) { + if len(req.Deletes) == 0 { + return se, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return log.NewFilteringSampleExtractor(filters, se), nil +} + +func deleteFilters(deletes []*logproto.Delete) 
([]log.PipelineFilter, error) { + var filters []log.PipelineFilter + for _, d := range deletes { + expr, err := syntax.ParseLogSelector(d.Selector, true) + if err != nil { + return nil, err + } + + pipeline, err := expr.Pipeline() + if err != nil { + return nil, err + } + + filters = append(filters, log.PipelineFilter{ + Start: d.Start, + End: d.End, + Matchers: expr.Matchers(), + Pipeline: pipeline, + }) + } + + return filters, nil +}
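
How the pieces compose on the log path: deleteFilters turns each logproto.Delete into a log.PipelineFilter, and NewFilteringPipeline wraps the query's own pipeline so that entries matching a delete selector inside its time range are dropped before the queried pipeline ever sees them. Below is a minimal sketch of that behavior, not part of the patch: it assumes the post-patch pkg/logql/log API shown above; the app="api" stream, the "password" line filter, and the nanosecond range [100, 200] are invented for illustration, and the Prometheus labels import path may vary with the vendored Prometheus version.

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql/log"
	"github.com/prometheus/prometheus/model/labels" // assumption: older vendored trees use .../pkg/labels
)

func main() {
	// A delete request expressed as a pipeline filter: drop lines containing
	// "password" from streams matching {app="api"}, for entries whose
	// timestamp falls within [100, 200] nanoseconds (inclusive).
	f, err := log.NewFilter("password", labels.MatchEqual)
	if err != nil {
		panic(err)
	}

	filters := []log.PipelineFilter{{
		Start:    100,
		End:      200,
		Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "app", "api")},
		Pipeline: log.NewPipeline([]log.Stage{f.ToStage()}),
	}}

	// Wrap the query's own pipeline (a no-op here) with the delete filters;
	// the filters always run upstream of the queried pipeline.
	p := log.NewFilteringPipeline(filters, log.NewNoopPipeline())
	sp := p.ForStream(labels.Labels{{Name: "app", Value: "api"}})

	_, _, ok := sp.Process(150, []byte(`password=hunter2`))
	fmt.Println(ok) // false: inside the range and matches the delete selector, so the entry is skipped

	_, _, ok = sp.Process(300, []byte(`password=hunter2`))
	fmt.Println(ok) // true: outside the delete's time range, so the entry survives
}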
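The metrics path mirrors this with NewFilteringSampleExtractor: filtered entries are rejected before they can contribute a sample, which is why Test_QuerySampleWithDelete above is left with a single count. A sketch under the same assumptions, reusing the exported CountExtractor (one sample of value 1 per surviving line):

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql/log"
	"github.com/prometheus/prometheus/model/labels" // assumption: older vendored trees use .../pkg/labels
)

func main() {
	// count_over_time-style extractor: each line that survives counts as 1.
	se, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
	if err != nil {
		panic(err)
	}

	f, err := log.NewFilter("password", labels.MatchEqual)
	if err != nil {
		panic(err)
	}

	fse := log.NewFilteringSampleExtractor([]log.PipelineFilter{{
		Start:    100,
		End:      200,
		Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "app", "api")},
		Pipeline: log.NewPipeline([]log.Stage{f.ToStage()}),
	}}, se)

	ex := fse.ForStream(labels.Labels{{Name: "app", Value: "api"}})

	v, _, ok := ex.Process(150, []byte(`password=hunter2`))
	fmt.Println(v, ok) // 0 false: the entry is deleted, so no sample is emitted

	v, _, ok = ex.Process(300, []byte(`password=hunter2`))
	fmt.Println(v, ok) // 1 true: outside the delete range, it counts normally
}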