Skip to content

Commit

Permalink
Merge pull request #328 from Shopify/more-skipped-tests
Browse files Browse the repository at this point in the history
Add a test for bug #294
  • Loading branch information
eapache committed Mar 10, 2015
2 parents 7cbd3ea + 437cd2f commit f732a1b
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,69 @@ func TestProducerMultipleRetries(t *testing.T) {
closeProducer(t, producer)
}

func TestProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.AckSuccesses = true
config.Producer.Retry.Backoff = 0
config.Producer.Retry.Max = 0
producer, err := NewProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
if msg.Err != ErrNotLeaderForPartition {
t.Error(msg.Err)
}
case <-producer.Successes():
t.Error("Unexpected success")
}
}

seedBroker.Returns(metadataResponse)

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
}

leader.Close()
seedBroker.Close()
safeClose(t, producer)
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleProducer_select() {
Expand Down

0 comments on commit f732a1b

Please sign in to comment.