Skip to content

Commit

Permalink
producer: bugfix for aggregators getting stuck
Browse files Browse the repository at this point in the history
In circumstances where Flush.Messages and/or Flush.Bytes were set but
Flush.Frequency was not, the producer's aggregator could get stuck on a retry
because a metadata-only chaser message would not be enough on its own to trigger
a flush, and so it would sit in limbo forever.

Always trigger a flush in the aggregator when the message is a chaser. This has
the additional benefit of reducing retry latency when Flush.Frequency *is* set.

Add a test for this case.
  • Loading branch information
eapache committed Mar 19, 2015
1 parent c42b7aa commit a381b36
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
Bug Fixes:
- Fix the producer's internal reference counting in certain unusual scenarios
([#367](https://github.com/Shopify/sarama/pull/367)).
- Fix a condition where the producer's internal control messages could have
gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).

#### Version 1.0.0 (2015-03-17)

Expand Down
1 change: 1 addition & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
bytesAccumulated += msg.byteSize()

if defaultFlush ||
msg.flags&chaser == chaser ||
(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
doFlush = flusher
Expand Down
62 changes: 62 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,68 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
leader.Close()
}

// TestAsyncProducerFlusherRetryCondition is a regression test for the producer's
// aggregator getting stuck on a retry when Flush.Messages is configured (and the
// test does not set Flush.Frequency). It primes two partitions on the same
// leader, forces a retry on partition 0 via ErrNotLeaderForPartition, and then
// checks that the retried batch and a subsequent batch are both delivered.
func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
	const (
		topic     = "my_topic"
		batchSize = 5 // matches Flush.Messages so every batch triggers a flush
	)

	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	// Both partitions of the topic are led by the same broker.
	metadata := new(MetadataResponse)
	metadata.AddBroker(leader.Addr(), leader.BrokerID())
	metadata.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadata.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadata)

	cfg := NewConfig()
	cfg.Producer.Flush.Messages = batchSize
	cfg.Producer.Return.Successes = true
	cfg.Producer.Retry.Backoff = 0
	cfg.Producer.Retry.Max = 1
	cfg.Producer.Partitioner = NewManualPartitioner

	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// sendBatch pushes one flush-sized batch of messages at the given partition.
	sendBatch := func(partition int32) {
		for n := 0; n < batchSize; n++ {
			producer.Input() <- &ProducerMessage{
				Topic:     topic,
				Value:     StringEncoder(TestMessage),
				Partition: partition,
			}
		}
	}

	// Prime both partitions with one successful batch each.
	for partition := int32(0); partition < 2; partition++ {
		sendBatch(partition)

		primed := new(ProduceResponse)
		primed.AddTopicPartition(topic, partition, ErrNoError)
		leader.Returns(primed)
		expectSuccesses(t, producer, batchSize)
	}

	// Send another batch on partition 0 and have the leader reject it,
	// which pushes the batch onto the retry path.
	sendBatch(0)
	notLeader := new(ProduceResponse)
	notLeader.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
	leader.Returns(notLeader)

	// The refreshed metadata points partition 0 at the very same broker again.
	seedBroker.Returns(metadata)

	// The retried batch succeeds this time.
	success := new(ProduceResponse)
	success.AddTopicPartition(topic, 0, ErrNoError)
	leader.Returns(success)
	expectSuccesses(t, producer, batchSize)

	// One more batch must still flow through after the retry has completed.
	sendBatch(0)
	leader.Returns(success)
	expectSuccesses(t, producer, batchSize)

	// Shut everything down.
	closeProducer(t, producer)
	seedBroker.Close()
	leader.Close()
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down

0 comments on commit a381b36

Please sign in to comment.