diff --git a/pulsar/internal/blocking_queue.go b/pulsar/internal/blocking_queue.go index 37317a1a1a..8162301147 100644 --- a/pulsar/internal/blocking_queue.go +++ b/pulsar/internal/blocking_queue.go @@ -19,8 +19,6 @@ package internal import ( "sync" - - log "github.com/sirupsen/logrus" ) // BlockingQueue is a interface of block queue @@ -43,14 +41,8 @@ type BlockingQueue interface { // Size return the current size of the queue Size() int - // Iterator return an iterator for the queue - Iterator() BlockingQueueIterator -} - -// BlockingQueueIterator abstract a interface of block queue iterator. -type BlockingQueueIterator interface { - HasNext() bool - Next() interface{} + // ReadableSlice returns a new view of the readable items in the queue + ReadableSlice() []interface{} } type blockingQueue struct { @@ -65,12 +57,6 @@ type blockingQueue struct { isNotFull *sync.Cond } -type blockingQueueIterator struct { - bq *blockingQueue - readIdx int - toRead int -} - // NewBlockingQueue init block queue and returns a BlockingQueue func NewBlockingQueue(maxSize int) BlockingQueue { bq := &blockingQueue{ @@ -173,31 +159,19 @@ func (bq *blockingQueue) Size() int { return bq.size } -func (bq *blockingQueue) Iterator() BlockingQueueIterator { +func (bq *blockingQueue) ReadableSlice() []interface{} { bq.mutex.Lock() defer bq.mutex.Unlock() - return &blockingQueueIterator{ - bq: bq, - readIdx: bq.headIdx, - toRead: bq.size, - } -} - -func (bqi *blockingQueueIterator) HasNext() bool { - return bqi.toRead > 0 -} - -func (bqi *blockingQueueIterator) Next() interface{} { - if bqi.toRead == 0 { - log.Panic("Trying to read past the end of the iterator") + res := make([]interface{}, bq.size) + readIdx := bq.headIdx + for i := 0; i < bq.size; i++ { + res[i] = bq.items[readIdx] + readIdx++ + if readIdx == bq.maxSize { + readIdx = 0 + } } - item := bqi.bq.items[bqi.readIdx] - bqi.toRead-- - bqi.readIdx++ - if bqi.readIdx == bqi.bq.maxSize { - bqi.readIdx = 0 - } - return item + return res } diff --git a/pulsar/internal/blocking_queue_test.go b/pulsar/internal/blocking_queue_test.go index 12bb8fcb18..c93b1a6e80 100644 --- a/pulsar/internal/blocking_queue_test.go +++ b/pulsar/internal/blocking_queue_test.go @@ -119,17 +119,32 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) { close(ch) } -func TestBlockingQueueIterator(t *testing.T) { - q := NewBlockingQueue(10) +func TestBlockingQueue_ReadableSlice(t *testing.T) { + q := NewBlockingQueue(3) q.Put(1) q.Put(2) q.Put(3) assert.Equal(t, 3, q.Size()) - i := 1 - for it := q.Iterator(); it.HasNext(); { - assert.Equal(t, i, it.Next()) - i++ - } + items := q.ReadableSlice() + assert.Equal(t, len(items), 3) + assert.Equal(t, items[0], 1) + assert.Equal(t, items[1], 2) + assert.Equal(t, items[2], 3) + + q.Poll() + + items = q.ReadableSlice() + assert.Equal(t, len(items), 2) + assert.Equal(t, items[0], 2) + assert.Equal(t, items[1], 3) + + q.Put(4) + + items = q.ReadableSlice() + assert.Equal(t, len(items), 3) + assert.Equal(t, items[0], 2) + assert.Equal(t, items[1], 3) + assert.Equal(t, items[2], 4) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 18bb9a3977..814d8308bc 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -170,10 +170,11 @@ func (p *partitionProducer) grabCnx() error { p.cnx.RegisterListener(p.producerID, p) p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer") - if p.pendingQueue.Size() > 0 { - p.log.Infof("Resending %d pending batches", p.pendingQueue.Size()) - for it := p.pendingQueue.Iterator(); it.HasNext(); { - p.cnx.WriteData(it.Next().(*pendingItem).batchData) + pendingItems := p.pendingQueue.ReadableSlice() + if len(pendingItems) > 0 { + p.log.Infof("Resending %d pending batches", len(pendingItems)) + for _, pi := range pendingItems { + p.cnx.WriteData(pi.(*pendingItem).batchData) } } return nil