diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3516e388dcb..82c4288d7f8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
 - Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
 - Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
+- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)
 
 ## [1.16.0/0.39.0] 2023-05-18
 
diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go
index 43d5b042303..5922048fe62 100644
--- a/sdk/trace/batch_span_processor.go
+++ b/sdk/trace/batch_span_processor.go
@@ -16,7 +16,6 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
 
 import (
 	"context"
-	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -84,6 +83,7 @@ type batchSpanProcessor struct {
 	stopWait sync.WaitGroup
 	stopOnce sync.Once
 	stopCh   chan struct{}
+	stopped  atomic.Bool
 }
 
 var _ SpanProcessor = (*batchSpanProcessor)(nil)
@@ -137,6 +137,11 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)
 
 // OnEnd method enqueues a ReadOnlySpan for later processing.
 func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
+	// Do not enqueue spans after Shutdown.
+	if bsp.stopped.Load() {
+		return
+	}
+
 	// Do not enqueue spans if we are just going to drop them.
 	if bsp.e == nil {
 		return
@@ -149,6 +154,7 @@ func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
 func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
 	var err error
 	bsp.stopOnce.Do(func() {
+		bsp.stopped.Store(true)
 		wait := make(chan struct{})
 		go func() {
 			close(bsp.stopCh)
@@ -181,11 +187,19 @@ func (f forceFlushSpan) SpanContext() trace.SpanContext {
 
 // ForceFlush exports all ended spans that have not yet been exported.
 func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
+	// Do nothing after Shutdown.
+	if bsp.stopped.Load() {
+		return nil
+	}
+
 	var err error
 	if bsp.e != nil {
 		flushCh := make(chan struct{})
 		if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
 			select {
+			case <-bsp.stopCh:
+				// The batchSpanProcessor is Shutdown.
+				return nil
 			case <-flushCh:
 				// Processed any items in queue prior to ForceFlush being called
 			case <-ctx.Done():
@@ -326,11 +340,9 @@ func (bsp *batchSpanProcessor) drainQueue() {
 	for {
 		select {
 		case sd := <-bsp.queue:
-			if sd == nil {
-				if err := bsp.exportSpans(ctx); err != nil {
-					otel.Handle(err)
-				}
-				return
+			if _, ok := sd.(forceFlushSpan); ok {
+				// Ignore flush requests as they are not valid spans.
+				continue
 			}
 
 			bsp.batchMutex.Lock()
@@ -344,7 +356,11 @@
 				}
 			}
 		default:
-			close(bsp.queue)
+			// There are no more enqueued spans. Make final export.
+			if err := bsp.exportSpans(ctx); err != nil {
+				otel.Handle(err)
+			}
+			return
 		}
 	}
 }
@@ -358,34 +374,11 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
 	}
 }
 
-func recoverSendOnClosedChan() {
-	x := recover()
-	switch err := x.(type) {
-	case nil:
-		return
-	case runtime.Error:
-		if err.Error() == "send on closed channel" {
-			return
-		}
-	}
-	panic(x)
-}
-
 func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
 	if !sd.SpanContext().IsSampled() {
 		return false
 	}
 
-	// This ensures the bsp.queue<- below does not panic as the
-	// processor shuts down.
-	defer recoverSendOnClosedChan()
-
-	select {
-	case <-bsp.stopCh:
-		return false
-	default:
-	}
-
 	select {
 	case bsp.queue <- sd:
 		return true
@@ -399,16 +392,6 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
 		return false
 	}
 
-	// This ensures the bsp.queue<- below does not panic as the
-	// processor shuts down.
-	defer recoverSendOnClosedChan()
-
-	select {
-	case <-bsp.stopCh:
-		return false
-	default:
-	}
-
 	select {
 	case bsp.queue <- sd:
 		return true
diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go
index a033b6a0082..a7430335fbc 100644
--- a/sdk/trace/batch_span_processor_test.go
+++ b/sdk/trace/batch_span_processor_test.go
@@ -594,6 +594,49 @@ func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
 	}
 }
 
+func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
+	ctx := context.Background()
+	var bp testBatchExporter
+	bsp := sdktrace.NewBatchSpanProcessor(&bp)
+	tp := basicTracerProvider(t)
+	tp.RegisterSpanProcessor(bsp)
+	tr := tp.Tracer(t.Name())
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		generateSpan(t, tr, testOption{genNumSpans: 1})
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = bsp.ForceFlush(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = bsp.Shutdown(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = tp.ForceFlush(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = tp.Shutdown(ctx)
+	}()
+
+	wg.Wait()
+}
+
 func BenchmarkSpanProcessor(b *testing.B) {
 	tp := sdktrace.NewTracerProvider(
 		sdktrace.WithBatcher(
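Note on the pattern (not part of the diff): the patch drops the old `recoverSendOnClosedChan` hack, which closed `bsp.queue` and then recovered from "send on closed channel" panics, in favor of a `stopped` flag that is flipped inside `stopOnce` before `stopCh` is closed; the queue channel is now never closed, so producers cannot panic or race with shutdown. Below is a minimal, self-contained sketch of that shutdown-guard pattern using only the standard library. All names (`processor`, `Enqueue`, `Drain`) are illustrative, not the SDK's API.

```go
// Sketch of the shutdown-guard pattern the patch adopts; not the SDK's code.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type processor struct {
	queue    chan int      // buffered work queue; never closed
	stopCh   chan struct{} // closed exactly once to signal shutdown
	stopOnce sync.Once
	stopped  atomic.Bool // guards the public entry points
}

func newProcessor() *processor {
	return &processor{
		queue:  make(chan int, 16),
		stopCh: make(chan struct{}),
	}
}

// Enqueue is safe to call concurrently with Shutdown: once the stopped
// flag is set it becomes a no-op, and because the queue channel is never
// closed, a send that slips past the check cannot panic.
func (p *processor) Enqueue(v int) bool {
	if p.stopped.Load() {
		return false
	}
	select {
	case p.queue <- v:
		return true
	default:
		return false // queue full: drop rather than block
	}
}

// Shutdown flips the flag first, then signals consumers exactly once.
func (p *processor) Shutdown() {
	p.stopOnce.Do(func() {
		p.stopped.Store(true)
		close(p.stopCh)
	})
}

// Drain empties whatever is left after Shutdown. The default branch fires
// once the queue is empty, mirroring the patched drainQueue, which does
// its final export there instead of closing the queue channel.
func (p *processor) Drain() (out []int) {
	for {
		select {
		case v := <-p.queue:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	p := newProcessor()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); p.Enqueue(i) }(i)
	}
	wg.Add(1)
	go func() { defer wg.Done(); p.Shutdown() }()
	wg.Wait()
	fmt.Printf("drained %d item(s); no panic on post-shutdown enqueue\n", len(p.Drain()))
}
```

The flag check alone is not airtight: a goroutine can pass `stopped.Load()` just as `Shutdown` runs. That is why the patch also stops closing the queue channel; a send that slips through merely lands in the buffer, where the drain loop (or the sketch's `Drain`) picks it up instead of panicking.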