Share buffer pool across all partitions #310

Merged · 3 commits · Jul 9, 2020
4 changes: 4 additions & 0 deletions perf/perf-producer.go
@@ -36,6 +36,7 @@ type ProduceArgs struct {
Topic string
Rate int
BatchingTimeMillis int
BatchingMaxSize int
MessageSize int
ProducerQueueSize int
}
@@ -62,6 +63,8 @@ func newProducerCommand() *cobra.Command {
"Publish rate. Set to 0 to go unthrottled")
flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
"Batching grouping time in millis")
flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 128,
"Max size of a batch (in KB)")
@wolfstudy (Member) commented on Jul 7, 2020:
In pulsar-perf produce, --batch-max-messages has the abbreviation and default value shown below; can we consider keeping the two consistent?

   -bm, --batch-max-messages
       Maximum number of messages per batch
       Default: 1000

Contributor (PR author) replied:
These are two different settings: one is the maximum number of messages in a batch, and the other is the maximum size of the batch.
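
For context, a minimal sketch of how the two limits would be configured together on a Go producer. It assumes the client also exposes a BatchingMaxMessages field in ProducerOptions (only BatchingMaxSize appears in this diff), so that field is illustrative:

```go
package main

import (
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   "my-topic",
		BatchingMaxPublishDelay: 10 * time.Millisecond,
		// Count limit: max number of messages per batch (assumed field,
		// analogous to the Java perf tool's --batch-max-messages).
		BatchingMaxMessages: 1000,
		// Byte limit: max batch size, the option wired to --batching-max-size
		// in this PR (128 KB).
		BatchingMaxSize: 128 * 1024,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}
```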

flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
"Message size")
flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -86,6 +89,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
Topic: produceArgs.Topic,
MaxPendingMessages: produceArgs.ProducerQueueSize,
BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
BatchingMaxSize: uint(produceArgs.BatchingMaxSize * 1024),
})
if err != nil {
log.Fatal(err)
3 changes: 3 additions & 0 deletions pulsar/internal/batch_builder.go
@@ -170,6 +170,9 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
bb.msgMetadata.UncompressedSize = &uncompressedSize

buffer := bb.buffersPool.GetBuffer()
if buffer == nil {
buffer = NewBuffer(int(uncompressedSize * 3 / 2))
}
serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)

callbacks = bb.callbacks
32 changes: 15 additions & 17 deletions pulsar/producer_partition.go
@@ -45,6 +45,8 @@ const (
var (
errFailAddBatch = errors.New("message send failed")
errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")

buffersPool sync.Pool
)

type partitionProducer struct {
@@ -62,8 +64,7 @@ type partitionProducer struct {
batchFlushTicker *time.Ticker

// Channel where app is posting messages to be published
eventsChan chan interface{}
buffersPool sync.Pool
eventsChan chan interface{}

publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
@@ -89,18 +90,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
}

p := &partitionProducer{
state: producerInit,
log: log.WithField("topic", topic),
client: client,
topic: topic,
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
buffersPool: sync.Pool{
New: func() interface{} {
return internal.NewBuffer(1024)
},
},
state: producerInit,
log: log.WithField("topic", topic),
client: client,
topic: topic,
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -189,8 +185,10 @@ func (p *partitionProducer) grabCnx() error {
type connectionClosed struct{}

func (p *partitionProducer) GetBuffer() internal.Buffer {
b := p.buffersPool.Get().(internal.Buffer)
b.Clear()
b, ok := buffersPool.Get().(internal.Buffer)
if ok {
b.Clear()
}
return b
}

@@ -452,7 +450,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// Mark this pending item as done
pi.completed = true
// Return buffer to the pool since we're now done using it
p.buffersPool.Put(pi.batchData)
buffersPool.Put(pi.batchData)
}

func (p *partitionProducer) internalClose(req *closeProducer) {
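
Taken together, the change replaces each partition producer's private pool with a single package-level sync.Pool shared by all partitions. Because the shared pool declares no New function, Get can return nil; the batch builder then allocates a buffer sized at roughly 1.5x the uncompressed batch, and the buffer is returned to the pool once the broker's send receipt arrives. A minimal standalone sketch of that lifecycle, using bytes.Buffer in place of the client's internal.Buffer:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Shared across all partition producers; no New func, so Get may return nil.
var buffersPool sync.Pool

// getBuffer mirrors partitionProducer.GetBuffer: reuse a pooled buffer when
// one is available, otherwise return nil and let the caller allocate.
func getBuffer() *bytes.Buffer {
	if b, ok := buffersPool.Get().(*bytes.Buffer); ok {
		b.Reset() // analogous to Buffer.Clear() in the diff
		return b
	}
	return nil
}

// flush mirrors BatchBuilder.Flush: fall back to a fresh buffer with ~1.5x the
// uncompressed size as headroom when the pool has nothing to hand out.
func flush(uncompressedSize int, payload []byte) *bytes.Buffer {
	buf := getBuffer()
	if buf == nil {
		buf = bytes.NewBuffer(make([]byte, 0, uncompressedSize*3/2))
	}
	buf.Write(payload) // stands in for serializeBatch(...)
	return buf
}

func main() {
	batch := flush(1024, []byte("batched messages"))
	fmt.Println("flushed", batch.Len(), "bytes")

	// Once the send receipt for this batch arrives, hand the buffer back so
	// any partition producer can reuse it (mirrors ReceivedSendReceipt).
	buffersPool.Put(batch)
}
```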