Share buffer pool across all partitions (#310)
## Motivation

When a producer is publishing on many partitions, maintaining a separate buffer pool per partition adds significant memory overhead. Switching to a single buffer pool shared across all partitions removes that overhead with no significant impact on performance.
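For context, the shape of the change is: the buffer pool moves from a field on each partition producer to a single package-level `sync.Pool` shared by all of them. Below is a minimal sketch of that pattern, with illustrative names (`sharedBuffersPool`, `producerWithOwnPool`) rather than the client's actual types.

```go
package main

import (
    "fmt"
    "sync"
)

// Before: each partition producer carried its own pool, so publishing to N
// partitions could keep N separate sets of recycled buffers alive at once.
type producerWithOwnPool struct {
    buffersPool sync.Pool
}

// After: one package-level pool shared by every partition producer; a buffer
// recycled by any partition is available to all of them.
var sharedBuffersPool sync.Pool

func main() {
    // Simulate one partition returning a buffer to the shared pool...
    sharedBuffersPool.Put(make([]byte, 0, 1024))

    // ...and a different partition reusing it instead of allocating its own.
    if b, ok := sharedBuffersPool.Get().([]byte); ok {
        fmt.Printf("reused a buffer with capacity %d\n", cap(b))
    }
}
```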
merlimat authored Jul 9, 2020
1 parent 6fcaf26 commit 9cbe36f
Showing 3 changed files with 22 additions and 17 deletions.
4 changes: 4 additions & 0 deletions perf/perf-producer.go
@@ -36,6 +36,7 @@ type ProduceArgs struct {
     Topic              string
     Rate               int
     BatchingTimeMillis int
+    BatchingMaxSize    int
     MessageSize        int
     ProducerQueueSize  int
 }
@@ -62,6 +63,8 @@ func newProducerCommand() *cobra.Command {
         "Publish rate. Set to 0 to go unthrottled")
     flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
         "Batching grouping time in millis")
+    flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 128,
+        "Max size of a batch (in KB)")
     flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
         "Message size")
     flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -86,6 +89,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
         Topic:                   produceArgs.Topic,
         MaxPendingMessages:      produceArgs.ProducerQueueSize,
         BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
+        BatchingMaxSize:         uint(produceArgs.BatchingMaxSize * 1024),
     })
     if err != nil {
         log.Fatal(err)
3 changes: 3 additions & 0 deletions pulsar/internal/batch_builder.go
@@ -170,6 +170,9 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
     bb.msgMetadata.UncompressedSize = &uncompressedSize

     buffer := bb.buffersPool.GetBuffer()
+    if buffer == nil {
+        buffer = NewBuffer(int(uncompressedSize * 3 / 2))
+    }
     serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)

     callbacks = bb.callbacks
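A note on the nil check above: the shared pool introduced in producer_partition.go (next file) is declared without a `New` function, so `Get` returns nil whenever nothing has been recycled yet, and `Flush` falls back to allocating a buffer at roughly 1.5x the uncompressed batch size. A minimal, standard-library-only sketch of that `sync.Pool` behaviour:

```go
package main

import (
    "fmt"
    "sync"
)

func main() {
    var pool sync.Pool // no New function, like the shared buffersPool

    // With no New function and nothing recycled yet, Get returns nil,
    // so the caller is responsible for allocating a fresh buffer.
    if pool.Get() == nil {
        uncompressedSize := 4096
        fresh := make([]byte, 0, uncompressedSize*3/2) // headroom, as in Flush
        fmt.Printf("pool empty, allocated %d bytes of capacity\n", cap(fresh))
    }
}
```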
32 changes: 15 additions & 17 deletions pulsar/producer_partition.go
@@ -45,6 +45,8 @@ const (
 var (
     errFailAddBatch    = errors.New("message send failed")
     errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+
+    buffersPool sync.Pool
 )

 type partitionProducer struct {
@@ -62,8 +64,7 @@ type partitionProducer struct {
     batchFlushTicker *time.Ticker

     // Channel where app is posting messages to be published
-    eventsChan  chan interface{}
-    buffersPool sync.Pool
+    eventsChan chan interface{}

     publishSemaphore internal.Semaphore
     pendingQueue     internal.BlockingQueue
@@ -89,18 +90,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
     }

     p := &partitionProducer{
-        state:       producerInit,
-        log:         log.WithField("topic", topic),
-        client:      client,
-        topic:       topic,
-        options:     options,
-        producerID:  client.rpcClient.NewProducerID(),
-        eventsChan:  make(chan interface{}, maxPendingMessages),
-        buffersPool: sync.Pool{
-            New: func() interface{} {
-                return internal.NewBuffer(1024)
-            },
-        },
+        state:            producerInit,
+        log:              log.WithField("topic", topic),
+        client:           client,
+        topic:            topic,
+        options:          options,
+        producerID:       client.rpcClient.NewProducerID(),
+        eventsChan:       make(chan interface{}, maxPendingMessages),
         batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
         publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
         pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -189,8 +185,10 @@ func (p *partitionProducer) grabCnx() error {
 type connectionClosed struct{}

 func (p *partitionProducer) GetBuffer() internal.Buffer {
-    b := p.buffersPool.Get().(internal.Buffer)
-    b.Clear()
+    b, ok := buffersPool.Get().(internal.Buffer)
+    if ok {
+        b.Clear()
+    }
     return b
 }

@@ -452,7 +450,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
     // Mark this pending item as done
     pi.completed = true
     // Return buffer to the pool since we're now done using it
-    p.buffersPool.Put(pi.batchData)
+    buffersPool.Put(pi.batchData)
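Taken together, the buffer round trip now looks like this: `GetBuffer` tries the shared pool with a comma-ok type assertion and only calls `Clear` on a recycled buffer (otherwise it returns nil and the batch builder allocates a fresh one), and once the send receipt arrives the buffer is handed back with `Put` for any partition to reuse. Below is a sketch of that lifecycle, using `bytes.Buffer` as a stand-in for the client's internal `Buffer` interface and hypothetical helper names.

```go
package main

import (
    "bytes"
    "fmt"
    "sync"
)

var buffersPool sync.Pool // shared across all partition producers

// getBuffer mirrors partitionProducer.GetBuffer: reset and return a recycled
// buffer if there is one, otherwise return nil and let the caller allocate.
func getBuffer() *bytes.Buffer {
    b, ok := buffersPool.Get().(*bytes.Buffer)
    if ok {
        b.Reset() // stand-in for internal.Buffer.Clear()
    }
    return b
}

func sendBatch(payload string) {
    buf := getBuffer()
    if buf == nil {
        buf = bytes.NewBuffer(make([]byte, 0, len(payload)*3/2))
    }
    buf.WriteString(payload)

    // ... write to the connection and wait for the send receipt ...
    fmt.Printf("sent %d bytes\n", buf.Len())

    // Receipt received: return the buffer so another batch, possibly on a
    // different partition, can reuse it.
    buffersPool.Put(buf)
}

func main() {
    sendBatch("first batch")  // allocates: the pool starts empty
    sendBatch("second batch") // typically reuses the first buffer
}
```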
