Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose BatchingMaxSize from ProducerOptions #280

Merged
merged 1 commit into from
Jun 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ import (
)

const (
// MaxBatchSize will be the largest size for a batch sent from this particular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
MaxBatchSize = 128 * 1024
// DefaultMaxBatchSize init default for maximum number of bytes per batch
DefaultMaxBatchSize = 128 * 1024

// DefaultMaxMessagesPerBatch init default num of entries in per batch.
DefaultMaxMessagesPerBatch = 1000
)
Expand All @@ -47,6 +46,11 @@ type BatchBuilder struct {
// Max number of message allowed in the batch
maxMessages uint

// The largest size for a batch sent from this praticular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
maxBatchSize uint

producerName string
producerID uint64

Expand All @@ -58,15 +62,19 @@ type BatchBuilder struct {
}

// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
if maxBatchSize == 0 {
maxBatchSize = DefaultMaxBatchSize
}
bb := &BatchBuilder{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
producerName: producerName,
producerID: producerID,
cmdSend: baseCommand(pb.BaseCommand_SEND,
Expand All @@ -93,12 +101,12 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,

// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
func (bb *BatchBuilder) IsFull() bool {
return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize)
}

func (bb *BatchBuilder) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize)
}

// Add will add single message to batch.
Expand Down
7 changes: 6 additions & 1 deletion pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ type ProducerOptions struct {

// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// batch interval has elapsed.
// BatchingMaxSize (see below) has been reached or the batch interval has elapsed.
BatchingMaxMessages uint

// BatchingMaxSize sets the maximum number of bytes permitted in a batch. (default 128 KB)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
BatchingMaxSize uint
}

// Producer is used to publish messages on a topic
Expand Down
4 changes: 2 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (p *partitionProducer) grabCnx() error {

p.producerName = res.Response.ProducerSuccess.GetProducerName()
if p.batchBuilder == nil {
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
p.producerID, pb.CompressionType(p.options.CompressionType))
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ func TestBatchMessageFlushing(t *testing.T) {
}
defer producer.Close()

maxBytes := internal.MaxBatchSize
maxBytes := internal.DefaultMaxBatchSize
genbytes := func(n int) []byte {
c := []byte("a")[0]
bytes := make([]byte, n)
Expand Down