Skip to content

Commit

Permalink
perf: Replace channel check with atomic bool in tailer.send() (#12976)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored May 15, 2024
1 parent e7fdeb9 commit 4a5edf1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
13 changes: 6 additions & 7 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"hash/fnv"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -46,6 +47,7 @@ type tailer struct {
// and the loop and senders should stop
closeChan chan struct{}
closeOnce sync.Once
closed atomic.Bool

blockedAt *time.Time
blockedMtx sync.RWMutex
Expand Down Expand Up @@ -74,6 +76,7 @@ func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDr
maxDroppedStreams: maxDroppedStreams,
id: generateUniqueID(orgID, expr.String()),
closeChan: make(chan struct{}),
closed: atomic.Bool{},
pipeline: pipeline,
}, nil
}
Expand Down Expand Up @@ -227,17 +230,13 @@ func isMatching(lbs labels.Labels, matchers []*labels.Matcher) bool {
}

func (t *tailer) isClosed() bool {
select {
case <-t.closeChan:
return true
default:
return false
}
return t.closed.Load()
}

func (t *tailer) close() {
t.closeOnce.Do(func() {
// Signal the close channel
// Signal the close channel & flip the atomic bool so tailers will exit
t.closed.Store(true)
close(t.closeChan)

// We intentionally do not close sendChan in order to avoid a panic on
Expand Down
23 changes: 23 additions & 0 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
)

func TestTailer_RoundTrip(t *testing.T) {
t.Parallel()
server := &fakeTailServer{}

lbs := makeRandomLabels()
Expand Down Expand Up @@ -66,6 +67,7 @@ func TestTailer_RoundTrip(t *testing.T) {
}

func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
t.Parallel()
runs := 100

stream := logproto.Stream{
Expand Down Expand Up @@ -103,6 +105,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
}

func Test_dropstream(t *testing.T) {
t.Parallel()
maxDroppedStreams := 10

entry := logproto.Entry{Timestamp: time.Now(), Line: "foo"}
Expand Down Expand Up @@ -224,6 +227,7 @@ func Test_TailerSendRace(t *testing.T) {
}

func Test_IsMatching(t *testing.T) {
t.Parallel()
for _, tt := range []struct {
name string
lbs labels.Labels
Expand All @@ -241,6 +245,7 @@ func Test_IsMatching(t *testing.T) {
}

func Test_StructuredMetadata(t *testing.T) {
t.Parallel()
lbs := makeRandomLabels()

for _, tc := range []struct {
Expand Down Expand Up @@ -364,3 +369,21 @@ func Test_StructuredMetadata(t *testing.T) {
})
}
}

func Benchmark_isClosed(t *testing.B) {
var server fakeTailServer
expr, err := syntax.ParseLogSelector(`{app="foo"}`, true)
require.NoError(t, err)
tail, err := newTailer("foo", expr, &server, 0)
require.NoError(t, err)

require.Equal(t, false, tail.isClosed())

t.ResetTimer()
for i := 0; i < t.N; i++ {
tail.isClosed()
}

tail.close()
require.Equal(t, true, tail.isClosed())
}

0 comments on commit 4a5edf1

Please sign in to comment.