Skip to content

Commit

Permalink
[exporterhelper] Fix potential deadlocks in BatcherSender shutdown
Browse files Browse the repository at this point in the history
Fixes #10255
  • Loading branch information
dmitryax committed May 29, 2024
1 parent 8da8d0a commit 56f36e9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 9 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fix-batcher-sender-shutdown-deadlock.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix potential deadlocks in BatcherSender shutdown

# One or more tracking issues or pull requests related to the change
issues: [10255]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
10 changes: 8 additions & 2 deletions exporter/exporterhelper/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (bs *batchSender) exportActiveBatch() {
bs.activeBatch = newEmptyBatch()
}

func (bs *batchSender) resetTimer() {
if !bs.stopped.Load() {
bs.resetTimerCh <- struct{}{}
}
}

// isActiveBatchReady returns true if the active batch is ready to be exported.
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
// Caller must hold the lock.
Expand Down Expand Up @@ -154,7 +160,7 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
batch := bs.activeBatch
if bs.isActiveBatchReady() || len(reqs) > 1 {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
bs.resetTimer()
}
bs.mu.Unlock()
<-batch.done
Expand Down Expand Up @@ -194,7 +200,7 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
batch := bs.activeBatch
if bs.isActiveBatchReady() {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
bs.resetTimer()
}
bs.mu.Unlock()
<-batch.done
Expand Down
49 changes: 49 additions & 0 deletions exporter/exporterhelper/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,55 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {
}
}

// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being
// merged.
func TestBatchSender_ShutdownDeadlock(t *testing.T) {
blockMerge := make(chan struct{})

// blockedBatchMergeFunc blocks until the blockMerge channel is closed
blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) {
<-blockMerge
return r1, nil
}

be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc)))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

sink := newFakeRequestSink()

// Send 10 concurrent requests and wait for them to start
startWG := sync.WaitGroup{}
for i := 0; i < 10; i++ {
startWG.Add(1)
go func() {
startWG.Done()
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
}()
}
startWG.Wait()

// Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks,
// then wait for the exporter to finish.
startShutdown := make(chan struct{})
doneShutdown := make(chan struct{})
go func() {
close(startShutdown)
require.Nil(t, be.Shutdown(context.Background()))
close(doneShutdown)
}()
<-startShutdown
close(blockMerge)
<-doneShutdown

// The exporter should have sent only one "merged" batch
assert.Equal(t, uint64(1), sink.requestsCount.Load())

// blockedBatchMergeFunc just returns the first request, so the items count should be 4
assert.Equal(t, uint64(4), sink.itemsCount.Load())
}

func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
Expand Down
12 changes: 5 additions & 7 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ package exporterhelper
import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"testing"
)

var (
Expand Down

0 comments on commit 56f36e9

Please sign in to comment.