Skip to content

Commit

Permalink
sdk/log: Fix TestBatchProcessor/DroppedLogs flaky test (#5421)
Browse files Browse the repository at this point in the history
Fix #5384

Run of `go test  -count=1000000 -run="TestBatchProcessor/DroppedLogs"`
**Before**:
Failed with either `Condition never satisfied` or panic
**After**:
Passed

First, bytes.Buffer is not thread-safe, so writing log and reading
(`bytes.String()`) caused panic. Added `concurrentBuffer`
Second, fixed flaky test with 4 records:
1. Record goes to `testExporter.Export` function and blocks in this
function because of `ExportTrigger`
(https://github.com/open-telemetry/opentelemetry-go/blob/19ee6d4775d578357e251828215213782eafed54/sdk/log/exporter_test.go#L87)
2. Record goes to `bufferExporter.input`
(https://github.com/open-telemetry/opentelemetry-go/blob/7c5e64cccc16710b2e75f3d60e96acc07dacd7a7/sdk/log/exporter.go#L129)
3. Record goes to `BatchProcessor.q` queue and it could not be enqueued
to export, because `bufferExporter.input` is full
4. Record goes to `BatchProcessor.q` and because of overfill, drops
third record

---------

Co-authored-by: Sam Xie <sam@samxie.me>
  • Loading branch information
amanakin and XSAM committed May 29, 2024
1 parent 1002078 commit 982e96d
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ import (
"go.opentelemetry.io/otel/log"
)

type concurrentBuffer struct {
b bytes.Buffer
m sync.Mutex
}

func (b *concurrentBuffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}

func (b *concurrentBuffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}

func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
Expand Down Expand Up @@ -421,7 +438,8 @@ func TestBatchProcessor(t *testing.T) {
t.Run("DroppedLogs", func(t *testing.T) {
orig := global.GetLogger()
t.Cleanup(func() { global.SetLogger(orig) })
buf := new(bytes.Buffer)
// Use concurrentBuffer for concurrent-safe reading.
buf := new(concurrentBuffer)
stdr.SetVerbosity(1)
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))

Expand All @@ -436,15 +454,23 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
var r Record
assert.NoError(t, b.OnEmit(ctx, r), "queued")
assert.NoError(t, b.OnEmit(ctx, r), "dropped")

var n int
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
n = e.ExportN()
return n > 0
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")

// Second record will be written to export queue
assert.NoError(t, b.OnEmit(ctx, r), "export queue record")
require.Eventually(t, func() bool {
return len(b.exporter.input) == cap(b.exporter.input)
}, 2*time.Second, time.Microsecond, "blocked queue read not attempted")

// Third record will be written to BatchProcessor.q
assert.NoError(t, b.OnEmit(ctx, r), "first queued")
// The previous record will be dropped, as the new one will be written to BatchProcessor.q
assert.NoError(t, b.OnEmit(ctx, r), "second queued")

wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1`
assert.Eventually(t, func() bool {
return strings.Contains(buf.String(), wantMsg)
Expand Down

0 comments on commit 982e96d

Please sign in to comment.