Skip to content

Commit

Permalink
Merge pull request #2122 from Shopify/dnwe/toxiproxy-addr
Browse files Browse the repository at this point in the history
fix(test): make it simpler to re-use toxiproxy
  • Loading branch information
dnwe authored Jan 31, 2022
2 parents 0e6579f + 27b616e commit 3e230d1
Showing 1 changed file with 29 additions and 33 deletions.
62 changes: 29 additions & 33 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -101,6 +100,31 @@ type testEnvironment struct {
KafkaVersion string
}

// setupToxiProxies will configure the toxiproxy proxies with routes for the
// kafka brokers if they don't already exist
func setupToxiProxies(env *testEnvironment, endpoint string) error {
env.ToxiproxyClient = toxiproxy.NewClient(endpoint)
env.Proxies = map[string]*toxiproxy.Proxy{}
env.KafkaBrokerAddrs = nil
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
if err != nil {
proxy, err = env.ToxiproxyClient.CreateProxy(
proxyName,
fmt.Sprintf("0.0.0.0:%d", 29090+i),
fmt.Sprintf("kafka-%d:%d", i, 29090+i),
)
if err != nil {
return fmt.Errorf("failed to create toxiproxy: %w", err)
}
}
env.Proxies[proxyName] = proxy
env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
}
return nil
}

func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
Logger.Println("bringing up docker-based test environment")

Expand Down Expand Up @@ -139,21 +163,8 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
return fmt.Errorf("failed to run docker-compose to start test environment: %w", err)
}

// Set up toxiproxy Proxies
env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.CreateProxy(
proxyName,
fmt.Sprintf("0.0.0.0:%d", 29090+i),
fmt.Sprintf("kafka-%d:%d", i, 29090+i),
)
if err != nil {
return fmt.Errorf("failed to create toxiproxy: %w", err)
}
env.Proxies[proxyName] = proxy
env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
if err := setupToxiProxies(env, "http://localhost:8474"); err != nil {
return fmt.Errorf("failed to setup toxiproxies: %w", err)
}

// Wait for the kafka broker to come up
Expand Down Expand Up @@ -218,23 +229,8 @@ func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error
if err != nil {
return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
}
toxiproxyHost := toxiproxyURL.Hostname()

env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
if err != nil {
return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
}
env.Proxies[proxyName] = proxy
// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
_, proxyPort, err := net.SplitHostPort(proxy.Listen)
if err != nil {
return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
}
env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, net.JoinHostPort(toxiproxyHost, proxyPort))
if err := setupToxiProxies(env, toxiproxyURL.String()); err != nil {
return false, fmt.Errorf("failed to setup toxiproxies: %w", err)
}

env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
Expand Down

0 comments on commit 3e230d1

Please sign in to comment.