Skip to content

Commit

Permalink
Improve tailer matching by using the index. (#3090)
Browse files Browse the repository at this point in the history
* Improve tailer matching by using the index.

```
❯ benchcmp  before.txt after
benchmark                              old ns/op     new ns/op     delta
Benchmark_instance_addNewTailer-16     537731        1479          -99.72%

benchmark                              old allocs     new allocs     delta
Benchmark_instance_addNewTailer-16     244            3              -98.77%

benchmark                              old bytes     new bytes     delta
Benchmark_instance_addNewTailer-16     22098         121           -99.45%
```

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 7, 2021
1 parent fea5db1 commit cdf4a73
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 18 deletions.
4 changes: 3 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,9 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
return err
}

instance.addNewTailer(tailer)
if err := instance.addNewTailer(tailer); err != nil {
return err
}
tailer.loop()
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,18 +430,17 @@ outer:
return nil
}

func (i *instance) addNewTailer(t *tailer) {
i.streamsMtx.RLock()
for _, stream := range i.streams {
if stream.matchesTailer(t) {
stream.addTailer(t)
}
func (i *instance) addNewTailer(t *tailer) error {
if err := i.forMatchingStreams(t.matchers, func(s *stream) error {
s.addTailer(t)
return nil
}); err != nil {
return err
}
i.streamsMtx.RUnlock()

i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
i.tailers[t.getID()] = t
return nil
}

func (i *instance) addTailersToNewStream(stream *stream) {
Expand All @@ -455,7 +454,7 @@ func (i *instance) addTailersToNewStream(stream *stream) {
continue
}

if stream.matchesTailer(t) {
if isMatching(stream.labels, t.matchers) {
stream.addTailer(t)
}
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,32 @@ func Benchmark_PushInstance(b *testing.B) {
})
}
}

func Benchmark_instance_addNewTailer(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

ctx := context.Background()

inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
require.NoError(b, inst.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{},
}))
}
b.Run("addNewTailer", func(b *testing.B) {
for n := 0; n < b.N; n++ {
_ = inst.addNewTailer(t)
}
})
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, 0, lbs, NilMetrics))
}
})

}
4 changes: 0 additions & 4 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,3 @@ func (s *stream) addTailer(t *tailer) {

s.tailers[t.getID()] = t
}

func (s *stream) matchesTailer(t *tailer) bool {
return t.isWatchingLabels(s.labels)
}
7 changes: 3 additions & 4 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,13 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log
return streamsResult
}

// Returns true if tailer is interested in the passed labelset
func (t *tailer) isWatchingLabels(lbs labels.Labels) bool {
for _, matcher := range t.matchers {
// isMatching returns true if lbs matches all matchers.
func isMatching(lbs labels.Labels, matchers []*labels.Matcher) bool {
for _, matcher := range matchers {
if !matcher.Matches(lbs.Get(matcher.Name)) {
return false
}
}

return true
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,20 @@ func Test_TailerSendRace(t *testing.T) {
}
wg.Wait()
}

func Test_IsMatching(t *testing.T) {
for _, tt := range []struct {
name string
lbs labels.Labels
matchers []*labels.Matcher
matches bool
}{
{"not in lbs", labels.Labels{{Name: "job", Value: "foo"}}, []*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}, false},
{"equal", labels.Labels{{Name: "job", Value: "foo"}}, []*labels.Matcher{{Type: labels.MatchEqual, Name: "job", Value: "foo"}}, true},
{"regex", labels.Labels{{Name: "job", Value: "foo"}}, []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".+oo")}, true},
} {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.matches, isMatching(tt.lbs, tt.matchers))
})
}
}

0 comments on commit cdf4a73

Please sign in to comment.