Skip to content

Commit

Permalink
Merge pull request #2144 from Shopify/dnwe/add-producer-broker-discon…
Browse files Browse the repository at this point in the history
…nect-fvt

feat(test): add an fvt for broker deadlock
  • Loading branch information
dnwe authored Feb 25, 2022
2 parents 59d2a43 + 22f9584 commit 4b9b976
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ default: fmt get update test lint

GO := go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := $(GO) test -race -timeout 10m -coverprofile=profile.out -covermode=atomic
GOTEST := $(GO) test -v -race -timeout 10m -coverprofile=profile.out -covermode=atomic

FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
Expand Down
61 changes: 61 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,67 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, client)
}

// TestAsyncProducerRemoteBrokerClosed ensures that the async producer can
// cleanly recover if network connectivity to the remote brokers is lost and
// then subsequently resumed.
//
// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewTestConfig()
config.ClientID = t.Name()
config.Net.MaxOpenRequests = 1
config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1024
config.Producer.Retry.Backoff = time.Millisecond * 50
config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)

producer, err := NewAsyncProducer(
FunctionalTestEnv.KafkaBrokerAddrs,
config,
)
if err != nil {
t.Fatal(err)
}

// produce some more messages and ensure success
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
<-producer.Successes()
}

// shutdown all the active tcp connections
for _, proxy := range FunctionalTestEnv.Proxies {
_ = proxy.Disable()
}

// produce some more messages
for i := 10; i < 20; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
}

// re-open the proxies
for _, proxy := range FunctionalTestEnv.Proxies {
_ = proxy.Enable()
}

// ensure the previously produced messages succeed
for i := 10; i < 20; i++ {
<-producer.Successes()
}

// produce some more messages and ensure success
for i := 20; i < 30; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
<-producer.Successes()
}

closeProducer(t, producer)
}

func validateMetrics(t *testing.T, client Client) {
// Get the broker used by test1 topic
var broker *Broker
Expand Down

0 comments on commit 4b9b976

Please sign in to comment.