Skip to content

Commit

Permalink
Query filtering in the ingester and storage (#5629)
Browse files Browse the repository at this point in the history
* Add timestamp to pipeline and extractor interfaces

* implement filter extractor and pipeline

* implement storage filtering

* implement ingester filtering

* test cleanup

* Refactor pipeline setup to util

* linter

* enable filtering based on mode

* review feedback

* review comments

* review feedback

* no need to include label filters in the test pipeline

* review feedback
  • Loading branch information
MasslessParticle authored Apr 18, 2022
1 parent d1aff7b commit 5939d5e
Show file tree
Hide file tree
Showing 22 changed files with 858 additions and 159 deletions.
8 changes: 4 additions & 4 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return
}
stats.AddHeadChunkBytes(int64(len(e.s)))
newLine, parsedLbs, ok := pipeline.ProcessString(e.s)
newLine, parsedLbs, ok := pipeline.ProcessString(e.t, e.s)
if !ok {
return
}
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra

for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, parsedLabels, ok := extractor.ProcessString(e.s)
value, parsedLabels, ok := extractor.ProcessString(e.t, e.s)
if !ok {
continue
}
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, ok := e.pipeline.Process(e.currLine)
newLine, lbs, ok := e.pipeline.Process(e.currTs, e.currLine)
if !ok {
continue
}
Expand Down Expand Up @@ -1294,7 +1294,7 @@ type sampleBufferedIterator struct {

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currLine)
val, labels, ok := e.extractor.Process(e.currTs, e.currLine)
if !ok {
continue
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,11 @@ func BenchmarkWrite(b *testing.B) {

type nomatchPipeline struct{}

func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
func (nomatchPipeline) Process(line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false }
func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, bool) {
func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
func (nomatchPipeline) Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) {
return line, nil, false
}
func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) {
return line, nil, false
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (hb *unorderedHeadBlock) Iterator(
mint,
maxt,
func(ts int64, line string) error {
newLine, parsedLbs, ok := pipeline.ProcessString(line)
newLine, parsedLbs, ok := pipeline.ProcessString(ts, line)
if !ok {
return nil
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
mint,
maxt,
func(ts int64, line string) error {
value, parsedLabels, ok := extractor.ProcessString(line)
value, parsedLabels, ok := extractor.ProcessString(ts, line)
if !ok {
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/math"
"github.com/grafana/loki/pkg/validation"
Expand Down Expand Up @@ -317,11 +318,17 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
if err != nil {
return nil, err
}

pipeline, err := expr.Pipeline()
if err != nil {
return nil, err
}

pipeline, err = deletion.SetupPipeline(req, pipeline)
if err != nil {
return nil, err
}

stats := stats.FromContext(ctx)
var iters []iter.EntryIterator

Expand Down Expand Up @@ -355,11 +362,17 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
if err != nil {
return nil, err
}

extractor, err := expr.Extractor()
if err != nil {
return nil, err
}

extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}

stats := stats.FromContext(ctx)
var iters []iter.SampleIterator

Expand Down
193 changes: 130 additions & 63 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,45 +466,16 @@ func Benchmark_OnceSwitch(b *testing.B) {
}

func Test_Iterator(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)
instance := defaultInstance(t)

// insert data.
for i := 0; i < 10; i++ {
// nolint
stream := "dispatcher"
if i%2 == 0 {
stream = "worker"
}
require.NoError(t,
instance.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)},
},
},
},
}),
)
}

// prepare iterators.
it, err := instance.Query(ctx,
it, err := instance.Query(context.TODO(),
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"} | logfmt`,
Limit: limit,
Limit: uint32(2),
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: direction,
Direction: logproto.BACKWARD,
},
},
)
Expand All @@ -513,14 +484,14 @@ func Test_Iterator(t *testing.T) {
// assert the order is preserved.
var res *logproto.QueryResponse
require.NoError(t,
sendBatches(ctx, it,
sendBatches(context.TODO(), it,
fakeQueryServer(
func(qr *logproto.QueryResponse) error {
res = qr
return nil
},
),
limit),
uint32(2)),
)
require.Equal(t, 2, len(res.Streams))
// each entry translated into a unique stream
Expand All @@ -546,24 +517,142 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
}

// Test_ChunkFilter verifies that a chunk filter installed on the instance
// excludes matching streams from Query results: testFilter drops the
// "dispatcher" log_stream, so no returned entry may carry that label value.
func Test_ChunkFilter(t *testing.T) {
	inst := defaultInstance(t)
	inst.chunkFilter = &testFilter{}

	params := logql.SelectLogParams{
		QueryRequest: &logproto.QueryRequest{
			Selector:  `{job="3"}`,
			Limit:     uint32(2),
			Start:     time.Unix(0, 0),
			End:       time.Unix(0, 100000000),
			Direction: logproto.BACKWARD,
		},
	}

	it, err := inst.Query(context.TODO(), params)
	require.NoError(t, err)
	defer it.Close()

	for it.Next() {
		require.NoError(t, it.Error())

		lbs, parseErr := syntax.ParseLabels(it.Labels())
		require.NoError(t, parseErr)
		require.NotEqual(t, "dispatcher", lbs.Get("log_stream"))
	}
}

