Skip to content

Commit

Permalink
Merge pull request #368 from Shopify/flush-retry-recovery
Browse files Browse the repository at this point in the history
producer: bugfix for aggregators getting stuck
  • Loading branch information
eapache committed Mar 19, 2015
2 parents e8ecee7 + a381b36 commit 963015e
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 125 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
Bug Fixes:
- Fix the producer's internal reference counting in certain unusual scenarios
([#367](https://github.com/Shopify/sarama/pull/367)).
- Fix a condition where the producer's internal control messages could have
gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).

#### Version 1.0.0 (2015-03-17)

Expand Down
1 change: 1 addition & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
bytesAccumulated += msg.byteSize()

if defaultFlush ||
msg.flags&chaser == chaser ||
(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
doFlush = flusher
Expand Down
214 changes: 89 additions & 125 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ func closeProducer(t *testing.T, p AsyncProducer) {
wg.Wait()
}

func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
for i := 0; i < successes; i++ {
select {
case msg := <-p.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-p.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
}

func TestAsyncProducer(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
Expand Down Expand Up @@ -103,19 +119,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
for i := 0; i < 5; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 5)
}

closeProducer(t, producer)
Expand Down Expand Up @@ -155,19 +159,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

closeProducer(t, producer)
leader1.Close()
Expand Down Expand Up @@ -210,38 +202,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)
leader1.Close()

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

leader2.Close()
closeProducer(t, producer)
Expand Down Expand Up @@ -276,19 +244,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)
seedBroker.Close()
leader.Close()

Expand Down Expand Up @@ -331,19 +287,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)
seedBroker.Close()
leader2.Close()

Expand Down Expand Up @@ -391,37 +335,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

seedBroker.Close()
leader1.Close()
Expand Down Expand Up @@ -479,13 +399,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
}
expectSuccesses(t, producer, 10)

leader.Close()
seedBroker.Close()
Expand Down Expand Up @@ -518,22 +432,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
expectSuccesses(t, producer, 1)

// prime partition 1
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
expectSuccesses(t, producer, 1)

// reboot the broker (the producer will get EOF on its existing connection)
leader.Close()
Expand All @@ -549,11 +455,69 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
expectSuccesses(t, producer, 1)

// shutdown
closeProducer(t, producer)
seedBroker.Close()
leader.Close()
}

func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Producer.Flush.Messages = 5
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
config.Producer.Retry.Max = 1
config.Producer.Partitioner = NewManualPartitioner
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// prime partitions
for p := int32(0); p < 2; p++ {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
}
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 5)
}

// send more messages on partition 0
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
}
prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)

// succeed this time
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 5)

// put five more through
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
}
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 5)

// shutdown
closeProducer(t, producer)
Expand Down

0 comments on commit 963015e

Please sign in to comment.