diff --git a/functional_test.go b/functional_test.go index 4c8b35218..fad313a5d 100644 --- a/functional_test.go +++ b/functional_test.go @@ -442,12 +442,69 @@ func SaveProxy(t *testing.T, px string) { func setupFunctionalTest(t testing.TB) { resetProxies(t) + ensureFullyReplicated(t, 60*time.Second, 5*time.Second) } func teardownFunctionalTest(t testing.TB) { resetProxies(t) } +func ensureFullyReplicated(t testing.TB, timeout time.Duration, retry time.Duration) { + config := NewTestConfig() + config.Metadata.Retry.Max = 5 + config.Metadata.Retry.Backoff = 10 * time.Second + config.ClientID = "sarama-ensureFullyReplicated" + config.Version = V2_6_0_0 + + var testTopicNames []string + for topic := range testTopicDetails { + testTopicNames = append(testTopicNames, topic) + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + tick := time.NewTicker(retry) + defer tick.Stop() + + for { + resp, err := func() (*MetadataResponse, error) { + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + return nil, fmt.Errorf("failed to connect to kafka: %w", err) + } + defer client.Close() + + controller, err := client.Controller() + if err != nil { + return nil, fmt.Errorf("failed to connect to kafka controller: %w", err) + } + defer controller.Close() + return controller.GetMetadata(&MetadataRequest{Version: 5, Topics: testTopicNames}) + }() + if err != nil { + Logger.Printf("failed to get metadata during test setup: %v\n", err) + } else { + ok := true + for _, topic := range resp.Topics { + for _, partition := range topic.Partitions { + if len(partition.Isr) != 3 { + ok = false + Logger.Printf("topic %s/%d is not fully-replicated Isr=%v Offline=%v\n", topic.Name, partition.ID, partition.Isr, partition.OfflineReplicas) + } + } + } + if ok { + return + } + } + select { + case <-timer.C: + t.Fatalf("timeout waiting for test topics to be fully replicated") + case <-tick.C: + } + } +} + type kafkaVersion []int func (kv kafkaVersion) satisfies(other kafkaVersion) bool {