Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement resolve_canonical_bootstrap_servers_only #2156

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package sarama

import (
"context"
"errors"
"math"
"math/rand"
"net"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/proxy"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
Expand Down Expand Up @@ -191,6 +196,14 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
transactionCoordinators: make(map[string]int32),
}

if conf.Net.ResolveCanonicalBootstrapServers {
var err error
addrs, err = client.resolveCanonicalNames(addrs)
if err != nil {
return nil, err
}
}

client.randomizeSeedBrokers(addrs)

if conf.Metadata.Full {
Expand Down Expand Up @@ -1227,6 +1240,53 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}

func (client *client) resolveCanonicalNames(addrs []string) ([]string, error) {
ctx := context.Background()

dialer := client.Config().getDialer()
resolver := net.Resolver{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
// dial func should only be called once, so switching within is acceptable
switch d := dialer.(type) {
case proxy.ContextDialer:
return d.DialContext(ctx, network, address)
default:
// we have no choice but to ignore the context
return d.Dial(network, address)
}
},
}

canonicalAddrs := make(map[string]struct{}, len(addrs)) // dedupe as we go
for _, addr := range addrs {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err // message includes addr
}

ips, err := resolver.LookupHost(ctx, host)
if err != nil {
return nil, err // message includes host
}
for _, ip := range ips {
ptrs, err := resolver.LookupAddr(ctx, ip)
if err != nil {
return nil, err // message includes ip
}

// unlike the Java client, we do not further check that PTRs resolve
ptr := strings.TrimSuffix(ptrs[0], ".") // trailing dot breaks GSSAPI
canonicalAddrs[net.JoinHostPort(ptr, port)] = struct{}{}
}
}

addrs = make([]string, 0, len(canonicalAddrs))
for addr := range canonicalAddrs {
addrs = append(addrs, addr)
}
return addrs, nil
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ type Config struct {
ReadTimeout time.Duration // How long to wait for a response.
WriteTimeout time.Duration // How long to wait for a transmit.

// ResolveCanonicalBootstrapServers turns each bootstrap broker address
// into a set of IPs, then does a reverse lookup on each one to get its
// canonical hostname. This list of hostnames then replaces the
// original address list. Similar to the `client.dns.lookup` option in
// the JVM client, this is especially useful with GSSAPI, where it
// allows providing an alias record instead of individual broker
// hostnames. Defaults to false.
ResolveCanonicalBootstrapServers bool

TLS struct {
// Whether or not to use TLS when connecting to the broker
// (defaults to false).
Expand Down
Loading