Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Test log entry deletion #5922

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return
}
stats.AddHeadChunkBytes(int64(len(e.s)))
newLine, parsedLbs, ok := pipeline.ProcessString(e.s)
newLine, parsedLbs, ok := pipeline.ProcessString(e.t, e.s)
if !ok {
return
}
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra

for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, parsedLabels, ok := extractor.ProcessString(e.s)
value, parsedLabels, ok := extractor.ProcessString(e.t, e.s)
if !ok {
continue
}
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, ok := e.pipeline.Process(e.currLine)
newLine, lbs, ok := e.pipeline.Process(e.currTs, e.currLine)
if !ok {
continue
}
Expand Down Expand Up @@ -1294,7 +1294,7 @@ type sampleBufferedIterator struct {

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currLine)
val, labels, ok := e.extractor.Process(e.currTs, e.currLine)
if !ok {
continue
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,11 @@ func BenchmarkWrite(b *testing.B) {

type nomatchPipeline struct{}

func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
func (nomatchPipeline) Process(line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false }
func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, bool) {
func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
func (nomatchPipeline) Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) {
return line, nil, false
}
func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) {
return line, nil, false
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (hb *unorderedHeadBlock) Iterator(
mint,
maxt,
func(ts int64, line string) error {
newLine, parsedLbs, ok := pipeline.ProcessString(line)
newLine, parsedLbs, ok := pipeline.ProcessString(ts, line)
if !ok {
return nil
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
mint,
maxt,
func(ts int64, line string) error {
value, parsedLabels, ok := extractor.ProcessString(line)
value, parsedLabels, ok := extractor.ProcessString(ts, line)
if !ok {
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/math"
"github.com/grafana/loki/pkg/validation"
Expand Down Expand Up @@ -317,11 +318,17 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
if err != nil {
return nil, err
}

pipeline, err := expr.Pipeline()
if err != nil {
return nil, err
}

pipeline, err = deletion.SetupPipeline(req, pipeline)
if err != nil {
return nil, err
}

stats := stats.FromContext(ctx)
var iters []iter.EntryIterator

Expand Down Expand Up @@ -355,11 +362,17 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
if err != nil {
return nil, err
}

extractor, err := expr.Extractor()
if err != nil {
return nil, err
}

extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}

stats := stats.FromContext(ctx)
var iters []iter.SampleIterator

Expand Down
193 changes: 130 additions & 63 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,45 +466,16 @@ func Benchmark_OnceSwitch(b *testing.B) {
}

func Test_Iterator(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)
instance := defaultInstance(t)

// insert data.
for i := 0; i < 10; i++ {
// nolint
stream := "dispatcher"
if i%2 == 0 {
stream = "worker"
}
require.NoError(t,
instance.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)},
},
},
},
}),
)
}

// prepare iterators.
it, err := instance.Query(ctx,
it, err := instance.Query(context.TODO(),
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"} | logfmt`,
Limit: limit,
Limit: uint32(2),
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: direction,
Direction: logproto.BACKWARD,
},
},
)
Expand All @@ -513,14 +484,14 @@ func Test_Iterator(t *testing.T) {
// assert the order is preserved.
var res *logproto.QueryResponse
require.NoError(t,
sendBatches(ctx, it,
sendBatches(context.TODO(), it,
fakeQueryServer(
func(qr *logproto.QueryResponse) error {
res = qr
return nil
},
),
limit),
uint32(2)),
)
require.Equal(t, 2, len(res.Streams))
// each entry translated into a unique stream
Expand All @@ -546,24 +517,142 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
}

func Test_ChunkFilter(t *testing.T) {
instance := defaultInstance(t)
instance.chunkFilter = &testFilter{}

it, err := instance.Query(context.TODO(),
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"}`,
Limit: uint32(2),
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
},
},
)
require.NoError(t, err)
defer it.Close()

for it.Next() {
require.NoError(t, it.Error())
lbs, err := syntax.ParseLabels(it.Labels())
require.NoError(t, err)
require.NotEqual(t, "dispatcher", lbs.Get("log_stream"))
}
}

func Test_QueryWithDelete(t *testing.T) {
instance := defaultInstance(t)

it, err := instance.Query(context.TODO(),
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"}`,
Limit: uint32(2),
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
Deletes: []*logproto.Delete{
{
Selector: `{log_stream="worker"}`,
Start: 0,
End: 10,
},
{
Selector: `{log_stream="dispatcher"}`,
Start: 0,
End: 5,
},
{
Selector: `{log_stream="dispatcher"} |= "9"`,
Start: 0,
End: 10,
},
},
},
},
)
require.NoError(t, err)
defer it.Close()

var logs []string
for it.Next() {
logs = append(logs, it.Entry().Line)
}

require.Equal(t, logs, []string{`msg="dispatcher_7"`})
}

func Test_QuerySampleWithDelete(t *testing.T) {
instance := defaultInstance(t)

it, err := instance.QuerySample(context.TODO(),
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `count_over_time({job="3"}[5m])`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Deletes: []*logproto.Delete{
{
Selector: `{log_stream="worker"}`,
Start: 0,
End: 10,
},
{
Selector: `{log_stream="dispatcher"}`,
Start: 0,
End: 5,
},
{
Selector: `{log_stream="dispatcher"} |= "9"`,
Start: 0,
End: 10,
},
},
},
},
)
require.NoError(t, err)
defer it.Close()

var samples []float64
for it.Next() {
samples = append(samples, it.Sample().Value)
}

require.Equal(t, samples, []float64{1.})
}

func defaultInstance(t *testing.T) *instance {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(
&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)
&ingesterConfig,
"fake",
NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1),
loki_runtime.DefaultTenantConfigs(),
noopWAL{},
NilMetrics,
nil,
nil,
)
insertData(t, instance)

return instance
}

// insert data.
func insertData(t *testing.T, instance *instance) {
for i := 0; i < 10; i++ {
// nolint
stream := "dispatcher"
if i%2 == 0 {
stream = "worker"
}
require.NoError(t,
instance.Push(ctx, &logproto.PushRequest{
instance.Push(context.TODO(), &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream),
Expand All @@ -575,28 +664,6 @@ func Test_ChunkFilter(t *testing.T) {
}),
)
}

// prepare iterators.
it, err := instance.Query(ctx,
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"}`,
Limit: limit,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: direction,
},
},
)
require.NoError(t, err)
defer it.Close()

for it.Next() {
require.NoError(t, it.Error())
lbs, err := syntax.ParseLabels(it.Labels())
require.NoError(t, err)
require.NotEqual(t, "dispatcher", lbs.Get("log_stream"))
}
}

type fakeQueryServer func(*logproto.QueryResponse) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log

sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.ProcessString(e.Line)
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
if !ok {
continue
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/logcli/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func newFileIterator(
streams := map[uint64]*logproto.Stream{}

processLine := func(line string) {
parsedLine, parsedLabels, ok := pipeline.ProcessString(line)
ts := time.Now()
parsedLine, parsedLabels, ok := pipeline.ProcessString(ts.UnixNano(), line)
if !ok {
return
}
Expand All @@ -248,7 +249,7 @@ func newFileIterator(
}

stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Now(),
Timestamp: ts,
Line: parsedLine,
})
}
Expand Down
Loading