Skip to content

Commit

Permalink
Add a test for bug #294
Browse files Browse the repository at this point in the history
Skip it until the bug is fixed, but at least it will keep up with the API
changes now. Before it was inline in the ticket, and was falling behind.
  • Loading branch information
eapache committed Mar 10, 2015
1 parent 7cbd3ea commit 437cd2f
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,69 @@ func TestProducerMultipleRetries(t *testing.T) {
closeProducer(t, producer)
}

func TestProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.AckSuccesses = true
config.Producer.Retry.Backoff = 0
config.Producer.Retry.Max = 0
producer, err := NewProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
if msg.Err != ErrNotLeaderForPartition {
t.Error(msg.Err)
}
case <-producer.Successes():
t.Error("Unexpected success")
}
}

seedBroker.Returns(metadataResponse)

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
}

leader.Close()
seedBroker.Close()
safeClose(t, producer)
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleProducer_select() {
Expand Down

0 comments on commit 437cd2f

Please sign in to comment.