From 46f027fbb02e10d590cc7f99fc24b8f0764cbde8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 28 Jun 2021 15:57:15 -0400 Subject: [PATCH] [Elastic Agent] Use http2 to connect to Fleet Server. (#26474) * Use http2 to connect to Fleet Server. * Add changelog. * Fix import formatting. * Fix issue with tls and http2. (cherry picked from commit 638b62df72bd8d69cd596a3e55b1f72f63f882ca) --- libbeat/common/transport/tls.go | 57 ++++++++++++++++++++ x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + x-pack/elastic-agent/pkg/remote/client.go | 32 ++++++----- 3 files changed, 77 insertions(+), 13 deletions(-) diff --git a/libbeat/common/transport/tls.go b/libbeat/common/transport/tls.go index 10ece84dc47..edef5a6ab9f 100644 --- a/libbeat/common/transport/tls.go +++ b/libbeat/common/transport/tls.go @@ -73,6 +73,63 @@ func TestTLSDialer( }), nil } +type DialerH2 interface { + Dial(network, address string, cfg *tls.Config) (net.Conn, error) +} + +type DialerFuncH2 func(network, address string, cfg *tls.Config) (net.Conn, error) + +func (d DialerFuncH2) Dial(network, address string, cfg *tls.Config) (net.Conn, error) { + return d(network, address, cfg) +} + +func TLSDialerH2(forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration) (DialerH2, error) { + return TestTLSDialerH2(testing.NullDriver, forward, config, timeout) +} + +func TestTLSDialerH2( + d testing.Driver, + forward Dialer, + config *tlscommon.TLSConfig, + timeout time.Duration, +) (DialerH2, error) { + var lastTLSConfig *tls.Config + var lastNetwork string + var lastAddress string + var m sync.Mutex + + return DialerFuncH2(func(network, address string, cfg *tls.Config) (net.Conn, error) { + switch network { + case "tcp", "tcp4", "tcp6": + default: + return nil, fmt.Errorf("unsupported network type %v", network) + } + + host, _, err := net.SplitHostPort(address) + if err != nil { + return nil, err + } + + var tlsConfig *tls.Config + m.Lock() + if network == lastNetwork && address == lastAddress { + tlsConfig = lastTLSConfig + } + if tlsConfig == nil { + tlsConfig = config.BuildModuleClientConfig(host) + lastNetwork = network + lastAddress = address + lastTLSConfig = tlsConfig + } + m.Unlock() + + // NextProtos must be set from the passed h2 connection or it will fail + tlsConfig.NextProtos = cfg.NextProtos + + return tlsDialWith(d, forward, network, address, timeout, tlsConfig, config) + }), nil +} + func tlsDialWith( d testing.Driver, dialer Dialer, diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 3842938d827..23978aa6800 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -116,3 +116,4 @@ - Use `filestream` input for internal log collection. {pull}25660[25660] - Enable agent to send custom headers to kibana/ES {pull}26275[26275] - Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394] +- Communicate with Fleet Server over HTTP2. {pull}26474[26474] diff --git a/x-pack/elastic-agent/pkg/remote/client.go b/x-pack/elastic-agent/pkg/remote/client.go index 281b3798944..ad5f136f7e0 100644 --- a/x-pack/elastic-agent/pkg/remote/client.go +++ b/x-pack/elastic-agent/pkg/remote/client.go @@ -15,6 +15,7 @@ import ( "time" "github.com/pkg/errors" + "golang.org/x/net/http2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport" @@ -113,8 +114,16 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client hosts := cfg.GetHosts() clients := make([]*requestClient, len(hosts)) for i, host := range cfg.GetHosts() { - var transport http.RoundTripper - transport, err := makeTransport(cfg.Timeout, cfg.TLS) + connStr, err := common.MakeURL(string(cfg.Protocol), p, host, 0) + if err != nil { + return nil, errors.Wrap(err, "invalid fleet-server endpoint") + } + addr, err := url.Parse(connStr) + if err != nil { + return nil, errors.Wrap(err, "invalid fleet-server endpoint") + } + + transport, err := makeTransport(addr.Scheme, cfg.Timeout, cfg.TLS) if err != nil { return nil, err } @@ -136,12 +145,8 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client Timeout: cfg.Timeout, } - url, err := common.MakeURL(string(cfg.Protocol), p, host, 0) - if err != nil { - return nil, errors.Wrap(err, "invalid fleet-server endpoint") - } clients[i] = &requestClient{ - request: prefixRequestFactory(url), + request: prefixRequestFactory(connStr), client: httpClient, } } @@ -272,17 +277,18 @@ func prefixRequestFactory(URL string) requestFunc { } // makeTransport create a transport object based on the TLS configuration. -func makeTransport(timeout time.Duration, tls *tlscommon.Config) (*http.Transport, error) { +func makeTransport(scheme string, timeout time.Duration, tls *tlscommon.Config) (http.RoundTripper, error) { + dialer := transport.NetDialer(timeout) + if scheme == "http" { + return &http.Transport{Dial: dialer.Dial}, nil + } tlsConfig, err := tlscommon.LoadTLSConfig(tls) if err != nil { return nil, errors.Wrap(err, "invalid TLS configuration") } - dialer := transport.NetDialer(timeout) - tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout) + tlsDialer, err := transport.TLSDialerH2(dialer, tlsConfig, timeout) if err != nil { return nil, errors.Wrap(err, "fail to create TLS dialer") } - - // TODO: Dial is deprecated we need to move to DialContext. - return &http.Transport{Dial: dialer.Dial, DialTLS: tlsDialer.Dial}, nil + return &http2.Transport{DialTLS: tlsDialer.Dial}, nil }