Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test): add an fvt for broker deadlock #2144

Merged
merged 3 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ default: fmt get update test lint

GO := go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := $(GO) test -race -timeout 10m -coverprofile=profile.out -covermode=atomic
GOTEST := $(GO) test -v -race -timeout 10m -coverprofile=profile.out -covermode=atomic

FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
Expand Down
61 changes: 61 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,67 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, client)
}

// TestAsyncProducerRemoteBrokerClosed ensures that the async producer can
// cleanly recover if network connectivity to the remote brokers is lost and
// then subsequently resumed.
//
// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewTestConfig()
config.ClientID = t.Name()
config.Net.MaxOpenRequests = 1
config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1024
config.Producer.Retry.Backoff = time.Millisecond * 50
config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)

producer, err := NewAsyncProducer(
FunctionalTestEnv.KafkaBrokerAddrs,
config,
)
if err != nil {
t.Fatal(err)
}

// produce some more messages and ensure success
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
<-producer.Successes()
}

// shutdown all the active tcp connections
for _, proxy := range FunctionalTestEnv.Proxies {
_ = proxy.Disable()
}

// produce some more messages
for i := 10; i < 20; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
}

// re-open the proxies
for _, proxy := range FunctionalTestEnv.Proxies {
_ = proxy.Enable()
}

// ensure the previously produced messages succeed
for i := 10; i < 20; i++ {
<-producer.Successes()
}

// produce some more messages and ensure success
for i := 20; i < 30; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
<-producer.Successes()
}

closeProducer(t, producer)
}

func validateMetrics(t *testing.T, client Client) {
// Get the broker used by test1 topic
var broker *Broker
Expand Down