From 99114e9c1c1e7dac502787594a7f7d5ecb4489ff Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Mon, 14 Mar 2022 09:09:29 -0600 Subject: [PATCH 01/18] Add timestamp to pipeline and extractor interfaces --- pkg/chunkenc/memchunk.go | 8 +++---- pkg/chunkenc/memchunk_test.go | 8 ++++--- pkg/chunkenc/unordered.go | 4 ++-- pkg/ingester/tailer.go | 2 +- pkg/logcli/client/file.go | 5 +++-- pkg/logql/log/metrics_extraction.go | 16 +++++++------- pkg/logql/log/metrics_extraction_test.go | 14 ++++++------ pkg/logql/log/parser_hints_test.go | 2 +- pkg/logql/log/pipeline.go | 14 ++++++------ pkg/logql/log/pipeline_test.go | 28 ++++++++++++------------ pkg/logql/syntax/ast_test.go | 6 ++--- pkg/logql/syntax/parser_test.go | 2 +- pkg/logql/test_utils.go | 4 ++-- 13 files changed, 58 insertions(+), 55 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 548840032703..3c35a634e522 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -1006,7 +1006,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, return } stats.AddHeadChunkBytes(int64(len(e.s))) - newLine, parsedLbs, ok := pipeline.ProcessString(e.s) + newLine, parsedLbs, ok := pipeline.ProcessString(e.t, e.s) if !ok { return } @@ -1056,7 +1056,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra for _, e := range hb.entries { stats.AddHeadChunkBytes(int64(len(e.s))) - value, parsedLabels, ok := extractor.ProcessString(e.s) + value, parsedLabels, ok := extractor.ProcessString(e.t, e.s) if !ok { continue } @@ -1263,7 +1263,7 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe func (e *entryBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - newLine, lbs, ok := e.pipeline.Process(e.currLine) + newLine, lbs, ok := e.pipeline.Process(e.currTs, e.currLine) if !ok { continue } @@ -1294,7 +1294,7 @@ type sampleBufferedIterator struct { func (e *sampleBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - val, labels, ok := e.extractor.Process(e.currLine) + val, labels, ok := e.extractor.Process(e.currTs, e.currLine) if !ok { continue } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 73d6961f1965..df714a21ab54 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -677,9 +677,11 @@ func BenchmarkWrite(b *testing.B) { type nomatchPipeline struct{} -func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult } -func (nomatchPipeline) Process(line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false } -func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, bool) { +func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult } +func (nomatchPipeline) Process(ts int64, line []byte) ([]byte, log.LabelsResult, bool) { + return line, nil, false +} +func (nomatchPipeline) ProcessString(ts int64, line string) (string, log.LabelsResult, bool) { return line, nil, false } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 73cabd12662f..0a520007ffe6 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -227,7 +227,7 @@ func (hb *unorderedHeadBlock) Iterator( mint, maxt, func(ts int64, line string) error { - newLine, parsedLbs, ok := pipeline.ProcessString(line) + newLine, parsedLbs, ok := pipeline.ProcessString(ts, line) if !ok { return nil } @@ -275,7 +275,7 @@ func (hb *unorderedHeadBlock) SampleIterator( mint, maxt, 
func(ts int64, line string) error { - value, parsedLabels, ok := extractor.ProcessString(line) + value, parsedLabels, ok := extractor.ProcessString(ts, line) if !ok { return nil } diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 3a8fde55aac5..4d034d7acb37 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -147,7 +147,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := sp.ProcessString(e.Line) + newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line) if !ok { continue } diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index f8493d78dbaf..94621f6aefba 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -233,7 +233,8 @@ func newFileIterator( streams := map[uint64]*logproto.Stream{} processLine := func(line string) { - parsedLine, parsedLabels, ok := pipeline.ProcessString(line) + ts := time.Now() + parsedLine, parsedLabels, ok := pipeline.ProcessString(ts.UnixNano(), line) if !ok { return } @@ -248,7 +249,7 @@ func newFileIterator( } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: time.Now(), + Timestamp: ts, Line: parsedLine, }) } diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 9faed48034d3..e0aa913f7925 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -34,8 +34,8 @@ type SampleExtractor interface { // A StreamSampleExtractor never mutate the received line. type StreamSampleExtractor interface { BaseLabels() LabelsResult - Process(line []byte) (float64, LabelsResult, bool) - ProcessString(line string) (float64, LabelsResult, bool) + Process(ts int64, line []byte) (float64, LabelsResult, bool) + ProcessString(ts int64, line string) (float64, LabelsResult, bool) } type lineSampleExtractor struct { @@ -80,7 +80,7 @@ type streamLineSampleExtractor struct { builder *LabelsBuilder } -func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { // short circuit. if l.Stage == NoopStage { return l.LineExtractor(line), l.builder.GroupedLabels(), true @@ -93,9 +93,9 @@ func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, return l.LineExtractor(line), l.builder.GroupedLabels(), true } -func (l *streamLineSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(unsafeGetBytes(line)) + return l.Process(ts, unsafeGetBytes(line)) } func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } @@ -168,7 +168,7 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra return res } -func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { // Apply the pipeline first. 
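	// Note: ts is not consumed by this extractor itself; the timestamp is
	// threaded through so that wrapping extractors (such as the time-bounded
	// delete filters introduced later in this series) can act on it.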
l.builder.Reset() line, ok := l.preStage.Process(line, l.builder) @@ -194,9 +194,9 @@ func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult return v, l.builder.GroupedLabels(), true } -func (l *streamLabelSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(unsafeGetBytes(line)) + return l.Process(ts, unsafeGetBytes(line)) } func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index de1736d3cb2b..5ea0e6f2dc4c 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -114,12 +114,12 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sort.Sort(tt.in) - outval, outlbs, ok := tt.ex.ForStream(tt.in).Process([]byte("")) + outval, outlbs, ok := tt.ex.ForStream(tt.in).Process(0, []byte("")) require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) - outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString("") + outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString(0, "") require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) @@ -130,7 +130,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { func Test_Extract_ExpectedLabels(t *testing.T) { ex := mustSampleExtractor(LabelExtractorWithStages("duration", ConvertDuration, []string{"foo"}, false, false, []Stage{NewJSONParser()}, NoopStage)) - f, lbs, ok := ex.ForStream(labels.Labels{{Name: "bar", Value: "foo"}}).ProcessString(`{"duration":"20ms","foo":"json"}`) + f, lbs, ok := ex.ForStream(labels.Labels{{Name: "bar", Value: "foo"}}).ProcessString(0, `{"duration":"20ms","foo":"json"}`) require.True(t, ok) require.Equal(t, (20 * time.Millisecond).Seconds(), f) require.Equal(t, labels.Labels{{Name: "foo", Value: "json"}}, lbs.Labels()) @@ -152,12 +152,12 @@ func TestNewLineSampleExtractor(t *testing.T) { } sort.Sort(lbs) sse := se.ForStream(lbs) - f, l, ok := sse.Process([]byte(`foo`)) + f, l, ok := sse.Process(0, []byte(`foo`)) require.True(t, ok) require.Equal(t, 1., f) assertLabelResult(t, lbs, l) - f, l, ok = sse.ProcessString(`foo`) + f, l, ok = sse.ProcessString(0, `foo`) require.True(t, ok) require.Equal(t, 1., f) assertLabelResult(t, lbs, l) @@ -168,11 +168,11 @@ func TestNewLineSampleExtractor(t *testing.T) { se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter.ToStage()}, []string{"namespace"}, false, false) require.NoError(t, err) sse = se.ForStream(lbs) - f, l, ok = sse.Process([]byte(`foo`)) + f, l, ok = sse.Process(0, []byte(`foo`)) require.True(t, ok) require.Equal(t, 3., f) assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "dev"}}, l) sse = se.ForStream(lbs) - _, _, ok = sse.Process([]byte(`nope`)) + _, _, ok = sse.Process(0, []byte(`nope`)) require.False(t, ok) } diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index 5c13cb304ced..c3d3d136e1f4 100644 --- a/pkg/logql/log/parser_hints_test.go +++ b/pkg/logql/log/parser_hints_test.go @@ -221,7 +221,7 @@ func Test_ParserHints(t *testing.T) { ex, err := expr.Extractor() require.NoError(t, err) - v, lbsRes, ok := 
ex.ForStream(lbs).Process(append([]byte{}, tt.line...)) + v, lbsRes, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...)) var lbsResString string if lbsRes != nil { lbsResString = lbsRes.String() diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 8430bac68a01..f9d24867038e 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -19,8 +19,8 @@ type Pipeline interface { // A StreamPipeline never mutate the received line. type StreamPipeline interface { BaseLabels() LabelsResult - Process(line []byte) (resultLine []byte, resultLabels LabelsResult, skip bool) - ProcessString(line string) (resultLine string, resultLabels LabelsResult, skip bool) + Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, skip bool) + ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, skip bool) } // Stage is a single step of a Pipeline. @@ -52,11 +52,11 @@ type noopStreamPipeline struct { LabelsResult } -func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { +func (n noopStreamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { return line, n.LabelsResult, true } -func (n noopStreamPipeline) ProcessString(line string) (string, LabelsResult, bool) { +func (n noopStreamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { return line, n.LabelsResult, true } @@ -135,7 +135,7 @@ func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline { return res } -func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { +func (p *streamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { var ok bool p.builder.Reset() for _, s := range p.stages { @@ -147,10 +147,10 @@ func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { return line, p.builder.LabelsResult(), true } -func (p *streamPipeline) ProcessString(line string) (string, LabelsResult, bool) { +func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { // Stages only read from the line. lb := unsafeGetBytes(line) - lb, lr, ok := p.Process(lb) + lb, lr, ok := p.Process(ts, lb) // either the line is unchanged and we can just send back the same string. // or we created a new buffer for it in which case it is still safe to avoid the string(byte) copy. 
return unsafeGetString(lb), lr, ok diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 2f31ab21035b..f1a39d8e25e2 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -12,12 +12,12 @@ import ( func TestNoopPipeline(t *testing.T) { lbs := labels.Labels{{Name: "foo", Value: "bar"}} - l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process([]byte("")) + l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process(0, []byte("")) require.Equal(t, []byte(""), l) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) - ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString("") + ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString(0, "") require.Equal(t, "", ls) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) @@ -29,22 +29,22 @@ func TestPipeline(t *testing.T) { NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), newMustLineFormatter("lbs {{.foo}}"), }) - l, lbr, ok := p.ForStream(lbs).Process([]byte("line")) + l, lbr, ok := p.ForStream(lbs).Process(0, []byte("line")) require.Equal(t, []byte("lbs bar"), l) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) - ls, lbr, ok := p.ForStream(lbs).ProcessString("line") + ls, lbr, ok := p.ForStream(lbs).ProcessString(0, "line") require.Equal(t, "lbs bar", ls) require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, ok) - l, lbr, ok = p.ForStream(labels.Labels{}).Process([]byte("line")) + l, lbr, ok = p.ForStream(labels.Labels{}).Process(0, []byte("line")) require.Equal(t, []byte(nil), l) require.Equal(t, nil, lbr) require.Equal(t, false, ok) - ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString("line") + ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString(0, "line") require.Equal(t, "", ls) require.Equal(t, nil, lbr) require.Equal(t, false, ok) @@ -92,13 +92,13 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("pipeline bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = sp.Process(line) + resLine, resLbs, resOK = sp.Process(0, line) } }) b.Run("pipeline string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resLineString, resLbs, resOK = sp.ProcessString(lineString) + resLineString, resLbs, resOK = sp.ProcessString(0, lineString) } }) @@ -108,13 +108,13 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("line extractor bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.Process(line) + resSample, resLbs, resOK = ex.Process(0, line) } }) b.Run("line extractor string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.ProcessString(lineString) + resSample, resLbs, resOK = ex.ProcessString(0, lineString) } }) @@ -125,13 +125,13 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("label extractor bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.Process(line) + resSample, resLbs, resOK = ex.Process(0, line) } }) b.Run("label extractor string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - resSample, resLbs, resOK = ex.ProcessString(lineString) + resSample, resLbs, resOK = ex.ProcessString(0, lineString) } }) } @@ -164,7 +164,7 @@ func jsonBenchmark(b *testing.B, parser Stage) { b.ResetTimer() sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = sp.Process(line) + resLine, resLbs, resOK = 
sp.Process(0, line) if !resOK { b.Fatalf("resulting line not ok: %s\n", line) @@ -187,7 +187,7 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) { b.ResetTimer() sp := p.ForStream(labels.Labels{}) for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = sp.Process(line) + resLine, resLbs, resOK = sp.Process(0, line) if !resOK { b.Fatalf("resulting line not ok: %s\n", line) diff --git a/pkg/logql/syntax/ast_test.go b/pkg/logql/syntax/ast_test.go index de3b252bd45e..9fc9a33c288f 100644 --- a/pkg/logql/syntax/ast_test.go +++ b/pkg/logql/syntax/ast_test.go @@ -193,7 +193,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) - _, _, ok := p.ForStream(labelBar).Process([]byte("bleepbloop")) + _, _, ok := p.ForStream(labelBar).Process(0, []byte("bleepbloop")) require.True(t, ok) }) @@ -287,7 +287,7 @@ func Test_FilterMatcher(t *testing.T) { } else { sp := p.ForStream(labelBar) for _, lc := range tt.lines { - _, _, ok := sp.Process([]byte(lc.l)) + _, _, ok := sp.Process(0, []byte(lc.l)) assert.Equalf(t, lc.e, ok, "query for line '%s' was %v and not %v", lc.l, ok, lc.e) } } @@ -392,7 +392,7 @@ func BenchmarkContainsFilter(b *testing.B) { sp := p.ForStream(labelBar) for i := 0; i < b.N; i++ { for _, line := range lines { - sp.Process(line) + sp.Process(0, line) } } }) diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index 8fa89526abbc..8747c0e16557 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3040,7 +3040,7 @@ func Test_PipelineCombined(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) sp := p.ForStream(labels.Labels{}) - line, lbs, ok := sp.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)) + line, lbs, ok := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)) require.True(t, ok) require.Equal( t, diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 55cef0e55b8d..3bfb8aa2f5eb 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -101,7 +101,7 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea for _, stream := range in { for _, e := range stream.Entries { sp := pipeline.ForStream(mustParseLabels(stream.Labels)) - if l, out, ok := sp.Process([]byte(e.Line)); ok { + if l, out, ok := sp.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok { var s *logproto.Stream var found bool s, found = resByStream[out.String()] @@ -129,7 +129,7 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri for _, stream := range in { for _, e := range stream.Entries { exs := ex.ForStream(mustParseLabels(stream.Labels)) - if f, lbs, ok := exs.Process([]byte(e.Line)); ok { + if f, lbs, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok { var s *logproto.Series var found bool s, found = resBySeries[lbs.String()] From d12063e876be5593809b59bd539aa32d5e710b6f Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Mon, 14 Mar 2022 14:30:20 -0600 Subject: [PATCH 02/18] implement filter extractor and pipeline --- pkg/logql/log/metrics_extraction.go | 72 ++++++++++++++++ pkg/logql/log/metrics_extraction_test.go | 63 +++++++++++++- pkg/logql/log/pipeline.go | 100 +++++++++++++++++++++++ pkg/logql/log/pipeline_test.go | 72 ++++++++++++++++ 4 files changed, 305 insertions(+), 2 
deletions(-) diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index e0aa913f7925..2d51a82e3103 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -201,6 +201,78 @@ func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string) (float func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } +// NewFilteringSampleExtractor creates a sample extractor where entries from +// the underlying log stream are filtered by pipeline filters before being +// passed to extract samples. Filters are always upstream of the extractor. +func NewFilteringSampleExtractor(f []PipelineFilter, e SampleExtractor) SampleExtractor { + return &filteringSampleExtractor{ + filters: f, + extractor: e, + } +} + +type filteringSampleExtractor struct { + filters []PipelineFilter + extractor SampleExtractor +} + +func (p *filteringSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + var streamFilters []streamFilter + for _, f := range p.filters { + if allMatch(f.Matchers, labels) { + streamFilters = append(streamFilters, streamFilter{ + start: f.Start, + end: f.End, + pipeline: f.Pipeline.ForStream(labels), + }) + } + } + + return &filteringStreamExtractor{ + filters: streamFilters, + extractor: p.extractor.ForStream(labels), + } +} + +type filteringStreamExtractor struct { + filters []streamFilter + extractor StreamSampleExtractor +} + +func (sp *filteringStreamExtractor) BaseLabels() LabelsResult { + return sp.extractor.BaseLabels() +} + +func (sp *filteringStreamExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.Process(ts, line) + if skip { //When the filter matches, don't run the next step + return 0, nil, false + } + } + + return sp.extractor.Process(ts, line) +} + +func (sp *filteringStreamExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.ProcessString(ts, line) + if skip { //When the filter matches, don't run the next step + return 0, nil, false + } + } + + return sp.extractor.ProcessString(ts, line) +} + func convertFloat(v string) (float64, error) { return strconv.ParseFloat(v, 64) } diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index 5ea0e6f2dc4c..85e4c43a7e4a 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -162,10 +162,10 @@ func TestNewLineSampleExtractor(t *testing.T) { require.Equal(t, 1., f) assertLabelResult(t, lbs, l) - filter, err := NewFilter("foo", labels.MatchEqual) + filter := mustFilter(NewFilter("foo", labels.MatchEqual)).ToStage() require.NoError(t, err) - se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter.ToStage()}, []string{"namespace"}, false, false) + se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter}, []string{"namespace"}, false, false) require.NoError(t, err) sse = se.ForStream(lbs) f, l, ok = sse.Process(0, []byte(`foo`)) @@ -176,3 +176,62 @@ func TestNewLineSampleExtractor(t *testing.T) { _, _, ok = sse.Process(0, []byte(`nope`)) require.False(t, ok) } + +func TestFilteringSampleExtractor(t *testing.T) { + se := NewFilteringSampleExtractor([]PipelineFilter{ + newPipelineFilter(2, 4, 
labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, "e"), + newPipelineFilter(3, 5, labels.Labels{{Name: "baz", Value: "foo"}}, "e"), + }, newStubExtractor()) + + tt := []struct { + name string + ts int64 + line string + labels labels.Labels + ok bool + }{ + {"it doesn't fall in the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match the filter", 3, "all good", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match all the selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}}, true}, + {"it matches all selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, false}, + {"it tries all the filters", 5, "line", labels.Labels{{Name: "baz", Value: "foo"}}, false}, + } + + for _, test := range tt { + t.Run(test.name, func(t *testing.T) { + _, _, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line)) + require.Equal(t, test.ok, ok) + + _, _, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line) + require.Equal(t, test.ok, ok) + }) + } +} + +func newStubExtractor() *stubExtractor { + return &stubExtractor{ + sp: &stubStreamExtractor{}, + } +} + +type stubExtractor struct { + sp *stubStreamExtractor +} + +func (p *stubExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + return p.sp +} + +type stubStreamExtractor struct{} + +func (p *stubStreamExtractor) BaseLabels() LabelsResult { + return nil +} + +func (p *stubStreamExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { + return 0, nil, true +} + +func (p *stubStreamExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { + return 0, nil, true +} diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index f9d24867038e..cbcd3f782868 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -158,6 +158,106 @@ func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsRes func (p *streamPipeline) BaseLabels() LabelsResult { return p.builder.currentResult } +// PipelineFilter contains a set of matchers and a pipeline that, when matched, +// causes an entry from a log stream to be skipped. Matching entries must also +// fall between 'start' and 'end' +type PipelineFilter struct { + Start int64 + End int64 + Matchers []*labels.Matcher + Pipeline Pipeline +} + +// NewFilteringPipeline creates a pipeline where entries from the underlying +// log stream are filtered by pipeline filters before being passed to the +// pipeline representing the queried data. 
Filters are always upstream of the +// pipeline +func NewFilteringPipeline(f []PipelineFilter, p Pipeline) Pipeline { + return &filteringPipeline{ + filters: f, + pipeline: p, + } +} + +type filteringPipeline struct { + filters []PipelineFilter + pipeline Pipeline +} + +func (p *filteringPipeline) ForStream(labels labels.Labels) StreamPipeline { + var streamFilters []streamFilter + for _, f := range p.filters { + if allMatch(f.Matchers, labels) { + streamFilters = append(streamFilters, streamFilter{ + start: f.Start, + end: f.End, + pipeline: f.Pipeline.ForStream(labels), + }) + } + } + + return &filteringStreamPipeline{ + filters: streamFilters, + pipeline: p.pipeline.ForStream(labels), + } +} + +func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { + for _, m := range matchers { + for _, l := range labels { + if m.Name == l.Name && !m.Matches(l.Value) { + return false + } + } + } + return true +} + +type streamFilter struct { + start int64 + end int64 + pipeline StreamPipeline +} + +type filteringStreamPipeline struct { + filters []streamFilter + pipeline StreamPipeline +} + +func (sp *filteringStreamPipeline) BaseLabels() LabelsResult { + return sp.pipeline.BaseLabels() +} + +func (sp *filteringStreamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.Process(ts, line) + if skip { //When the filter matches, don't run the next step + return nil, nil, false + } + } + + return sp.pipeline.Process(ts, line) +} + +func (sp *filteringStreamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { + for _, filter := range sp.filters { + if ts < filter.start || ts > filter.end { + continue + } + + _, _, skip := filter.pipeline.ProcessString(ts, line) + if skip { //When the filter matches, don't run the next step + return "", nil, false + } + } + + return sp.pipeline.ProcessString(ts, line) +} + // ReduceStages reduces multiple stages into one. 
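// A note on the selection logic above: allMatch only rejects a stream when a
// matcher's label is present with a conflicting value; a matcher naming a
// label the stream does not carry is treated as satisfied. The test helper
// newPipelineFilter therefore re-checks each label with an explicit
// NewStringLabelFilter stage inside the filter's own pipeline.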
func ReduceStages(stages []Stage) Stage { if len(stages) == 0 { diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index f1a39d8e25e2..c6be01d289bd 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -50,6 +50,78 @@ func TestPipeline(t *testing.T) { require.Equal(t, false, ok) } +func TestFilteringPipeline(t *testing.T) { + p := NewFilteringPipeline([]PipelineFilter{ + newPipelineFilter(2, 4, labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, "e"), + newPipelineFilter(3, 5, labels.Labels{{Name: "baz", Value: "foo"}}, "e"), + }, newStubPipeline()) + + tt := []struct { + name string + ts int64 + line string + labels labels.Labels + ok bool + }{ + {"it doesn't fall in the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match the filter", 3, "all good", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it doesn't match all the selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}}, true}, + {"it matches all selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, false}, + {"it tries all the filters", 5, "line", labels.Labels{{Name: "baz", Value: "foo"}}, false}, + } + + for _, test := range tt { + t.Run(test.name, func(t *testing.T) { + _, _, ok := p.ForStream(test.labels).Process(test.ts, []byte(test.line)) + require.Equal(t, test.ok, ok) + + _, _, ok = p.ForStream(test.labels).ProcessString(test.ts, test.line) + require.Equal(t, test.ok, ok) + }) + } +} + +func newPipelineFilter(start, end int64, lbls labels.Labels, filter string) PipelineFilter { + var stages []Stage + var matchers []*labels.Matcher + for _, l := range lbls { + m := labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value) + stages = append(stages, NewStringLabelFilter(m)) + matchers = append(matchers, m) + } + stages = append(stages, mustFilter(NewFilter(filter, labels.MatchEqual)).ToStage()) + + return PipelineFilter{start, end, matchers, NewPipeline(stages)} +} + +func newStubPipeline() *stubPipeline { + return &stubPipeline{ + sp: &stubStreamPipeline{}, + } +} + +type stubPipeline struct { + sp *stubStreamPipeline +} + +func (p *stubPipeline) ForStream(labels labels.Labels) StreamPipeline { + return p.sp +} + +type stubStreamPipeline struct{} + +func (p *stubStreamPipeline) BaseLabels() LabelsResult { + return nil +} + +func (p *stubStreamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { + return nil, nil, true +} + +func (p *stubStreamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { + return "", nil, true +} + var ( resOK bool resLine []byte From fa4056421cba306fda9f18a0cb74f577d68fcb60 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Tue, 15 Mar 2022 08:33:11 -0600 Subject: [PATCH 03/18] implement storage filtering --- pkg/storage/batch_test.go | 2 +- pkg/storage/store.go | 77 ++++++++++- pkg/storage/store_test.go | 274 ++++++++++++++++++++++++++++++++++---- pkg/storage/util_test.go | 6 +- 4 files changed, 322 insertions(+), 37 deletions(-) diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index b1d9efc3eb38..15a4054b239d 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1589,7 +1589,7 @@ func TestBuildHeapIterator(t *testing.T) { t.Errorf("buildHeapIterator error = %v", err) return } - req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil) + req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil) streams, 
_, err := iter.ReadBatch(it, req.Limit) _ = it.Close() if err != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 27ab077cbf9d..2b10b90712b9 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -7,6 +7,8 @@ import ( "sort" "time" + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" @@ -19,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + logql_log "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/chunk" @@ -352,6 +355,10 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return nil, err } + if len(lazyChunks) == 0 { + return iter.NoopIterator, nil + } + expr, err := req.LogSelector() if err != nil { return nil, err @@ -362,9 +369,11 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return nil, err } - if len(lazyChunks) == 0 { - return iter.NoopIterator, nil + pipeline, err = setupPipelineDelete(req, pipeline) + if err != nil { + return nil, err } + var chunkFilterer ChunkFilterer if s.chunkFilterer != nil { chunkFilterer = s.chunkFilterer.ForRequest(ctx) @@ -373,12 +382,58 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return newLogBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End, chunkFilterer) } +func setupPipelineDelete(req logql.SelectLogParams, p logql_log.Pipeline) (logql_log.Pipeline, error) { + if len(req.Deletes) == 0 { + return p, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return logql_log.NewFilteringPipeline(filters, p), nil +} + +func deleteFilters(deletes []*logproto.Delete) ([]logql_log.PipelineFilter, error) { + var filters []logql_log.PipelineFilter + for _, d := range deletes { + expr, err := syntax.ParseLogSelector(d.Selector, true) + if err != nil { + return nil, err + } + + pipeline, err := expr.Pipeline() + if err != nil { + return nil, err + } + + filters = append(filters, logql_log.PipelineFilter{ + Start: d.Start, + End: d.End, + Matchers: expr.Matchers(), + Pipeline: pipeline, + }) + } + + return filters, nil +} + func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { matchers, from, through, err := decodeReq(req) if err != nil { return nil, err } + lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + if err != nil { + return nil, err + } + + if len(lazyChunks) == 0 { + return iter.NoopIterator, nil + } + expr, err := req.Expr() if err != nil { return nil, err @@ -389,14 +444,11 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) return nil, err } - lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + extractor, err = setupExtractorDelete(req, extractor) if err != nil { return nil, err } - if len(lazyChunks) == 0 { - return iter.NoopIterator, nil - } var chunkFilterer ChunkFilterer if s.chunkFilterer != nil { chunkFilterer = s.chunkFilterer.ForRequest(ctx) @@ -405,6 +457,19 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) return newSampleBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, 
extractor, req.Start, req.End, chunkFilterer) } +func setupExtractorDelete(req logql.SelectSampleParams, se logql_log.SampleExtractor) (logql_log.SampleExtractor, error) { + if len(req.Deletes) == 0 { + return se, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return logql_log.NewFilteringSampleExtractor(filters, se), nil +} + func (s *store) GetSchemaConfigs() []chunk.PeriodConfig { return s.schemaCfg.Configs } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index ac879fc0b681..1a8da5a7cdb5 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -106,7 +106,7 @@ func Benchmark_store_SelectSample(b *testing.B) { b.Run(test, func(b *testing.B) { for i := 0; i < b.N; i++ { iter, err := chunkStore.SelectSamples(ctx, logql.SelectSampleParams{ - SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano())), + SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), nil), }) if err != nil { b.Fatal(err) @@ -236,7 +236,7 @@ func Test_store_SelectLogs(t *testing.T) { }{ { "all", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -245,7 +245,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from, Line: "1", }, - { Timestamp: from.Add(time.Millisecond), Line: "2", @@ -258,7 +257,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from.Add(3 * time.Millisecond), Line: "4", }, - { Timestamp: from.Add(4 * time.Millisecond), Line: "5", @@ -276,7 +274,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from, Line: "1", }, - { Timestamp: from.Add(time.Millisecond), Line: "2", @@ -289,7 +286,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from.Add(3 * time.Millisecond), Line: "4", }, - { Timestamp: from.Add(4 * time.Millisecond), Line: "5", @@ -304,7 +300,7 @@ func Test_store_SelectLogs(t *testing.T) { }, { "filter regex", - newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, from.Add(6*time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -328,7 +324,7 @@ func Test_store_SelectLogs(t *testing.T) { }, { "filter matcher", - newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -337,7 +333,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from, Line: "1", }, - { Timestamp: from.Add(time.Millisecond), Line: "2", @@ -350,7 +345,6 @@ func Test_store_SelectLogs(t *testing.T) { Timestamp: from.Add(3 * time.Millisecond), Line: "4", }, - { Timestamp: from.Add(4 * time.Millisecond), Line: "5", @@ -365,7 +359,7 @@ func Test_store_SelectLogs(t *testing.T) { }, { "filter time", - newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), nil, nil), []logproto.Stream{ { Labels: "{foo=\"bar\"}", @@ -387,7 +381,115 @@ func Test_store_SelectLogs(t *testing.T) { }, }, }, + { + "delete covers whole time range", + newQuery( + "{foo=~\"ba.*\"}", + from, + from.Add(6*time.Millisecond), + nil, + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * 
time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "6"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Stream{ + { + Labels: "{foo=\"bazz\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }, + }, + }, + { + "delete covers partial time range", + newQuery( + "{foo=~\"ba.*\"}", + from, + from.Add(6*time.Millisecond), + nil, + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "2"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Stream{ + { + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }, + { + Labels: "{foo=\"bazz\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }, + }, + }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &store{ @@ -423,7 +525,7 @@ func Test_store_SelectSample(t *testing.T) { }{ { "all", - newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond)), + newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -433,7 +535,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("1"), Value: 1., }, - { Timestamp: from.Add(time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("2"), @@ -449,7 +550,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("4"), Value: 1., }, - { Timestamp: from.Add(4 * time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("5"), @@ -470,7 +570,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("1"), Value: 1., }, - { Timestamp: from.Add(time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("2"), @@ -486,7 +585,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("4"), Value: 1., }, - { Timestamp: from.Add(4 * time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("5"), @@ -503,7 +601,7 @@ func Test_store_SelectSample(t *testing.T) { }, { "filter regex", - newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond)), + newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -529,7 +627,7 @@ func Test_store_SelectSample(t *testing.T) { }, { "filter matcher", - newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond)), + newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond), nil), []logproto.Series{ { Labels: 
"{foo=\"bar\"}", @@ -539,7 +637,6 @@ func Test_store_SelectSample(t *testing.T) { Hash: xxhash.Sum64String("1"), Value: 1., }, - { Timestamp: from.Add(time.Millisecond).UnixNano(), Hash: xxhash.Sum64String("2"), @@ -572,7 +669,7 @@ func Test_store_SelectSample(t *testing.T) { }, { "filter time", - newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond)), + newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond), nil), []logproto.Series{ { Labels: "{foo=\"bar\"}", @@ -596,7 +693,127 @@ func Test_store_SelectSample(t *testing.T) { }, }, }, + { + "delete covers whole time range", + newSampleQuery( + "count_over_time({foo=~\"ba.*\"}[5m])", + from, + from.Add(6*time.Millisecond), + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "6"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(7 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Series{ + { + Labels: "{foo=\"bazz\"}", + Samples: []logproto.Sample{ + { + Timestamp: from.UnixNano(), + Hash: xxhash.Sum64String("1"), + Value: 1., + }, + + { + Timestamp: from.Add(time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("2"), + Value: 1., + }, + { + Timestamp: from.Add(2 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("3"), + Value: 1., + }, + { + Timestamp: from.Add(3 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("4"), + Value: 1., + }, + + { + Timestamp: from.Add(4 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("5"), + Value: 1., + }, + }, + }, + }, + }, + { + "delete covers partial time range", + newSampleQuery( + "count_over_time({foo=~\"ba.*\"}[5m])", + from, + from.Add(6*time.Millisecond), + []*logproto.Delete{ + { + Selector: `{foo="bar"}`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + { + Selector: `{foo="bazz"} |= "2"`, + Start: from.Add(-1 * time.Millisecond).UnixNano(), + End: from.Add(3 * time.Millisecond).UnixNano(), + }, + }), + []logproto.Series{ + { + Labels: "{foo=\"bar\"}", + Samples: []logproto.Sample{ + { + Timestamp: from.Add(4 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("5"), + Value: 1., + }, + { + Timestamp: from.Add(5 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("6"), + Value: 1., + }, + }, + }, + { + Labels: "{foo=\"bazz\"}", + Samples: []logproto.Sample{ + { + Timestamp: from.UnixNano(), + Hash: xxhash.Sum64String("1"), + Value: 1., + }, + { + Timestamp: from.Add(2 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("3"), + Value: 1., + }, + { + Timestamp: from.Add(3 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("4"), + Value: 1., + }, + { + Timestamp: from.Add(4 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("5"), + Value: 1., + }, + { + Timestamp: from.Add(5 * time.Millisecond).UnixNano(), + Hash: xxhash.Sum64String("6"), + Value: 1., + }, + }, + }, + }, + }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &store{ @@ -644,7 +861,7 @@ func Test_ChunkFilterer(t *testing.T) { } s.SetChunkFilterer(&fakeChunkFilterer{}) ctx = user.InjectOrgID(context.Background(), "test-user") - it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour))}) + it, err := s.SelectSamples(ctx, 
logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil)}) if err != nil { t.Errorf("store.SelectSamples() error = %v", err) return @@ -655,7 +872,7 @@ func Test_ChunkFilterer(t *testing.T) { require.NotEqual(t, "bazz", v) } - logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)}) + logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)}) if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -665,7 +882,7 @@ func Test_ChunkFilterer(t *testing.T) { v := mustParseLabels(it.Labels())["foo"] require.NotEqual(t, "bazz", v) } - ids, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)}) + ids, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)}) require.NoError(t, err) for _, id := range ids { v := id.Labels["foo"] @@ -682,7 +899,7 @@ func Test_store_GetSeries(t *testing.T) { }{ { "all", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, {Labels: mustParseLabels("{foo=\"bazz\"}")}, @@ -691,7 +908,7 @@ func Test_store_GetSeries(t *testing.T) { }, { "all-single-batch", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, {Labels: mustParseLabels("{foo=\"bazz\"}")}, @@ -700,7 +917,7 @@ func Test_store_GetSeries(t *testing.T) { }, { "regexp filter (post chunk fetching)", - newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, }, @@ -708,7 +925,7 @@ func Test_store_GetSeries(t *testing.T) { }, { "filter matcher", - newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil), []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, }, @@ -743,7 +960,7 @@ func Test_store_decodeReq_Matchers(t *testing.T) { }{ { "unsharded", - newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil, nil), []*labels.Matcher{ labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"), labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "logs"), @@ -756,6 +973,7 @@ func Test_store_decodeReq_Matchers(t *testing.T) { []astmapper.ShardAnnotation{ {Shard: 1, Of: 2}, }, + nil, ), []*labels.Matcher{ labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"), diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index e23270c46311..c58c05e535ab 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -120,13 +120,14 @@ func newMatchers(matchers string) []*labels.Matcher { return res } -func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnotation) *logproto.QueryRequest { +func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnotation, deletes []*logproto.Delete) *logproto.QueryRequest { 
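	// deletes may be nil; when set, the delete requests are attached to the
	// query so tests can exercise the delete-aware filtering pipeline.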
req := &logproto.QueryRequest{ Selector: query, Start: start, Limit: 1000, End: end, Direction: logproto.FORWARD, + Deletes: deletes, } for _, shard := range shards { req.Shards = append(req.Shards, shard.String()) @@ -134,11 +135,12 @@ func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnota return req } -func newSampleQuery(query string, start, end time.Time) *logproto.SampleQueryRequest { +func newSampleQuery(query string, start, end time.Time, deletes []*logproto.Delete) *logproto.SampleQueryRequest { req := &logproto.SampleQueryRequest{ Selector: query, Start: start, End: end, + Deletes: deletes, } return req } From 67113d08fe5425eddd8a77a77b194082a16e8718 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Tue, 15 Mar 2022 10:26:21 -0600 Subject: [PATCH 04/18] implement ingester filtering --- pkg/ingester/instance.go | 64 ++++++++++++++ pkg/ingester/instance_test.go | 154 ++++++++++++++++++++++++++++++++++ 2 files changed, 218 insertions(+) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 2d24f4f5e0f4..be14d1f4c29a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -7,6 +7,8 @@ import ( "sync" "syscall" + "github.com/grafana/loki/pkg/logql/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -317,11 +319,17 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E if err != nil { return nil, err } + pipeline, err := expr.Pipeline() if err != nil { return nil, err } + pipeline, err = setupPipelineDelete(req, pipeline) + if err != nil { + return nil, err + } + stats := stats.FromContext(ctx) var iters []iter.EntryIterator @@ -350,16 +358,59 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E return iter.NewSortEntryIterator(iters, req.Direction), nil } +func setupPipelineDelete(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, error) { + if len(req.Deletes) == 0 { + return p, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return log.NewFilteringPipeline(filters, p), nil +} + +func deleteFilters(deletes []*logproto.Delete) ([]log.PipelineFilter, error) { + var filters []log.PipelineFilter + for _, d := range deletes { + expr, err := syntax.ParseLogSelector(d.Selector, true) + if err != nil { + return nil, err + } + + pipeline, err := expr.Pipeline() + if err != nil { + return nil, err + } + + filters = append(filters, log.PipelineFilter{ + Start: d.Start, + End: d.End, + Matchers: expr.Matchers(), + Pipeline: pipeline, + }) + } + + return filters, nil +} + func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { expr, err := req.Expr() if err != nil { return nil, err } + extractor, err := expr.Extractor() if err != nil { return nil, err } + extractor, err = setupExtractorDelete(req, extractor) + if err != nil { + return nil, err + } + stats := stats.FromContext(ctx) var iters []iter.SampleIterator @@ -395,6 +446,19 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams return iter.NewSortSampleIterator(iters), nil } +func setupExtractorDelete(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) { + if len(req.Deletes) == 0 { + return se, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return log.NewFilteringSampleExtractor(filters, se), nil +} + // Label returns the label names or 
values depending on the given request // Without label matchers the label names and values are retrieved from the index directly. // If label matchers are given only the matching streams are fetched from the index. diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index ec2db47f5627..233aa27b756f 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -599,6 +599,160 @@ func Test_ChunkFilter(t *testing.T) { } } +func Test_QueryWithDelete(t *testing.T) { + ingesterConfig := defaultIngesterTestConfig(t) + defaultLimits := defaultLimitsTestConfig() + overrides, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + instance := newInstance( + &ingesterConfig, + "fake", + NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), + loki_runtime.DefaultTenantConfigs(), + noopWAL{}, + NilMetrics, + nil, + nil, + ) + ctx := context.TODO() + direction := logproto.BACKWARD + limit := uint32(2) + + // insert data. + for i := 0; i < 10; i++ { + stream := "dispatcher" + if i%2 == 0 { + stream = "worker" + } + require.NoError(t, + instance.Push(ctx, &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, + }, + }, + }, + }), + ) + } + + // prepare iterators. + it, err := instance.Query(ctx, + logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: `{job="3"}`, + Limit: limit, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Direction: direction, + Deletes: []*logproto.Delete{ + { + Selector: `{log_stream="worker"}`, + Start: 0, + End: 10, + }, + { + Selector: `{log_stream="dispatcher"}`, + Start: 0, + End: 5, + }, + { + Selector: `{log_stream="dispatcher"} |= "9"`, + Start: 0, + End: 10, + }, + }, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + var logs []string + for it.Next() { + logs = append(logs, it.Entry().Line) + } + + require.Equal(t, logs, []string{`msg="dispatcher_7"`}) +} + +func Test_QuerySampleWithDelete(t *testing.T) { + ingesterConfig := defaultIngesterTestConfig(t) + defaultLimits := defaultLimitsTestConfig() + overrides, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + instance := newInstance( + &ingesterConfig, + "fake", + NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), + loki_runtime.DefaultTenantConfigs(), + noopWAL{}, + NilMetrics, + nil, + nil, + ) + ctx := context.TODO() + + // insert data. + for i := 0; i < 10; i++ { + stream := "dispatcher" + if i%2 == 0 { + stream = "worker" + } + require.NoError(t, + instance.Push(ctx, &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, + }, + }, + }, + }), + ) + } + + // prepare iterators. 
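	// Of the ten entries pushed above, only msg="dispatcher_7" (ts=7) escapes
	// all three delete filters: workers are deleted over [0,10], dispatchers
	// over [0,5], and dispatcher_9 matches the |= "9" delete. The iterator
	// below should therefore yield a single sample of value 1.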
+ it, err := instance.QuerySample(ctx, + logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: `count_over_time({job="3"}[5m])`, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Deletes: []*logproto.Delete{ + { + Selector: `{log_stream="worker"}`, + Start: 0, + End: 10, + }, + { + Selector: `{log_stream="dispatcher"}`, + Start: 0, + End: 5, + }, + { + Selector: `{log_stream="dispatcher"} |= "9"`, + Start: 0, + End: 10, + }, + }, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + var samples []float64 + for it.Next() { + samples = append(samples, it.Sample().Value) + } + + require.Equal(t, samples, []float64{1.}) +} + type fakeQueryServer func(*logproto.QueryResponse) error func (f fakeQueryServer) Send(res *logproto.QueryResponse) error { From d10c2c536634cd97823f7e4b8c214bee103280e0 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Tue, 15 Mar 2022 10:49:38 -0600 Subject: [PATCH 05/18] test cleanup --- pkg/ingester/instance_test.go | 205 ++++++++++------------------------ 1 file changed, 59 insertions(+), 146 deletions(-) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 233aa27b756f..b53a53ea34fd 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -466,45 +466,16 @@ func Benchmark_OnceSwitch(b *testing.B) { } func Test_Iterator(t *testing.T) { - ingesterConfig := defaultIngesterTestConfig(t) - defaultLimits := defaultLimitsTestConfig() - overrides, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) - ctx := context.TODO() - direction := logproto.BACKWARD - limit := uint32(2) + instance := defaultInstance(t) - // insert data. - for i := 0; i < 10; i++ { - // nolint - stream := "dispatcher" - if i%2 == 0 { - stream = "worker" - } - require.NoError(t, - instance.Push(ctx, &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, - }, - }, - }, - }), - ) - } - - // prepare iterators. - it, err := instance.Query(ctx, + it, err := instance.Query(context.TODO(), logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Selector: `{job="3"} | logfmt`, - Limit: limit, + Limit: uint32(2), Start: time.Unix(0, 0), End: time.Unix(0, 100000000), - Direction: direction, + Direction: logproto.BACKWARD, }, }, ) @@ -513,14 +484,14 @@ func Test_Iterator(t *testing.T) { // assert the order is preserved. 
var res *logproto.QueryResponse require.NoError(t, - sendBatches(ctx, it, + sendBatches(context.TODO(), it, fakeQueryServer( func(qr *logproto.QueryResponse) error { res = qr return nil }, ), - limit), + uint32(2)), ) require.Equal(t, 2, len(res.Streams)) // each entry translated into a unique stream @@ -546,45 +517,17 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool { } func Test_ChunkFilter(t *testing.T) { - ingesterConfig := defaultIngesterTestConfig(t) - defaultLimits := defaultLimitsTestConfig() - overrides, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - instance := newInstance( - &ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{}) - ctx := context.TODO() - direction := logproto.BACKWARD - limit := uint32(2) - - // insert data. - for i := 0; i < 10; i++ { - stream := "dispatcher" - if i%2 == 0 { - stream = "worker" - } - require.NoError(t, - instance.Push(ctx, &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, - }, - }, - }, - }), - ) - } + instance := defaultInstance(t) + instance.chunkFilter = &testFilter{} - // prepare iterators. - it, err := instance.Query(ctx, + it, err := instance.Query(context.TODO(), logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Selector: `{job="3"}`, - Limit: limit, + Limit: uint32(2), Start: time.Unix(0, 0), End: time.Unix(0, 100000000), - Direction: direction, + Direction: logproto.BACKWARD, }, }, ) @@ -600,53 +543,16 @@ func Test_ChunkFilter(t *testing.T) { } func Test_QueryWithDelete(t *testing.T) { - ingesterConfig := defaultIngesterTestConfig(t) - defaultLimits := defaultLimitsTestConfig() - overrides, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - instance := newInstance( - &ingesterConfig, - "fake", - NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), - loki_runtime.DefaultTenantConfigs(), - noopWAL{}, - NilMetrics, - nil, - nil, - ) - ctx := context.TODO() - direction := logproto.BACKWARD - limit := uint32(2) + instance := defaultInstance(t) - // insert data. - for i := 0; i < 10; i++ { - stream := "dispatcher" - if i%2 == 0 { - stream = "worker" - } - require.NoError(t, - instance.Push(ctx, &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, - }, - }, - }, - }), - ) - } - - // prepare iterators. 
- it, err := instance.Query(ctx, + it, err := instance.Query(context.TODO(), logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Selector: `{job="3"}`, - Limit: limit, + Limit: uint32(2), Start: time.Unix(0, 0), End: time.Unix(0, 100000000), - Direction: direction, + Direction: logproto.BACKWARD, Deletes: []*logproto.Delete{ { Selector: `{log_stream="worker"}`, @@ -679,44 +585,9 @@ func Test_QueryWithDelete(t *testing.T) { } func Test_QuerySampleWithDelete(t *testing.T) { - ingesterConfig := defaultIngesterTestConfig(t) - defaultLimits := defaultLimitsTestConfig() - overrides, err := validation.NewOverrides(defaultLimits, nil) - require.NoError(t, err) - instance := newInstance( - &ingesterConfig, - "fake", - NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), - loki_runtime.DefaultTenantConfigs(), - noopWAL{}, - NilMetrics, - nil, - nil, - ) - ctx := context.TODO() + instance := defaultInstance(t) - // insert data. - for i := 0; i < 10; i++ { - stream := "dispatcher" - if i%2 == 0 { - stream = "worker" - } - require.NoError(t, - instance.Push(ctx, &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, - }, - }, - }, - }), - ) - } - - // prepare iterators. - it, err := instance.QuerySample(ctx, + it, err := instance.QuerySample(context.TODO(), logql.SelectSampleParams{ SampleQueryRequest: &logproto.SampleQueryRequest{ Selector: `count_over_time({job="3"}[5m])`, @@ -753,6 +624,48 @@ func Test_QuerySampleWithDelete(t *testing.T) { require.Equal(t, samples, []float64{1.}) } +func defaultInstance(t *testing.T) *instance { + ingesterConfig := defaultIngesterTestConfig(t) + defaultLimits := defaultLimitsTestConfig() + overrides, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + instance := newInstance( + &ingesterConfig, + "fake", + NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), + loki_runtime.DefaultTenantConfigs(), + noopWAL{}, + NilMetrics, + nil, + nil, + ) + insertData(t, instance) + + return instance +} + +func insertData(t *testing.T, instance *instance) { + for i := 0; i < 10; i++ { + // nolint + stream := "dispatcher" + if i%2 == 0 { + stream = "worker" + } + require.NoError(t, + instance.Push(context.TODO(), &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, + }, + }, + }, + }), + ) + } +} + type fakeQueryServer func(*logproto.QueryResponse) error func (f fakeQueryServer) Send(res *logproto.QueryResponse) error { From 560a42e0b9afd442eb98656413c59e755b359e36 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Tue, 15 Mar 2022 10:59:58 -0600 Subject: [PATCH 06/18] Refactor pipeline setup to util --- pkg/ingester/instance.go | 56 ++------------------------------- pkg/storage/store.go | 57 ++-------------------------------- pkg/util/deletion/deletion.go | 58 +++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 107 deletions(-) create mode 100644 pkg/util/deletion/deletion.go diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index be14d1f4c29a..1626716fcf05 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -7,7 +7,7 @@ import ( "sync" "syscall" - "github.com/grafana/loki/pkg/logql/log" + 
"github.com/grafana/loki/pkg/util/deletion" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -325,7 +325,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E return nil, err } - pipeline, err = setupPipelineDelete(req, pipeline) + pipeline, err = deletion.SetupPipeline(req, pipeline) if err != nil { return nil, err } @@ -358,43 +358,6 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E return iter.NewSortEntryIterator(iters, req.Direction), nil } -func setupPipelineDelete(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, error) { - if len(req.Deletes) == 0 { - return p, nil - } - - filters, err := deleteFilters(req.Deletes) - if err != nil { - return nil, err - } - - return log.NewFilteringPipeline(filters, p), nil -} - -func deleteFilters(deletes []*logproto.Delete) ([]log.PipelineFilter, error) { - var filters []log.PipelineFilter - for _, d := range deletes { - expr, err := syntax.ParseLogSelector(d.Selector, true) - if err != nil { - return nil, err - } - - pipeline, err := expr.Pipeline() - if err != nil { - return nil, err - } - - filters = append(filters, log.PipelineFilter{ - Start: d.Start, - End: d.End, - Matchers: expr.Matchers(), - Pipeline: pipeline, - }) - } - - return filters, nil -} - func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { expr, err := req.Expr() if err != nil { @@ -406,7 +369,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams return nil, err } - extractor, err = setupExtractorDelete(req, extractor) + extractor, err = deletion.SetupExtractor(req, extractor) if err != nil { return nil, err } @@ -446,19 +409,6 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams return iter.NewSortSampleIterator(iters), nil } -func setupExtractorDelete(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) { - if len(req.Deletes) == 0 { - return se, nil - } - - filters, err := deleteFilters(req.Deletes) - if err != nil { - return nil, err - } - - return log.NewFilteringSampleExtractor(filters, se), nil -} - // Label returns the label names or values depending on the given request // Without label matchers the label names and values are retrieved from the index directly. // If label matchers are given only the matching streams are fetched from the index. 
diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 2b10b90712b9..5b95c1031997 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -7,7 +7,7 @@ import ( "sort" "time" - "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/util/deletion" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -21,7 +21,6 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - logql_log "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/chunk" @@ -369,7 +368,7 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return nil, err } - pipeline, err = setupPipelineDelete(req, pipeline) + pipeline, err = deletion.SetupPipeline(req, pipeline) if err != nil { return nil, err } @@ -382,43 +381,6 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return newLogBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End, chunkFilterer) } -func setupPipelineDelete(req logql.SelectLogParams, p logql_log.Pipeline) (logql_log.Pipeline, error) { - if len(req.Deletes) == 0 { - return p, nil - } - - filters, err := deleteFilters(req.Deletes) - if err != nil { - return nil, err - } - - return logql_log.NewFilteringPipeline(filters, p), nil -} - -func deleteFilters(deletes []*logproto.Delete) ([]logql_log.PipelineFilter, error) { - var filters []logql_log.PipelineFilter - for _, d := range deletes { - expr, err := syntax.ParseLogSelector(d.Selector, true) - if err != nil { - return nil, err - } - - pipeline, err := expr.Pipeline() - if err != nil { - return nil, err - } - - filters = append(filters, logql_log.PipelineFilter{ - Start: d.Start, - End: d.End, - Matchers: expr.Matchers(), - Pipeline: pipeline, - }) - } - - return filters, nil -} - func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { matchers, from, through, err := decodeReq(req) if err != nil { @@ -444,7 +406,7 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) return nil, err } - extractor, err = setupExtractorDelete(req, extractor) + extractor, err = deletion.SetupExtractor(req, extractor) if err != nil { return nil, err } @@ -457,19 +419,6 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) return newSampleBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer) } -func setupExtractorDelete(req logql.SelectSampleParams, se logql_log.SampleExtractor) (logql_log.SampleExtractor, error) { - if len(req.Deletes) == 0 { - return se, nil - } - - filters, err := deleteFilters(req.Deletes) - if err != nil { - return nil, err - } - - return logql_log.NewFilteringSampleExtractor(filters, se), nil -} - func (s *store) GetSchemaConfigs() []chunk.PeriodConfig { return s.schemaCfg.Configs } diff --git a/pkg/util/deletion/deletion.go b/pkg/util/deletion/deletion.go new file mode 100644 index 000000000000..e90b6a4c2f07 --- /dev/null +++ b/pkg/util/deletion/deletion.go @@ -0,0 +1,58 @@ +package deletion + +import ( + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/logql/syntax" +) 
+ +func SetupPipeline(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, error) { + if len(req.Deletes) == 0 { + return p, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return log.NewFilteringPipeline(filters, p), nil +} + +func SetupExtractor(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) { + if len(req.Deletes) == 0 { + return se, nil + } + + filters, err := deleteFilters(req.Deletes) + if err != nil { + return nil, err + } + + return log.NewFilteringSampleExtractor(filters, se), nil +} + +func deleteFilters(deletes []*logproto.Delete) ([]log.PipelineFilter, error) { + var filters []log.PipelineFilter + for _, d := range deletes { + expr, err := syntax.ParseLogSelector(d.Selector, true) + if err != nil { + return nil, err + } + + pipeline, err := expr.Pipeline() + if err != nil { + return nil, err + } + + filters = append(filters, log.PipelineFilter{ + Start: d.Start, + End: d.End, + Matchers: expr.Matchers(), + Pipeline: pipeline, + }) + } + + return filters, nil +} From 69dfb3044b8692a8772b14ee44d5d2690a3be340 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Tue, 15 Mar 2022 14:12:51 -0600 Subject: [PATCH 07/18] linter --- pkg/logql/log/pipeline_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index c6be01d289bd..e136124b4926 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -81,6 +81,7 @@ func TestFilteringPipeline(t *testing.T) { } } +//nolint:unparam func newPipelineFilter(start, end int64, lbls labels.Labels, filter string) PipelineFilter { var stages []Stage var matchers []*labels.Matcher From b66f4af2e6af28c786d334b39a76523e757d59a8 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 6 Apr 2022 09:19:21 -0600 Subject: [PATCH 08/18] enable filtering based on mode --- pkg/loki/modules.go | 7 ++++++- pkg/storage/stores/shipper/compactor/deletion/mode.go | 9 +++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 312bd636c7f5..c1dfe88501a4 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -823,8 +823,13 @@ func (t *Loki) initUsageReport() (services.Service, error) { } func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { + filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode) + if err != nil { + return nil, err + } + deleteStore := deletion.NewNoOpDeleteRequestsStore() - if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { + if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled { indexClient, err := chunk_storage.NewIndexClient(shipper.BoltDBShipperType, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer) if err != nil { return nil, err diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode.go b/pkg/storage/stores/shipper/compactor/deletion/mode.go index 9a6afe1f8ab3..ca0dd624f8b9 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/mode.go +++ b/pkg/storage/stores/shipper/compactor/deletion/mode.go @@ -48,3 +48,12 @@ func ParseMode(in string) (Mode, error) { } return 0, errUnknownMode } + +func FilteringEnabled(in string) (bool, error) { + deleteMode, err := ParseMode(in) + if err != nil { + return false, err + } + + return deleteMode == FilterOnly || deleteMode == FilterAndDelete, nil +} From e02d552ec4d85cb4178a10447e2a9cadbbf9e21e 
Mon Sep 17 00:00:00 2001
From: Travis Patterson
Date: Thu, 7 Apr 2022 13:12:25 -0600
Subject: [PATCH 09/18] review feedback

---
 pkg/ingester/instance.go | 3 +--
 pkg/logql/log/metrics_extraction_test.go | 14 ++++++++++----
 pkg/logql/log/pipeline.go | 6 +++---
 pkg/logql/log/pipeline_test.go | 20 ++++++++++++--------
 pkg/storage/store.go | 3 +--
 5 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 1626716fcf05..eb5206e3beb7 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -7,8 +7,6 @@ import (
 	"sync"
 	"syscall"
 
-	"github.com/grafana/loki/pkg/util/deletion"
-
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -31,6 +29,7 @@ import (
 	"github.com/grafana/loki/pkg/storage"
 	"github.com/grafana/loki/pkg/usagestats"
 	"github.com/grafana/loki/pkg/util"
+	"github.com/grafana/loki/pkg/util/deletion"
 	util_log "github.com/grafana/loki/pkg/util/log"
 	"github.com/grafana/loki/pkg/util/math"
 	"github.com/grafana/loki/pkg/validation"
diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go
index 85e4c43a7e4a..5d7913a6c6f7 100644
--- a/pkg/logql/log/metrics_extraction_test.go
+++ b/pkg/logql/log/metrics_extraction_test.go
@@ -146,11 +146,13 @@ func mustSampleExtractor(ex SampleExtractor, err error) SampleExtractor {
 func TestNewLineSampleExtractor(t *testing.T) {
 	se, err := NewLineSampleExtractor(CountExtractor, nil, nil, false, false)
 	require.NoError(t, err)
+
 	lbs := labels.Labels{
 		{Name: "namespace", Value: "dev"},
 		{Name: "cluster", Value: "us-central1"},
 	}
 	sort.Sort(lbs)
+
 	sse := se.ForStream(lbs)
 	f, l, ok := sse.Process(0, []byte(`foo`))
 	require.True(t, ok)
@@ -162,16 +164,16 @@
 	require.Equal(t, 1., f)
 	assertLabelResult(t, lbs, l)
 
-	filter := mustFilter(NewFilter("foo", labels.MatchEqual)).ToStage()
+	stage := mustFilter(NewFilter("foo", labels.MatchEqual)).ToStage()
+	se, err = NewLineSampleExtractor(BytesExtractor, []Stage{stage}, []string{"namespace"}, false, false)
 	require.NoError(t, err)
-	se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter}, []string{"namespace"}, false, false)
-	require.NoError(t, err)
 	sse = se.ForStream(lbs)
 	f, l, ok = sse.Process(0, []byte(`foo`))
 	require.True(t, ok)
 	require.Equal(t, 3., f)
 	assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "dev"}}, l)
+
 	sse = se.ForStream(lbs)
 	_, _, ok = sse.Process(0, []byte(`nope`))
 	require.False(t, ok)
@@ -190,9 +192,11 @@
 		labels labels.Labels
 		ok     bool
 	}{
-		{"it doesn't fall in the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true},
+		{"it is after the timerange", 6, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true},
+		{"it is before the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true},
 		{"it doesn't match the filter", 3, "all good", labels.Labels{{Name: "baz", Value: "foo"}}, true},
 		{"it doesn't match all the selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}}, true},
+		{"it doesn't match any selectors", 3, "line", labels.Labels{{Name: "beep", Value: "boop"}}, true},
 		{"it matches all selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, false},
 		{"it tries all the filters", 5, "line", labels.Labels{{Name: "baz", Value: "foo"}}, false},
 	}
@@ -214,6 +218,7 @@ func newStubExtractor() *stubExtractor {
 	}
 }
 
+// A stub always 
returns the same data type stubExtractor struct { sp *stubStreamExtractor } @@ -222,6 +227,7 @@ func (p *stubExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { return p.sp } +// A stub always returns the same data type stubStreamExtractor struct{} func (p *stubStreamExtractor) BaseLabels() LabelsResult { diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index cbcd3f782868..9d97b94054d5 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -160,7 +160,7 @@ func (p *streamPipeline) BaseLabels() LabelsResult { return p.builder.currentRes // PipelineFilter contains a set of matchers and a pipeline that, when matched, // causes an entry from a log stream to be skipped. Matching entries must also -// fall between 'start' and 'end' +// fall between 'start' and 'end', inclusive type PipelineFilter struct { Start int64 End int64 @@ -235,7 +235,7 @@ func (sp *filteringStreamPipeline) Process(ts int64, line []byte) ([]byte, Label } _, _, skip := filter.pipeline.Process(ts, line) - if skip { //When the filter matches, don't run the next step + if skip { // When the filter matches, don't run the next step return nil, nil, false } } @@ -250,7 +250,7 @@ func (sp *filteringStreamPipeline) ProcessString(ts int64, line string) (string, } _, _, skip := filter.pipeline.ProcessString(ts, line) - if skip { //When the filter matches, don't run the next step + if skip { // When the filter matches, don't run the next step return "", nil, false } } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index e136124b4926..b7dbf80e1b0c 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -57,25 +57,27 @@ func TestFilteringPipeline(t *testing.T) { }, newStubPipeline()) tt := []struct { - name string - ts int64 - line string - labels labels.Labels - ok bool + name string + ts int64 + line string + inputStreamLabels labels.Labels + ok bool }{ - {"it doesn't fall in the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it is before the timerange", 1, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, + {"it is after the timerange", 6, "line", labels.Labels{{Name: "baz", Value: "foo"}}, true}, {"it doesn't match the filter", 3, "all good", labels.Labels{{Name: "baz", Value: "foo"}}, true}, {"it doesn't match all the selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}}, true}, + {"it doesn't match any selectors", 3, "line", labels.Labels{{Name: "beep", Value: "boop"}}, true}, {"it matches all selectors", 3, "line", labels.Labels{{Name: "foo", Value: "bar"}, {Name: "bar", Value: "baz"}}, false}, {"it tries all the filters", 5, "line", labels.Labels{{Name: "baz", Value: "foo"}}, false}, } for _, test := range tt { t.Run(test.name, func(t *testing.T) { - _, _, ok := p.ForStream(test.labels).Process(test.ts, []byte(test.line)) + _, _, ok := p.ForStream(test.inputStreamLabels).Process(test.ts, []byte(test.line)) require.Equal(t, test.ok, ok) - _, _, ok = p.ForStream(test.labels).ProcessString(test.ts, test.line) + _, _, ok = p.ForStream(test.inputStreamLabels).ProcessString(test.ts, test.line) require.Equal(t, test.ok, ok) }) } @@ -101,6 +103,7 @@ func newStubPipeline() *stubPipeline { } } +// A stub always returns the same data type stubPipeline struct { sp *stubStreamPipeline } @@ -109,6 +112,7 @@ func (p *stubPipeline) ForStream(labels labels.Labels) StreamPipeline { return p.sp } +// A stub always returns the same data type stubStreamPipeline struct{} func (p 
*stubStreamPipeline) BaseLabels() LabelsResult { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 5b95c1031997..b7436958019a 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -7,8 +7,6 @@ import ( "sort" "time" - "github.com/grafana/loki/pkg/util/deletion" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" @@ -29,6 +27,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/deletion" ) var ( From cc359ea90706c2c8d171e7deb8b3b020ee79bcf9 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Fri, 8 Apr 2022 10:40:18 -0600 Subject: [PATCH 10/18] review comments --- pkg/chunkenc/memchunk_test.go | 4 ++-- pkg/logql/log/metrics_extraction.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index df714a21ab54..9a539ca0e040 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -678,10 +678,10 @@ func BenchmarkWrite(b *testing.B) { type nomatchPipeline struct{} func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult } -func (nomatchPipeline) Process(ts int64, line []byte) ([]byte, log.LabelsResult, bool) { +func (nomatchPipeline) Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false } -func (nomatchPipeline) ProcessString(ts int64, line string) (string, log.LabelsResult, bool) { +func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) { return line, nil, false } diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 2d51a82e3103..f1f61ca72bbc 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -80,7 +80,7 @@ type streamLineSampleExtractor struct { builder *LabelsBuilder } -func (l *streamLineSampleExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) Process(_ int64, line []byte) (float64, LabelsResult, bool) { // short circuit. if l.Stage == NoopStage { return l.LineExtractor(line), l.builder.GroupedLabels(), true @@ -168,7 +168,7 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra return res } -func (l *streamLabelSampleExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) Process(_ int64, line []byte) (float64, LabelsResult, bool) { // Apply the pipeline first. 
l.builder.Reset() line, ok := l.preStage.Process(line, l.builder) From cca364ef78c81717d756384059de37057203b59f Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Fri, 8 Apr 2022 12:27:17 -0600 Subject: [PATCH 11/18] review feedback --- pkg/logql/log/pipeline.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 9d97b94054d5..c4c185100190 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -204,10 +204,8 @@ func (p *filteringPipeline) ForStream(labels labels.Labels) StreamPipeline { func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { for _, m := range matchers { - for _, l := range labels { - if m.Name == l.Name && !m.Matches(l.Value) { - return false - } + if !m.Matches(labels.Get(m.Name)) { + return false } } return true From 4ce2599a3515b4b840ce8768b487d7ab80502897 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Mon, 11 Apr 2022 07:59:49 -0600 Subject: [PATCH 12/18] no need to include label filters in the test pipeline --- pkg/logql/log/pipeline_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index b7dbf80e1b0c..912e03ad5081 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -89,7 +89,6 @@ func newPipelineFilter(start, end int64, lbls labels.Labels, filter string) Pipe var matchers []*labels.Matcher for _, l := range lbls { m := labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value) - stages = append(stages, NewStringLabelFilter(m)) matchers = append(matchers, m) } stages = append(stages, mustFilter(NewFilter(filter, labels.MatchEqual)).ToStage()) From 1982c6b5b6dc5e652547c91866237918d38c7ca3 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 12 Apr 2022 10:55:04 +0100 Subject: [PATCH 13/18] Build e2e deletion test --- .../deletion/integration/deletion_test.go | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go new file mode 100644 index 000000000000..39fb2ea3831a --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go @@ -0,0 +1,116 @@ +package integration + +import ( + "flag" + "fmt" + "io/ioutil" + "net" + "os" + "testing" + "time" + + "github.com/grafana/loki/pkg/loki" + "github.com/grafana/loki/pkg/util/cfg" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func getFreePort() (port int, err error) { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil + } + } + return +} + +func newCfg(t *testing.T) string { + httpPort, err := getFreePort() + require.NoError(t, err) + + grpcPort, err := getFreePort() + require.NoError(t, err) + + file, err := ioutil.TempFile("", "loki-config") + require.NoError(t, err) + + _, err = file.Write([]byte(fmt.Sprintf(` +auth_enabled: false + +server: + http_listen_port: %d + grpc_listen_port: %d + +common: + path_prefix: /tmp/loki + storage: + filesystem: + chunks_directory: /tmp/loki/chunks + rules_directory: /tmp/loki/rules + replication_factor: 1 + ring: + instance_addr: 127.0.0.1 + kvstore: 
+ store: inmemory + +schema_config: + configs: + - from: 2020-10-24 + store: boltdb-shipper + object_store: filesystem + schema: v11 + index: + prefix: index_ + period: 24h + +analytics: + reporting_enabled: false + +ruler: + alertmanager_url: http://localhost:9093 +`, httpPort, grpcPort))) + require.NoError(t, err) + + t.Cleanup(func() { + t.Logf("remove config %s", file.Name()) + os.Remove(file.Name()) + }) + + return file.Name() +} + +func TestFilterOnlyMonolithCompactor(t *testing.T) { + + for _, n := range []string{"a", "b"} { + t.Run(n, func(t *testing.T) { + var config loki.ConfigWrapper + + var flagset = flag.NewFlagSet("test-flags", flag.ExitOnError) + myCfg := newCfg(t) + + require.NoError(t, cfg.DynamicUnmarshal(&config, []string{ + "-config.file", myCfg, + }, flagset)) + + require.NoError(t, config.Validate()) + + // hack in a fresh registry + reg := prometheus.NewRegistry() + prometheus.DefaultGatherer = reg + prometheus.DefaultRegisterer = reg + + l, err := loki.New(config.Config) + require.NoError(t, err) + + go func() { + require.NoError(t, l.Run(loki.RunOpts{})) + }() + }) + } + + time.Sleep(30 * time.Second) + +} From 6f4cce74142324946018678fc9636c3a566ce95e Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 12 Apr 2022 16:38:06 +0100 Subject: [PATCH 14/18] x --- .../deletion/integration/deletion_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go index 39fb2ea3831a..b17585ce2c30 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go @@ -15,6 +15,23 @@ import ( "github.com/stretchr/testify/require" ) +type testCluster struct { + sharedPath string + components []testComponent +} + +type testComponent struct { + loki *loki.Loki + cluster *testCluster + + httpPort int + grpcPort int +} + +func (c *testCluster) addComponent(name string, flags ...string) *testComponent { + return nil +} + func getFreePort() (port int, err error) { var a *net.TCPAddr if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { From 4106855cf2b2c78d6c3beafd25d2a68e74ed6510 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 14 Apr 2022 10:06:04 +0100 Subject: [PATCH 15/18] Sort out signal handlers --- pkg/loki/loki.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 6ef8b8c78229..0abd34032105 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -225,6 +225,7 @@ type Loki struct { ModuleManager *modules.Manager serviceMap map[string]services.Service deps map[string][]string + SignalHandler *signals.Handler Server *server.Server ring *ring.Ring @@ -391,9 +392,9 @@ func (t *Loki) Run(opts RunOpts) error { sm.AddListener(services.NewManagerListener(healthy, stopped, serviceFailed)) // Setup signal handler. If signal arrives, we stop the manager, which stops all the services. 
-	handler := signals.NewHandler(t.Server.Log)
+	t.SignalHandler = signals.NewHandler(t.Server.Log)
 	go func() {
-		handler.Loop()
+		t.SignalHandler.Loop()
 		sm.StopAsync()
 	}()
 
From 88c8bc36b5968e818ae2d131be7bc6cbe03e6b9c Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Thu, 14 Apr 2022 10:06:21 +0100
Subject: [PATCH 16/18] Enable filter only on frontend

---
 pkg/loki/modules.go | 11 +++++++----
 pkg/storage/stores/shipper/compactor/compactor.go | 2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 5d678d623959..903366d2812d 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -750,10 +750,13 @@ func (t *Loki) initCompactor() (services.Service, error) {
 	t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)
 
 	// TODO: update this when the other deletion modes are available
-	if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() == deletion.WholeStreamDeletion {
-		t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
-		t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
-		t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
+	if t.Cfg.CompactorConfig.RetentionEnabled {
+		switch t.compactor.DeleteMode() {
+		case deletion.WholeStreamDeletion, deletion.FilterOnly:
+			t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
+			t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
+			t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
+		}
 	}
 
 	return t.compactor, nil
diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index 324a8db49bab..480ada925a29 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -229,7 +229,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
 		return err
 	}
 
-	if c.deleteMode == deletion.WholeStreamDeletion {
+	if c.deleteMode == deletion.WholeStreamDeletion || c.deleteMode == deletion.FilterOnly {
 		deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
 
 		c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
From db01476ce8140eed9f9df7d9a4c0534ea55c9782 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Thu, 14 Apr 2022 10:16:16 +0100
Subject: [PATCH 17/18] More progress on tests

---
 .../deletion/integration/deletion_test.go | 337 +++++++++++++++---
 1 file changed, 283 insertions(+), 54 deletions(-)

diff --git a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go
index b17585ce2c30..0c0451ca114c 100644
--- a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go
+++ 
b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go
@@ -1,60 +1,143 @@
 package integration
 
 import (
+	"bytes"
+	"errors"
 	"flag"
 	"fmt"
 	"io/ioutil"
 	"net"
+	"net/http"
+	"net/url"
 	"os"
+	"strconv"
+	"sync"
 	"testing"
 	"time"
 
-	"github.com/grafana/loki/pkg/loki"
-	"github.com/grafana/loki/pkg/util/cfg"
+	"github.com/grafana/dskit/multierror"
+	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/loki"
+	"github.com/grafana/loki/pkg/util/cfg"
 )
 
+func init() {
+	// hack in a duplicate-ignoring registry
+	prometheus.DefaultRegisterer = &wrappedRegisterer{Registerer: prometheus.DefaultRegisterer}
+}
+
+type wrappedRegisterer struct {
+	prometheus.Registerer
+}
+
+func (w *wrappedRegisterer) Register(collector prometheus.Collector) error {
+	if err := w.Registerer.Register(collector); err != nil {
+		var aErr prometheus.AlreadyRegisteredError
+		if errors.As(err, &aErr) {
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+func (w *wrappedRegisterer) MustRegister(collectors ...prometheus.Collector) {
+	for _, c := range collectors {
+		if err := w.Register(c); err != nil {
+			panic(err.Error())
+		}
+	}
+}
+
 type testCluster struct {
 	sharedPath string
-	components []testComponent
+	components []*testComponent
+	waitGroup  sync.WaitGroup
+}
+
+func newTestCluster() *testCluster {
+	sharedPath, err := ioutil.TempDir("", "loki-sharded-data")
+	if err != nil {
+		panic(err.Error())
+	}
+
+	return &testCluster{
+		sharedPath: sharedPath,
+	}
+}
+
+func (c *testCluster) run() error {
+	for _, component := range c.components {
+		if err := component.run(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+func (c *testCluster) cleanup() error {
+	errs := multierror.New()
+	for _, component := range c.components {
+		errs.Add(component.cleanup())
+	}
+	if c.sharedPath != "" {
+		errs.Add(os.RemoveAll(c.sharedPath))
+	}
+	if err := errs.Err(); err != nil {
+		return err
+	}
+	c.waitGroup.Wait()
+
+	return nil
+}
+
+func (c *testCluster) addComponent(name string, flags ...string) *testComponent {
+	component := &testComponent{
+		cluster: c,
+		flags:   flags,
+	}
+	c.components = append(c.components, component)
+	return component
 }
 
 type testComponent struct {
 	loki    *loki.Loki
 	cluster *testCluster
+	flags   []string
 
 	httpPort int
 	grpcPort int
-}
 
-func (c *testCluster) addComponent(name string, flags ...string) *testComponent {
-	return nil
+	configFile string
+	dataPath   string
 }
 
-func getFreePort() (port int, err error) {
-	var a *net.TCPAddr
-	if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil {
-		var l *net.TCPListener
-		if l, err = net.ListenTCP("tcp", a); err == nil {
-			defer l.Close()
-			return l.Addr().(*net.TCPAddr).Port, nil
-		}
+func (c *testComponent) writeConfig() error {
+	var err error
+	c.httpPort, err = getFreePort()
+	if err != nil {
+		return fmt.Errorf("error allocating HTTP port: %w", err)
 	}
-	return
-}
 
-func newCfg(t *testing.T) string {
-	httpPort, err := getFreePort()
-	require.NoError(t, err)
+	c.grpcPort, err = getFreePort()
+	if err != nil {
+		return fmt.Errorf("error allocating GRPC port: %w", err)
+	}
 
-	grpcPort, err := getFreePort()
-	require.NoError(t, err)
+	configFile, err := ioutil.TempFile("", "loki-config")
+	if err != nil {
+		return fmt.Errorf("error creating config file: %w", err)
+	}
 
-	file, err := ioutil.TempFile("", "loki-config")
-	require.NoError(t, err)
+	c.dataPath, err = ioutil.TempDir("", "loki-data")
+	if 
err != nil { + return fmt.Errorf("error creating config file: %w", err) + } - _, err = file.Write([]byte(fmt.Sprintf(` + if _, err := configFile.Write([]byte(fmt.Sprintf(` auth_enabled: false server: @@ -62,11 +145,11 @@ server: grpc_listen_port: %d common: - path_prefix: /tmp/loki + path_prefix: %s storage: filesystem: - chunks_directory: /tmp/loki/chunks - rules_directory: /tmp/loki/rules + chunks_directory: %s/chunks + rules_directory: %s/rules replication_factor: 1 ring: instance_addr: 127.0.0.1 @@ -83,51 +166,197 @@ schema_config: prefix: index_ period: 24h +compactor: + working_directory: %s/retention + shared_store: filesystem + retention_enabled: true + analytics: reporting_enabled: false ruler: alertmanager_url: http://localhost:9093 -`, httpPort, grpcPort))) - require.NoError(t, err) +`, + c.httpPort, + c.grpcPort, + c.dataPath, + c.cluster.sharedPath, + c.cluster.sharedPath, + c.dataPath, + ))); err != nil { + return fmt.Errorf("error writing config file: %w", err) + } + if err := configFile.Close(); err != nil { + return fmt.Errorf("error writing config file: %w", err) + } + + c.configFile = configFile.Name() + + return nil +} + +func (c *testComponent) run() error { + if err := c.writeConfig(); err != nil { + return err + } + + var config loki.ConfigWrapper + + var flagset = flag.NewFlagSet("test-flags", flag.ExitOnError) - t.Cleanup(func() { - t.Logf("remove config %s", file.Name()) - os.Remove(file.Name()) + if err := cfg.DynamicUnmarshal(&config, append( + c.flags, + "-config.file", + c.configFile, + ), flagset); err != nil { + return err + } + + if err := config.Validate(); err != nil { + return err + } + + var err error + c.loki, err = loki.New(config.Config) + if err != nil { + return err + } + + var ( + readyCh = make(chan struct{}) + errCh = make(chan error, 1) + ) + + c.loki.ModuleManager.RegisterModule("test-ready", func() (services.Service, error) { + close(readyCh) + return nil, nil }) + c.loki.ModuleManager.AddDependency("test-ready", loki.Server) + c.loki.ModuleManager.AddDependency(loki.All, "test-ready") + c.loki.ModuleManager.AddDependency(loki.Compactor, "test-ready") + + c.cluster.waitGroup.Add(1) + go func() { + defer c.cluster.waitGroup.Done() + err := c.loki.Run(loki.RunOpts{}) + if err != nil { + errCh <- err + } + }() + + select { + case <-readyCh: + break + case err := <-errCh: + return err + } + + return nil +} - return file.Name() +func (c *testComponent) cleanup() error { + errs := multierror.New() + if c.loki != nil { + c.loki.SignalHandler.Stop() + } + if c.dataPath != "" { + errs.Add(os.RemoveAll(c.dataPath)) + } + if c.configFile != "" { + errs.Add(os.Remove(c.configFile)) + } + return errs.Err() +} + +func getFreePort() (port int, err error) { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil + } + } + return } func TestFilterOnlyMonolithCompactor(t *testing.T) { + cluster := newTestCluster() + defer cluster.cleanup() - for _, n := range []string{"a", "b"} { - t.Run(n, func(t *testing.T) { - var config loki.ConfigWrapper + flags := []string{ + "-boltdb.shipper.compactor.deletion-mode=filter-only", + } - var flagset = flag.NewFlagSet("test-flags", flag.ExitOnError) - myCfg := newCfg(t) + var ( + tAll = cluster.addComponent("all", append(flags, "-target=all")...) + tCompactor = cluster.addComponent("compactor", append(flags, "-target=compactor")...) 
+	)
 
-	require.NoError(t, cfg.DynamicUnmarshal(&config, []string{
-		"-config.file", myCfg,
-	}, flagset))
+	require.NoError(t, cluster.run())
 
-	require.NoError(t, config.Validate())
+	now := time.Now().Add(-5 * time.Minute)
 
-	// hack in a fresh registry
-	reg := prometheus.NewRegistry()
-	prometheus.DefaultGatherer = reg
-	prometheus.DefaultRegisterer = reg
+	// TODO: do not sleep
+	time.Sleep(time.Second)
 
-	l, err := loki.New(config.Config)
-	require.NoError(t, err)
+	t.Run("ingest-logs", func(t *testing.T) {
+		// ingest some log lines
+		var jsonData = []byte(fmt.Sprintf(`{
+	"streams": [
+		{
+			"stream": {
+				"job": "fake"
+			},
+			"values": [
+				[ "%d", "lineA" ],
+				[ "%d", "lineB" ]
+			]
+		}
+	]
+}`, now.UnixNano(), now.UnixNano()))
+		req, err := http.NewRequest("POST", fmt.Sprintf("http://127.0.0.1:%d/loki/api/v1/push", tAll.httpPort), bytes.NewBuffer(jsonData))
+		req.Header.Set("Content-Type", "application/json; charset=UTF-8")
+		require.NoError(t, err)
 
-			go func() {
-				require.NoError(t, l.Run(loki.RunOpts{}))
-			}()
-		})
-	}
+		resp, err := http.DefaultClient.Do(req)
+		require.NoError(t, err)
+		assert.Equal(t, 204, resp.StatusCode)
+	})
 
-	time.Sleep(30 * time.Second)
+	// request a deletion
+	t.Run("request-deletion", func(t *testing.T) {
+		reqU, _ := url.Parse(fmt.Sprintf("http://127.0.0.1:%d/loki/api/v1/delete", tCompactor.httpPort))
+		reqQ := reqU.Query()
+		reqQ.Set("query", `{job="fake"}`) // TODO: support a real filter with |= "lineA"
+		// TODO: investigate why this is not nanoseconds as for querying
+		reqQ.Set("start", strconv.FormatInt(now.Add(-time.Hour).Unix(), 10))
+		reqQ.Set("end", strconv.FormatInt(now.Add(time.Millisecond).Unix(), 10))
+		reqU.RawQuery = reqQ.Encode()
+		resp, err := http.Post(reqU.String(), "", nil)
+		require.NoError(t, err)
+		assert.Equal(t, 204, resp.StatusCode)
+	})
+
+	// TODO: do not sleep
+	time.Sleep(time.Second)
+
+	t.Run("query", func(t *testing.T) {
+		reqU, _ := url.Parse(fmt.Sprintf("http://127.0.0.1:%d/loki/api/v1/query_range", tAll.httpPort))
+		reqQ := reqU.Query()
+		reqQ.Set("query", `{job="fake"}`)
+		reqQ.Set("start", strconv.FormatInt(now.Add(-time.Hour).UnixNano(), 10))
+		reqQ.Set("end", strconv.FormatInt(now.Add(time.Millisecond).UnixNano(), 10))
+		reqU.RawQuery = reqQ.Encode()
+
+		resp, err := http.Post(reqU.String(), "", nil)
+		require.NoError(t, err)
+		assert.Equal(t, 200, resp.StatusCode)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		require.NoError(t, err)
+
+		assert.Equal(t, "", string(body))
+	})
 }
From e79f7ea3622cf789c74534e17196d9f8797f8174 Mon Sep 17 00:00:00 2001
From: Christian Simon
Date: Thu, 14 Apr 2022 10:23:39 +0100
Subject: [PATCH 18/18] test single binary only

---
 .../deletion/integration/deletion_test.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go
index 0c0451ca114c..c04b5763f4cf 100644
--- a/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go
+++ b/pkg/storage/stores/shipper/compactor/deletion/integration/deletion_test.go
@@ -289,8 +289,8 @@ func TestFilterOnlyMonolithCompactor(t *testing.T) {
 	}
 
 	var (
-		tAll       = cluster.addComponent("all", append(flags, "-target=all")...)
-		tCompactor = cluster.addComponent("compactor", append(flags, "-target=compactor")...)
+		tAll = cluster.addComponent("all", append(flags, "-target=all")...)
+		// tCompactor = cluster.addComponent("compactor", append(flags, "-target=compactor")...)
) require.NoError(t, cluster.run()) @@ -326,7 +326,7 @@ func TestFilterOnlyMonolithCompactor(t *testing.T) { // request a deletion t.Run("request-deletion", func(t *testing.T) { - reqU, _ := url.Parse(fmt.Sprintf("http://127.0.0.1:%d/loki/api/v1/delete", tCompactor.httpPort)) + reqU, _ := url.Parse(fmt.Sprintf("http://127.0.0.1:%d/loki/api/v1/delete", tAll.httpPort)) reqQ := reqU.Query() reqQ.Set("query", `{job="fake"}`) // TODO support a real filter with |= lineA`) // TODO: Investigate why this is not nano seconds as for querying @@ -337,6 +337,14 @@ func TestFilterOnlyMonolithCompactor(t *testing.T) { resp, err := http.Post(reqU.String(), "", nil) require.NoError(t, err) assert.Equal(t, 204, resp.StatusCode) + + reqU.RawQuery = "" + resp, err = http.Get(reqU.String()) + assert.Equal(t, 200, resp.StatusCode) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, "", string(body)) }) // TODO: do not sleep