From 769744f558cf93819539e45e88170d6dd712de68 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 29 May 2024 14:47:25 -0700 Subject: [PATCH] [exporterhelper] Fix potential deadlocks in BatcherSender shutdown Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/10255 --- .../fix-batcher-sender-shutdown-deadlock.yaml | 20 ++++++++ exporter/exporterhelper/batch_sender.go | 10 +++- exporter/exporterhelper/batch_sender_test.go | 49 +++++++++++++++++++ exporter/exporterhelper/common_test.go | 12 ++--- 4 files changed, 82 insertions(+), 9 deletions(-) create mode 100644 .chloggen/fix-batcher-sender-shutdown-deadlock.yaml diff --git a/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml b/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml new file mode 100644 index 00000000000..76baecac6b5 --- /dev/null +++ b/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix potential deadlocks in BatcherSender shutdown + +# One or more tracking issues or pull requests related to the change +issues: [10255] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go index 29065bfe980..2370aba54c9 100644 --- a/exporter/exporterhelper/batch_sender.go +++ b/exporter/exporterhelper/batch_sender.go @@ -118,6 +118,12 @@ func (bs *batchSender) exportActiveBatch() { bs.activeBatch = newEmptyBatch() } +func (bs *batchSender) resetTimer() { + if !bs.stopped.Load() { + bs.resetTimerCh <- struct{}{} + } +} + // isActiveBatchReady returns true if the active batch is ready to be exported. // The batch is ready if it has reached the minimum size or the concurrency limit is reached. // Caller must hold the lock. @@ -154,7 +160,7 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err batch := bs.activeBatch if bs.isActiveBatchReady() || len(reqs) > 1 { bs.exportActiveBatch() - bs.resetTimerCh <- struct{}{} + bs.resetTimer() } bs.mu.Unlock() <-batch.done @@ -194,7 +200,7 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error { batch := bs.activeBatch if bs.isActiveBatchReady() { bs.exportActiveBatch() - bs.resetTimerCh <- struct{}{} + bs.resetTimer() } bs.mu.Unlock() <-batch.done diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index bc5720a99f8..5f882ec1e96 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -439,6 +439,55 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { } } +// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being +// merged. +func TestBatchSender_ShutdownDeadlock(t *testing.T) { + blockMerge := make(chan struct{}) + + // blockedBatchMergeFunc blocks until the blockMerge channel is closed + blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { + <-blockMerge + return r1, nil + } + + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, + WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.Nil(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // Send 10 concurrent requests and wait for them to start + startWG := sync.WaitGroup{} + for i := 0; i < 10; i++ { + startWG.Add(1) + go func() { + startWG.Done() + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + } + startWG.Wait() + + // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, + // then wait for the exporter to finish. + startShutdown := make(chan struct{}) + doneShutdown := make(chan struct{}) + go func() { + close(startShutdown) + require.Nil(t, be.Shutdown(context.Background())) + close(doneShutdown) + }() + <-startShutdown + close(blockMerge) + <-doneShutdown + + // The exporter should have sent only one "merged" batch + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + + // blockedBatchMergeFunc just returns the first request, so the items count should be 4 + assert.Equal(t, uint64(4), sink.itemsCount.Load()) +} + func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index d79b7c07918..7d412256d85 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -6,20 +6,18 @@ package exporterhelper import ( "context" "errors" - "testing" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/codes" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + "testing" ) var (