diff --git a/Makefile b/Makefile index a9b95e332..0066b4e99 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ default: fmt get update test lint GO := go GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -GOTEST := $(GO) test -race -timeout 10m -coverprofile=profile.out -covermode=atomic +GOTEST := $(GO) test -v -race -timeout 10m -coverprofile=profile.out -covermode=atomic FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go') TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') diff --git a/functional_producer_test.go b/functional_producer_test.go index 7c0cd28cc..eec20ff36 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -336,6 +336,67 @@ func testProducingMessages(t *testing.T, config *Config) { safeClose(t, client) } +// TestAsyncProducerRemoteBrokerClosed ensures that the async producer can +// cleanly recover if network connectivity to the remote brokers is lost and +// then subsequently resumed. +// +// https://github.com/Shopify/sarama/issues/2129 +func TestAsyncProducerRemoteBrokerClosed(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + config := NewTestConfig() + config.ClientID = t.Name() + config.Net.MaxOpenRequests = 1 + config.Producer.Flush.MaxMessages = 1 + config.Producer.Return.Successes = true + config.Producer.Retry.Max = 1024 + config.Producer.Retry.Backoff = time.Millisecond * 50 + config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion) + + producer, err := NewAsyncProducer( + FunctionalTestEnv.KafkaBrokerAddrs, + config, + ) + if err != nil { + t.Fatal(err) + } + + // produce some more messages and ensure success + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} + <-producer.Successes() + } + + // shutdown all the active tcp connections + for _, proxy := range FunctionalTestEnv.Proxies { + _ = proxy.Disable() + } + + // produce some more messages + for i := 10; i < 20; i++ { + producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} + } + + // re-open the proxies + for _, proxy := range FunctionalTestEnv.Proxies { + _ = proxy.Enable() + } + + // ensure the previously produced messages succeed + for i := 10; i < 20; i++ { + <-producer.Successes() + } + + // produce some more messages and ensure success + for i := 20; i < 30; i++ { + producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} + <-producer.Successes() + } + + closeProducer(t, producer) +} + func validateMetrics(t *testing.T, client Client) { // Get the broker used by test1 topic var broker *Broker