Skip to content

Commit

Permalink
refactor: improve kafka test container allocation
Browse files Browse the repository at this point in the history
previously it attempted to share one kafka container across all parallel tests using a mutex and a global, but this poses major issues with shutdown, forcing a timed expiry, which is unreliable or slow depending on the constant expiry time
  • Loading branch information
endigma committed Feb 12, 2025
1 parent 12f478c commit 2d9689e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 59 deletions.
81 changes: 32 additions & 49 deletions router-tests/testenv/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@ import (
"github.com/nats-io/nats.go"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
)

var (
kafkaMux sync.Mutex
kafkaData *KafkaData
)

type KafkaData struct {
Client *kgo.Client
Brokers []string
}

Expand All @@ -44,26 +39,19 @@ func getHostPort(resource *dockertest.Resource, id string) string {
return u.Hostname() + ":" + resource.GetPort(id)
}

func setupKafkaServers(t testing.TB) (*KafkaData, error) {
kafkaMux.Lock()
defer kafkaMux.Unlock()

if kafkaData != nil {
return kafkaData, nil
func setupKafkaServer(t testing.TB) (*KafkaData, error) {
pool, err := dockertest.NewPool("")
if err != nil {
return nil, err
}

kafkaData = &KafkaData{}

dockerPool, err := dockertest.NewPool("")
require.NoError(t, err, "could not connect to docker")
require.NoError(t, dockerPool.Client.Ping(), "could not ping docker")

ports, err := freeport.Take(1)
require.NoError(t, err, "could not get free port for kafka")
if err := pool.Client.Ping(); err != nil {
return nil, err
}

port := ports[0]
port := freeport.GetOne(t)

kafkaResource, err := dockerPool.RunWithOptions(&dockertest.RunOptions{
kafkaResource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "bitnami/kafka",
Tag: "3.7.0",
PortBindings: map[docker.Port][]docker.PortBinding{
Expand All @@ -82,40 +70,35 @@ func setupKafkaServers(t testing.TB) (*KafkaData, error) {
"ALLOW_PLAINTEXT_LISTENER=yes",
"KAFKA_KRAFT_CLUSTER_ID=XkpGZQ27R3eTl3OdTm2LYA",
},
}, func(hc *docker.HostConfig) {
hc.AutoRemove = true
hc.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
require.NoError(t, err, "could not start kafka")

// Tried using t.Cleanup here, but it was running far too early, not sure why.
// This can mean running tests quickly in succession will make freeport errors
require.NoError(t, kafkaResource.Expire(30))

kafkaData.Brokers = []string{getHostPort(kafkaResource, "9092/tcp")}
if err != nil {
return nil, err
}

t.Logf("kafka has brokers: %v", kafkaData.Brokers)
t.Cleanup(func() {
if err := pool.Purge(kafkaResource); err != nil {
t.Fatalf("could not purge kafka container: %s", err.Error())
}
})

client, err := kgo.NewClient(
kgo.SeedBrokers(kafkaData.Brokers...),
kgo.SeedBrokers(getHostPort(kafkaResource, "9092/tcp")),
)
require.NoError(t, err, "could not create kafka client")

require.Eventually(t, func() bool {
err := client.Ping(context.Background())
if err != nil {
t.Logf("could not ping kafka: %s", err)
return false
}

t.Logf("kafka is up")
if err != nil {
return nil, err
}

return true
}, 60*time.Second, time.Second)
err = pool.Retry(func() error {
return client.Ping(context.Background())
})
if err != nil {
t.Fatalf("could not ping kafka: %s", err.Error())
}

return kafkaData, nil
return &KafkaData{
Client: client,
Brokers: []string{getHostPort(kafkaResource, "9092/tcp")},
}, nil
}

var (
Expand Down
15 changes: 5 additions & 10 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,20 +367,15 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
defer kafkaStarted.Done()

var kafkaSetupErr error
kafkaSetup, kafkaSetupErr = setupKafkaServers(t)
kafkaSetup, kafkaSetupErr = setupKafkaServer(t)
if kafkaSetupErr != nil || kafkaSetup == nil {
t.Fatalf("could not setup kafka: %s", kafkaSetupErr.Error())
return
}
client, err := kgo.NewClient(
kgo.SeedBrokers(kafkaSetup.Brokers...),
)
if err != nil {
t.Fatalf("could not create kafka client: %s", err.Error())
return
}
kafkaClient = client
kafkaAdminClient = kadm.NewClient(client)

kafkaClient = kafkaSetup.Client
kafkaAdminClient = kadm.NewClient(kafkaClient)

cfg.KafkaSeeds = kafkaSetup.Brokers
}()
}
Expand Down

0 comments on commit 2d9689e

Please sign in to comment.