From 0c7fffeaa87b5edad0b3e94d0340cd472952ec5d Mon Sep 17 00:00:00 2001 From: Daniel Ferstay Date: Fri, 12 Jun 2020 15:46:06 -0700 Subject: [PATCH] Expose BatchingMaxSize from ProducerOptions Previously, the producer maximum batch size was hard-coded to 128 KB. Now, the produdcer maximum batch size is exposed via ProducerOptions and defaults to 128 KB --- pulsar/internal/batch_builder.go | 22 +++++++++++++++------- pulsar/producer.go | 7 ++++++- pulsar/producer_partition.go | 4 ++-- pulsar/producer_test.go | 2 +- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 3b54eba8df..35cdf71f81 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -29,10 +29,9 @@ import ( ) const ( - // MaxBatchSize will be the largest size for a batch sent from this particular producer. - // This is used as a baseline to allocate a new buffer that can hold the entire batch - // without needing costly re-allocations. - MaxBatchSize = 128 * 1024 + // DefaultMaxBatchSize init default for maximum number of bytes per batch + DefaultMaxBatchSize = 128 * 1024 + // DefaultMaxMessagesPerBatch init default num of entries in per batch. DefaultMaxMessagesPerBatch = 1000 ) @@ -47,6 +46,11 @@ type BatchBuilder struct { // Max number of message allowed in the batch maxMessages uint + // The largest size for a batch sent from this praticular producer. + // This is used as a baseline to allocate a new buffer that can hold the entire batch + // without needing costly re-allocations. + maxBatchSize uint + producerName string producerID uint64 @@ -58,15 +62,19 @@ type BatchBuilder struct { } // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. -func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64, +func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType) (*BatchBuilder, error) { if maxMessages == 0 { maxMessages = DefaultMaxMessagesPerBatch } + if maxBatchSize == 0 { + maxBatchSize = DefaultMaxBatchSize + } bb := &BatchBuilder{ buffer: NewBuffer(4096), numMessages: 0, maxMessages: maxMessages, + maxBatchSize: maxBatchSize, producerName: producerName, producerID: producerID, cmdSend: baseCommand(pb.BaseCommand_SEND, @@ -93,12 +101,12 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64, // IsFull check if the size in the current batch exceeds the maximum size allowed by the batch func (bb *BatchBuilder) IsFull() bool { - return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize + return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize) } func (bb *BatchBuilder) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize + return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize) } // Add will add single message to batch. diff --git a/pulsar/producer.go b/pulsar/producer.go index a951426381..2d630f1110 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -109,8 +109,13 @@ type ProducerOptions struct { // BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000) // If set to a value greater than 1, messages will be queued until this threshold is reached or - // batch interval has elapsed. + // BatchingMaxSize (see below) has been reached or the batch interval has elapsed. BatchingMaxMessages uint + + // BatchingMaxSize sets the maximum number of bytes permitted in a batch. (default 128 KB) + // If set to a value greater than 1, messages will be queued until this threshold is reached or + // BatchingMaxMessages (see above) has been reached or the batch interval has elapsed. + BatchingMaxSize uint } // Producer is used to publish messages on a topic diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 2eb3696f61..0f3d1983bc 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -151,8 +151,8 @@ func (p *partitionProducer) grabCnx() error { p.producerName = res.Response.ProducerSuccess.GetProducerName() if p.batchBuilder == nil { - p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName, - p.producerID, pb.CompressionType(p.options.CompressionType)) + p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize, + p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType)) if err != nil { return err } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 9f76873357..2c028c2be6 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -661,7 +661,7 @@ func TestBatchMessageFlushing(t *testing.T) { } defer producer.Close() - maxBytes := internal.MaxBatchSize + maxBytes := internal.DefaultMaxBatchSize genbytes := func(n int) []byte { c := []byte("a")[0] bytes := make([]byte, n)