diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 2255fad02af..c15e3d01446 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -24,6 +24,23 @@ import ( "go.opentelemetry.io/otel/log" ) +type concurrentBuffer struct { + b bytes.Buffer + m sync.Mutex +} + +func (b *concurrentBuffer) Write(p []byte) (n int, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.Write(p) +} + +func (b *concurrentBuffer) String() string { + b.m.Lock() + defer b.m.Unlock() + return b.b.String() +} + func TestEmptyBatchConfig(t *testing.T) { assert.NotPanics(t, func() { var bp BatchProcessor @@ -421,7 +438,8 @@ func TestBatchProcessor(t *testing.T) { t.Run("DroppedLogs", func(t *testing.T) { orig := global.GetLogger() t.Cleanup(func() { global.SetLogger(orig) }) - buf := new(bytes.Buffer) + // Use concurrentBuffer for concurrent-safe reading. + buf := new(concurrentBuffer) stdr.SetVerbosity(1) global.SetLogger(stdr.New(stdlog.New(buf, "", 0))) @@ -436,15 +454,23 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) var r Record - assert.NoError(t, b.OnEmit(ctx, r), "queued") - assert.NoError(t, b.OnEmit(ctx, r), "dropped") - - var n int + // First record will be blocked by testExporter.Export + assert.NoError(t, b.OnEmit(ctx, r), "exported record") require.Eventually(t, func() bool { - n = e.ExportN() - return n > 0 + return e.ExportN() > 0 }, 2*time.Second, time.Microsecond, "blocked export not attempted") + // Second record will be written to export queue + assert.NoError(t, b.OnEmit(ctx, r), "export queue record") + require.Eventually(t, func() bool { + return len(b.exporter.input) == cap(b.exporter.input) + }, 2*time.Second, time.Microsecond, "blocked queue read not attempted") + + // Third record will be written to BatchProcessor.q + assert.NoError(t, b.OnEmit(ctx, r), "first queued") + // The previous record will be dropped, as the new one will be written to BatchProcessor.q + assert.NoError(t, b.OnEmit(ctx, r), "second queued") + wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1` assert.Eventually(t, func() bool { return strings.Contains(buf.String(), wantMsg)