From 437cd2f8c4af921fa59a846426b547c48d1a603e Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 10 Mar 2015 13:08:03 +0000 Subject: [PATCH] Add a test for bug #294 Skip it until the bug is fixed, but at least it will keep up with the API changes now. Before it was inline in the ticket, and was falling behind. --- producer_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/producer_test.go b/producer_test.go index 12146b79d..b4b96fe31 100644 --- a/producer_test.go +++ b/producer_test.go @@ -509,6 +509,69 @@ func TestProducerMultipleRetries(t *testing.T) { closeProducer(t, producer) } +func TestProducerOutOfRetries(t *testing.T) { + t.Skip("Enable once bug #294 is fixed.") + + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.AckSuccesses = true + config.Producer.Retry.Backoff = 0 + config.Producer.Retry.Max = 0 + producer, err := NewProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + prodNotLeader := new(ProduceResponse) + prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) + leader.Returns(prodNotLeader) + + for i := 0; i < 10; i++ { + select { + case msg := <-producer.Errors(): + if msg.Err != ErrNotLeaderForPartition { + t.Error(msg.Err) + } + case <-producer.Successes(): + t.Error("Unexpected success") + } + } + + seedBroker.Returns(metadataResponse) + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + + for i := 0; i < 10; i++ { + select { + case msg := <-producer.Errors(): + t.Error(msg.Err) + case <-producer.Successes(): + } + } + + leader.Close() + seedBroker.Close() + safeClose(t, producer) +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleProducer_select() {