diff --git a/consumer_test.go b/consumer_test.go index 58bbbca48..6f4fe6b00 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -114,6 +114,9 @@ func TestConsumerFunnyOffsets(t *testing.T) { } consumer, err := master.ConsumePartition("my_topic", 0, 2) + if err != nil { + t.Fatal(err) + } message := <-consumer.Messages() if message.Offset != 3 { @@ -245,6 +248,48 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { seedBroker.Close() } +func TestConsumerInterleavedClose(t *testing.T) { + t.Skip("Enable once bug #325 is fixed.") + + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + config := NewConfig() + config.ChannelBufferSize = 0 + master, err := NewConsumer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + c0, err := master.ConsumePartition("my_topic", 0, 0) + if err != nil { + t.Fatal(err) + } + + fetchResponse := new(FetchResponse) + fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) + leader.Returns(fetchResponse) + + c1, err := master.ConsumePartition("my_topic", 1, 0) + if err != nil { + t.Fatal(err) + } + + fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) + leader.Returns(fetchResponse) + + safeClose(t, c1) + safeClose(t, c0) + leader.Close() + seedBroker.Close() +} + // This example shows how to use a consumer with a select statement // dealing with the different channels. func ExampleConsumer_select() {