// Test_QueryWithDelete verifies that delete requests attached to a log query
// are applied at query time: all "worker" lines and "dispatcher" lines in
// [0,5] or containing "9" are removed, leaving only dispatcher_7.
func Test_QueryWithDelete(t *testing.T) {
	instance := defaultInstance(t)

	it, err := instance.Query(context.TODO(),
		logql.SelectLogParams{
			QueryRequest: &logproto.QueryRequest{
				Selector:  `{job="3"}`,
				Limit:     uint32(2),
				Start:     time.Unix(0, 0),
				End:       time.Unix(0, 100000000),
				Direction: logproto.BACKWARD,
				Deletes: []*logproto.Delete{
					{
						Selector: `{log_stream="worker"}`,
						Start:    0,
						End:      10,
					},
					{
						Selector: `{log_stream="dispatcher"}`,
						Start:    0,
						End:      5,
					},
					{
						Selector: `{log_stream="dispatcher"} |= "9"`,
						Start:    0,
						End:      10,
					},
				},
			},
		},
	)
	require.NoError(t, err)
	defer it.Close()

	var logs []string
	for it.Next() {
		logs = append(logs, it.Entry().Line)
	}

	// require.Equal's signature is (t, expected, actual); the original call had
	// the arguments swapped, which produces a misleading diff on failure.
	require.Equal(t, []string{`msg="dispatcher_7"`}, logs)
}

// Test_QuerySampleWithDelete verifies that delete requests attached to a
// metric (sample) query are applied at query time: with workers and most
// dispatcher lines deleted, count_over_time sees exactly one remaining line.
func Test_QuerySampleWithDelete(t *testing.T) {
	instance := defaultInstance(t)

	it, err := instance.QuerySample(context.TODO(),
		logql.SelectSampleParams{
			SampleQueryRequest: &logproto.SampleQueryRequest{
				Selector: `count_over_time({job="3"}[5m])`,
				Start:    time.Unix(0, 0),
				End:      time.Unix(0, 100000000),
				Deletes: []*logproto.Delete{
					{
						Selector: `{log_stream="worker"}`,
						Start:    0,
						End:      10,
					},
					{
						Selector: `{log_stream="dispatcher"}`,
						Start:    0,
						End:      5,
					},
					{
						Selector: `{log_stream="dispatcher"} |= "9"`,
						Start:    0,
						End:      10,
					},
				},
			},
		},
	)
	require.NoError(t, err)
	defer it.Close()

	var samples []float64
	for it.Next() {
		samples = append(samples, it.Sample().Value)
	}

	// require.Equal's signature is (t, expected, actual); the original call had
	// the arguments swapped, which produces a misleading diff on failure.
	require.Equal(t, []float64{1.}, samples)
}

func defaultInstance(t *testing.T) *instance {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(
&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)
&ingesterConfig,
"fake",
NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1),
loki_runtime.DefaultTenantConfigs(),
noopWAL{},
NilMetrics,
nil,
nil,
)
insertData(t, instance)

return instance
}

// insert data.
func insertData(t *testing.T, instance *instance) {
for i := 0; i < 10; i++ {
// nolint
stream := "dispatcher"
if i%2 == 0 {
stream = "worker"
}
require.NoError(t,
instance.Push(ctx, &logproto.PushRequest{
instance.Push(context.TODO(), &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream),
Expand All @@ -575,28 +664,6 @@ func Test_ChunkFilter(t *testing.T) {
}),
)
}

// prepare iterators.
it, err := instance.Query(ctx,
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"}`,
Limit: limit,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: direction,
},
},
)
require.NoError(t, err)
defer it.Close()

for it.Next() {
require.NoError(t, it.Error())
lbs, err := syntax.ParseLabels(it.Labels())
require.NoError(t, err)
require.NotEqual(t, "dispatcher", lbs.Get("log_stream"))
}
}

type fakeQueryServer func(*logproto.QueryResponse) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log

sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.ProcessString(e.Line)
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
if !ok {
continue
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/logcli/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func newFileIterator(
streams := map[uint64]*logproto.Stream{}

processLine := func(line string) {
parsedLine, parsedLabels, ok := pipeline.ProcessString(line)
ts := time.Now()
parsedLine, parsedLabels, ok := pipeline.ProcessString(ts.UnixNano(), line)
if !ok {
return
}
Expand All @@ -248,7 +249,7 @@ func newFileIterator(
}

stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Now(),
Timestamp: ts,
Line: parsedLine,
})
}
Expand Down
Loading

0 comments on commit 5939d5e

Please sign in to comment.