diff --git a/functional_producer_test.go b/functional_producer_test.go index 4c441907cd..2ca28c8142 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -5,6 +5,7 @@ package sarama import ( "fmt" + "math" "os" "strconv" "strings" @@ -335,6 +336,67 @@ func testProducingMessages(t *testing.T, config *Config) { safeClose(t, client) } +// TestAsyncProducerRemoteBrokerClosed ensures that the async producer can +// cleanly recover if network connectivity to the remote brokers is lost and +// then subsequently resumed. +// +// https://github.com/Shopify/sarama/issues/2129 +func TestAsyncProducerRemoteBrokerClosed(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + config := NewTestConfig() + config.ClientID = t.Name() + config.Net.MaxOpenRequests = 1 + config.Producer.Flush.MaxMessages = 1 + config.Producer.Return.Successes = true + config.Producer.Retry.Max = math.MaxInt32 + config.Producer.Retry.Backoff = time.Millisecond + config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion) + + producer, err := NewAsyncProducer( + FunctionalTestEnv.KafkaBrokerAddrs, + config, + ) + if err != nil { + t.Fatal(err) + } + + // produce some more messages and ensure success + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} + <-producer.Successes() + } + + // shutdown all the active tcp connections + for _, proxy := range FunctionalTestEnv.Proxies { + _ = proxy.Disable() + } + + // produce some more messages + for i := 10; i < 20; i++ { + producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} + } + + // re-open the proxies + for _, proxy := range FunctionalTestEnv.Proxies { + _ = proxy.Enable() + } + + // ensure the previously produced messages succeed + for i := 10; i < 20; i++ { + <-producer.Successes() + } + + // produce some more messages and ensure success + for i := 20; i < 30; i++ { + producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} + <-producer.Successes() + } + + closeProducer(t, producer) +} + func validateMetrics(t *testing.T, client Client) { // Get the broker used by test1 topic var broker *Broker