Skip to content

Commit

Permalink
chore(fvt): tidyup broker await
Browse files Browse the repository at this point in the history
- ensure test clients are closed after use
- increase initial delay, will be at least 10s to become ready so no
  point polling too early
- bump default KAFKA_VERSION to 3.5.1 if not set

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 11, 2023
1 parent 8d0df91 commit bd81a11
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
env.KafkaVersion = version
} else {
env.KafkaVersion = "3.3.2"
env.KafkaVersion = "3.5.1"
}

c := exec.Command("docker-compose", "up", "-d")
Expand All @@ -179,9 +179,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
}

config := NewFunctionalTestConfig()
if err != nil {
return err
}
config.Net.DialTimeout = 1 * time.Second
config.Net.ReadTimeout = 1 * time.Second
config.Net.WriteTimeout = 1 * time.Second
Expand All @@ -190,9 +187,14 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
// wait for the kafka brokers to come up
allBrokersUp := false

Logger.Printf("waiting for kafka %s brokers to come up...\n", env.KafkaVersion)
time.Sleep(10 * time.Second)

mainLoop:
for i := 0; i < 30 && !allBrokersUp; i++ {
Logger.Println("waiting for kafka brokers to come up")
if i > 0 {
Logger.Printf("still waiting for kafka %s brokers to come up...\n", env.KafkaVersion)
}
time.Sleep(3 * time.Second)
brokersOk := make([]bool, len(env.KafkaBrokerAddrs))

Expand All @@ -213,22 +215,28 @@ mainLoop:
}
err = client.RefreshMetadata()
if err != nil {
client.Close()
continue
}
brokers := client.Brokers()
if len(brokers) < expectedBrokers {
client.Close()
continue
}
for _, broker := range brokers {
err := broker.Open(client.Config())
if err != nil {
client.Close()
continue retryLoop
}
connected, err := broker.Connected()
if err != nil || !connected {
broker.Close()
client.Close()
continue retryLoop
}
}
client.Close()
brokersOk[j] = true
}

Expand Down

0 comments on commit bd81a11

Please sign in to comment.