Skip to content

Commit

Permalink
fix(redpanda): wait for
Browse files Browse the repository at this point in the history
Wait for the admin interface to response to HTTP to avoid failures in
configuring the instance when its not fully ready.

Clean up error wrapping.
  • Loading branch information
stevenh committed Sep 21, 2024
1 parent b823aad commit 438261e
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
tmpDir, err := os.MkdirTemp("", "redpanda")
if err != nil {
return nil, fmt.Errorf("failed to create directory: %w", err)
return nil, fmt.Errorf("create temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)

Expand Down Expand Up @@ -121,24 +121,24 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// the Kafka API.
entrypointPath := filepath.Join(tmpDir, entrypointFile)
if err := os.WriteFile(entrypointPath, entrypoint, 0o700); err != nil {
return nil, fmt.Errorf("failed to create entrypoint file: %w", err)
return nil, fmt.Errorf("write entrypoint file: %w", err)
}

// 4. Register extra kafka listeners if provided, network aliases will be
// set
if err := registerListeners(settings, req); err != nil {
return nil, fmt.Errorf("failed to register listeners: %w", err)
return nil, fmt.Errorf("register listeners: %w", err)
}

// Bootstrap config file contains cluster configurations which will only be considered
// the very first time you start a cluster.
bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile)
bootstrapConfig, err := renderBootstrapConfig(settings)
if err != nil {
return nil, fmt.Errorf("failed to create bootstrap config file: %w", err)
return nil, err
}
if err := os.WriteFile(bootstrapConfigPath, bootstrapConfig, 0o600); err != nil {
return nil, fmt.Errorf("failed to create bootstrap config file: %w", err)
return nil, fmt.Errorf("write bootstrap config: %w", err)
}

req.Files = append(req.Files,
Expand All @@ -158,11 +158,11 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
if settings.EnableTLS {
certPath := filepath.Join(tmpDir, certFile)
if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil {
return nil, fmt.Errorf("failed to create certificate file: %w", err)
return nil, fmt.Errorf("write certificate file: %w", err)
}
keyPath := filepath.Join(tmpDir, keyFile)
if err := os.WriteFile(keyPath, settings.key, 0o600); err != nil {
return nil, fmt.Errorf("failed to create key file: %w", err)
return nil, fmt.Errorf("write key file: %w", err)
}

req.Files = append(req.Files,
Expand Down Expand Up @@ -192,34 +192,53 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// the Redpanda config with the advertised Kafka address.
hostIP, err := ctr.Host(ctx)
if err != nil {
return c, fmt.Errorf("failed to get container host: %w", err)
return c, fmt.Errorf("host: %w", err)
}

kafkaPort, err := ctr.MappedPort(ctx, nat.Port(defaultKafkaAPIPort))
if err != nil {
return c, fmt.Errorf("failed to get mapped Kafka port: %w", err)
return c, fmt.Errorf("mapped kafka port: %w", err)
}

// 7. Render redpanda.yaml config and mount it.
nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int())
if err != nil {
return c, fmt.Errorf("failed to render node config: %w", err)
return c, err
}

err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 600)
err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 0o600)
if err != nil {
return c, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err)
return c, fmt.Errorf("copy to container: %w", err)
}

// 8. Wait until Redpanda is ready to serve requests.
waitHTTP := wait.ForHTTP(defaultAdminAPIPort).
WithStatusCodeMatcher(func(status int) bool {
return status == http.StatusNotFound
})

var tlsConfig *tls.Config
if settings.EnableTLS {
cert, err := tls.X509KeyPair(settings.cert, settings.key)
if err != nil {
return c, fmt.Errorf("create admin cert: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(settings.cert)
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
waitHTTP = waitHTTP.WithTLS(true, tlsConfig)
}
err = wait.ForAll(
wait.ForListeningPort(defaultKafkaAPIPort),
wait.ForListeningPort(defaultAdminAPIPort),
waitHTTP,
wait.ForListeningPort(defaultSchemaRegistryPort),
wait.ForLog("Successfully started Redpanda!"),
).WaitUntilReady(ctx, ctr)
if err != nil {
return c, fmt.Errorf("failed to wait for Redpanda readiness: %w", err)
return c, fmt.Errorf("wait for readiness: %w", err)
}

c.urlScheme = "http"
Expand All @@ -231,34 +250,25 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
if len(settings.ServiceAccounts) > 0 {
adminAPIPort, err := ctr.MappedPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
return c, fmt.Errorf("failed to get mapped Admin API port: %w", err)
return c, fmt.Errorf("mapped admin port: %w", err)
}

adminAPIUrl := fmt.Sprintf("%s://%v:%d", c.urlScheme, hostIP, adminAPIPort.Int())
adminCl := NewAdminAPIClient(adminAPIUrl)
if settings.EnableTLS {
cert, err := tls.X509KeyPair(settings.cert, settings.key)
if err != nil {
return c, fmt.Errorf("failed to create admin client with cert: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(settings.cert)
adminCl = adminCl.WithHTTPClient(&http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
ForceAttemptHTTP2: true,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
},
TLSClientConfig: tlsConfig,
},
})
}

for username, password := range settings.ServiceAccounts {
if err := adminCl.CreateUser(ctx, username, password); err != nil {
return c, fmt.Errorf("failed to create service account with username %q: %w", username, err)
return c, fmt.Errorf("create user %q: %w", username, err)
}
}
}
Expand Down Expand Up @@ -299,12 +309,12 @@ func renderBootstrapConfig(settings options) ([]byte, error) {

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
if err != nil {
return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err)
return nil, fmt.Errorf("parse bootstrap template: %w", err)
}

var bootstrapConfig bytes.Buffer
if err := tpl.Execute(&bootstrapConfig, bootstrapTplParams); err != nil {
return nil, fmt.Errorf("failed to render redpanda bootstrap config template: %w", err)
return nil, fmt.Errorf("render bootstrap template: %w", err)
}

return bootstrapConfig.Bytes(), nil
Expand Down Expand Up @@ -353,12 +363,12 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)

ncTpl, err := template.New("redpanda.yaml").Parse(nodeConfigTpl)
if err != nil {
return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err)
return nil, fmt.Errorf("parse node config template: %w", err)
}

var redpandaYaml bytes.Buffer
if err := ncTpl.Execute(&redpandaYaml, tplParams); err != nil {
return nil, fmt.Errorf("failed to render redpanda node config template: %w", err)
return nil, fmt.Errorf("render node config template: %w", err)
}

return redpandaYaml.Bytes(), nil
Expand Down

0 comments on commit 438261e

Please sign in to comment.