Skip to content

Commit

Permalink
Avoid parsing labels when tailer is sending from a stream. (#2973)
Browse files Browse the repository at this point in the history
* Avoid parsing labels when tailer is sending from a stream.

This also include some code cleanup.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* I got linted.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Nov 24, 2020
1 parent e14463b commit 9807f7c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 37 deletions.
12 changes: 6 additions & 6 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func TestConcurrentPushes(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < concurrent; i++ {
l := makeRandomLabels()
for uniqueLabels[l] {
for uniqueLabels[l.String()] {
l = makeRandomLabels()
}
uniqueLabels[l] = true
uniqueLabels[l.String()] = true

wg.Add(1)
go func(labels string) {
Expand All @@ -91,7 +91,7 @@ func TestConcurrentPushes(t *testing.T) {

tt = tt.Add(entriesPerIteration * time.Nanosecond)
}
}(l)
}(l.String())
}

time.Sleep(100 * time.Millisecond) // ready
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestSyncPeriod(t *testing.T) {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}
pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls, Entries: result}}}
pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls.String(), Entries: result}}}
err = inst.Push(context.Background(), pr)
require.NoError(t, err)

Expand Down Expand Up @@ -250,12 +250,12 @@ func entries(n int, t time.Time) []logproto.Entry {

var labelNames = []string{"app", "instance", "namespace", "user", "cluster"}

func makeRandomLabels() string {
func makeRandomLabels() labels.Labels {
ls := labels.NewBuilder(nil)
for _, ln := range labelNames {
ls.Set(ln, fmt.Sprintf("%d", rand.Int31()))
}
return ls.Labels().String()
return ls.Labels()
}

func Benchmark_PushInstance(b *testing.B) {
Expand Down
4 changes: 1 addition & 3 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
closedTailers = append(closedTailers, tailer.getID())
continue
}
if err := tailer.send(stream); err != nil {
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to send stream to tailer", "err", err)
}
tailer.send(stream, s.labels)
}
s.tailerMtx.RUnlock()

Expand Down
41 changes: 17 additions & 24 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,52 +110,45 @@ func (t *tailer) loop() {
}
}

func (t *tailer) send(stream logproto.Stream) error {
func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
if t.isClosed() {
return nil
return
}

// if we are already dropping streams due to blocked connection, drop new streams directly to save some effort
if blockedSince := t.blockedSince(); blockedSince != nil {
if blockedSince.Before(time.Now().Add(-time.Second * 15)) {
t.close()
return nil
return
}
t.dropStream(stream)
return nil
return
}

streams, err := t.processStream(stream)
if err != nil {
return err
}
streams := t.processStream(stream, lbs)
if len(streams) == 0 {
return nil
return
}
for _, s := range streams {
select {
case t.sendChan <- &logproto.Stream{Labels: s.Labels, Entries: s.Entries}:
case t.sendChan <- s:
default:
t.dropStream(s)
t.dropStream(*s)
}
}
return nil
}

func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error) {
func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {
// Optimization: skip filtering entirely, if no filter is set
if log.IsNoopPipeline(t.pipeline) {
return []logproto.Stream{stream}, nil
return []*logproto.Stream{&stream}
}
// pipeline are not thread safe and tailer can process multiple stream at once.
t.pipelineMtx.Lock()
defer t.pipelineMtx.Unlock()

streams := map[uint64]*logproto.Stream{}
lbs, err := logql.ParseLabels(stream.Labels)
if err != nil {
return nil, err
}

sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.Process([]byte(e.Line))
Expand All @@ -174,11 +167,11 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error
Line: string(newLine),
})
}
streamsResult := make([]logproto.Stream, 0, len(streams))
streamsResult := make([]*logproto.Stream, 0, len(streams))
for _, stream := range streams {
streamsResult = append(streamsResult, *stream)
streamsResult = append(streamsResult, stream)
}
return streamsResult, nil
return streamsResult
}

// Returns true if tailer is interested in the passed labelset
Expand Down Expand Up @@ -232,12 +225,12 @@ func (t *tailer) dropStream(stream logproto.Stream) {
blockedAt := time.Now()
t.blockedAt = &blockedAt
}
droppedStream := logproto.DroppedStream{

t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{
From: stream.Entries[0].Timestamp,
To: stream.Entries[len(stream.Entries)-1].Timestamp,
Labels: stream.Labels,
}
t.droppedStreams = append(t.droppedStreams, &droppedStream)
})
}

func (t *tailer) popDroppedStreams() []*logproto.DroppedStream {
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -35,7 +36,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
go assert.NotPanics(t, func() {
defer routines.Done()
time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
_ = tailer.send(stream)
tailer.send(stream, labels.Labels{{Name: "type", Value: "test"}})
})

go assert.NotPanics(t, func() {
Expand All @@ -61,14 +62,15 @@ func Test_TailerSendRace(t *testing.T) {
for i := 1; i <= 20; i++ {
wg.Add(1)
go func() {
_ = tail.send(logproto.Stream{
Labels: makeRandomLabels(),
lbs := makeRandomLabels()
tail.send(logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 3), Line: "3"},
},
})
}, lbs)
wg.Done()
}()
}
Expand Down

0 comments on commit 9807f7c

Please sign in to comment.