diff --git a/queue/queued_channel.go b/queue/queued_channel.go index 502d5237..1b045389 100644 --- a/queue/queued_channel.go +++ b/queue/queued_channel.go @@ -9,6 +9,7 @@ import ( // buffer size should be. type QueuedChannel[T any] struct { ch chan T + stopCh chan struct{} items []T cond *sync.Cond closed atomicBool // Should use atomic.Bool once we use Go 1.19! @@ -16,9 +17,10 @@ type QueuedChannel[T any] struct { func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T] { queue := &QueuedChannel[T]{ - ch: make(chan T, chanBufferSize), - items: make([]T, 0, queueCapacity), - cond: sync.NewCond(&sync.Mutex{}), + ch: make(chan T, chanBufferSize), + stopCh: make(chan struct{}), + items: make([]T, 0, queueCapacity), + cond: sync.NewCond(&sync.Mutex{}), } // The queue is initially not closed. @@ -33,7 +35,13 @@ func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T return } - queue.ch <- item + select { + case queue.ch <- item: + continue + + case <-queue.stopCh: + return + } } }() @@ -68,6 +76,12 @@ func (q *QueuedChannel[T]) Close() { q.cond.Broadcast() } +// CloseAndDiscardQueued force closes the channel and does not guarantee that the remaining queued items will be read. +func (q *QueuedChannel[T]) CloseAndDiscardQueued() { + close(q.stopCh) + q.Close() +} + func (q *QueuedChannel[T]) pop() (T, bool) { q.cond.L.Lock() defer q.cond.L.Unlock() diff --git a/queue/queued_channel_test.go b/queue/queued_channel_test.go index e28a3a62..382a3f49 100644 --- a/queue/queued_channel_test.go +++ b/queue/queued_channel_test.go @@ -38,3 +38,15 @@ func TestQueuedChannel(t *testing.T) { // Enqueuing more items after the queue is closed should return false. require.False(t, queue.Enqueue(7, 8, 9)) } + +func TestQueuedChannelDoesNotLeakIfThereAreNoReadersOnCloseAndDiscard(t *testing.T) { + defer goleak.VerifyNone(t) + + // Create a new queued channel. + queue := NewQueuedChannel[int](1, 3) + + // Push some items to the queue. + require.True(t, queue.Enqueue(1, 2, 3)) + + queue.CloseAndDiscardQueued() +}