From 7d7ac520c44adcdfae1a2b8aa63f02da5071e644 Mon Sep 17 00:00:00 2001 From: George Brighton Date: Mon, 31 Jul 2023 19:42:38 +0100 Subject: [PATCH] Implement resolve_canonical_bootstrap_servers_only (#2156) Signed-off-by: George Brighton --- client.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ config.go | 9 +++++++++ 2 files changed, 69 insertions(+) diff --git a/client.go b/client.go index d9fb77d64..48e22342f 100644 --- a/client.go +++ b/client.go @@ -1,13 +1,18 @@ package sarama import ( + "context" "errors" "math" "math/rand" + "net" "sort" + "strings" "sync" "sync/atomic" "time" + + "golang.org/x/net/proxy" ) // Client is a generic Kafka client. It manages connections to one or more Kafka brokers. @@ -191,6 +196,14 @@ func NewClient(addrs []string, conf *Config) (Client, error) { transactionCoordinators: make(map[string]int32), } + if conf.Net.ResolveCanonicalBootstrapServers { + var err error + addrs, err = client.resolveCanonicalNames(addrs) + if err != nil { + return nil, err + } + } + client.randomizeSeedBrokers(addrs) if conf.Metadata.Full { @@ -1227,6 +1240,53 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo return retry(Wrap(ErrOutOfBrokers, brokerErrors...)) } +func (client *client) resolveCanonicalNames(addrs []string) ([]string, error) { + ctx := context.Background() + + dialer := client.Config().getDialer() + resolver := net.Resolver{ + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + // dial func should only be called once, so switching within is acceptable + switch d := dialer.(type) { + case proxy.ContextDialer: + return d.DialContext(ctx, network, address) + default: + // we have no choice but to ignore the context + return d.Dial(network, address) + } + }, + } + + canonicalAddrs := make(map[string]struct{}, len(addrs)) // dedupe as we go + for _, addr := range addrs { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err // message includes addr + } + + ips, err := resolver.LookupHost(ctx, host) + if err != nil { + return nil, err // message includes host + } + for _, ip := range ips { + ptrs, err := resolver.LookupAddr(ctx, ip) + if err != nil { + return nil, err // message includes ip + } + + // unlike the Java client, we do not further check that PTRs resolve + ptr := strings.TrimSuffix(ptrs[0], ".") // trailing dot breaks GSSAPI + canonicalAddrs[net.JoinHostPort(ptr, port)] = struct{}{} + } + } + + addrs = make([]string, 0, len(canonicalAddrs)) + for addr := range canonicalAddrs { + addrs = append(addrs, addr) + } + return addrs, nil +} + // nopCloserClient embeds an existing Client, but disables // the Close method (yet all other methods pass // through unchanged). This is for use in larger structs diff --git a/config.go b/config.go index eb27d98ac..19241e2e6 100644 --- a/config.go +++ b/config.go @@ -50,6 +50,15 @@ type Config struct { ReadTimeout time.Duration // How long to wait for a response. WriteTimeout time.Duration // How long to wait for a transmit. + // ResolveCanonicalBootstrapServers turns each bootstrap broker address + // into a set of IPs, then does a reverse lookup on each one to get its + // canonical hostname. This list of hostnames then replaces the + // original address list. Similar to the `client.dns.lookup` option in + // the JVM client, this is especially useful with GSSAPI, where it + // allows providing an alias record instead of individual broker + // hostnames. Defaults to false. + ResolveCanonicalBootstrapServers bool + TLS struct { // Whether or not to use TLS when connecting to the broker // (defaults to false).