Skip to content

Commit

Permalink
[Elastic Agent] Use http2 to connect to Fleet Server. (#26474)
Browse files Browse the repository at this point in the history
* Use http2 to connect to Fleet Server.

* Add changelog.

* Fix import formatting.

* Fix issue with tls and http2.

(cherry picked from commit 638b62d)
  • Loading branch information
blakerouse authored and mergify-bot committed Jun 28, 2021
1 parent eb6df62 commit 46f027f
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 13 deletions.
57 changes: 57 additions & 0 deletions libbeat/common/transport/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,63 @@ func TestTLSDialer(
}), nil
}

type DialerH2 interface {
Dial(network, address string, cfg *tls.Config) (net.Conn, error)
}

type DialerFuncH2 func(network, address string, cfg *tls.Config) (net.Conn, error)

func (d DialerFuncH2) Dial(network, address string, cfg *tls.Config) (net.Conn, error) {
return d(network, address, cfg)
}

func TLSDialerH2(forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration) (DialerH2, error) {
return TestTLSDialerH2(testing.NullDriver, forward, config, timeout)
}

func TestTLSDialerH2(
d testing.Driver,
forward Dialer,
config *tlscommon.TLSConfig,
timeout time.Duration,
) (DialerH2, error) {
var lastTLSConfig *tls.Config
var lastNetwork string
var lastAddress string
var m sync.Mutex

return DialerFuncH2(func(network, address string, cfg *tls.Config) (net.Conn, error) {
switch network {
case "tcp", "tcp4", "tcp6":
default:
return nil, fmt.Errorf("unsupported network type %v", network)
}

host, _, err := net.SplitHostPort(address)
if err != nil {
return nil, err
}

var tlsConfig *tls.Config
m.Lock()
if network == lastNetwork && address == lastAddress {
tlsConfig = lastTLSConfig
}
if tlsConfig == nil {
tlsConfig = config.BuildModuleClientConfig(host)
lastNetwork = network
lastAddress = address
lastTLSConfig = tlsConfig
}
m.Unlock()

// NextProtos must be set from the passed h2 connection or it will fail
tlsConfig.NextProtos = cfg.NextProtos

return tlsDialWith(d, forward, network, address, timeout, tlsConfig, config)
}), nil
}

func tlsDialWith(
d testing.Driver,
dialer Dialer,
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,4 @@
- Use `filestream` input for internal log collection. {pull}25660[25660]
- Enable agent to send custom headers to kibana/ES {pull}26275[26275]
- Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394]
- Communicate with Fleet Server over HTTP2. {pull}26474[26474]
32 changes: 19 additions & 13 deletions x-pack/elastic-agent/pkg/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/net/http2"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport"
Expand Down Expand Up @@ -113,8 +114,16 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
hosts := cfg.GetHosts()
clients := make([]*requestClient, len(hosts))
for i, host := range cfg.GetHosts() {
var transport http.RoundTripper
transport, err := makeTransport(cfg.Timeout, cfg.TLS)
connStr, err := common.MakeURL(string(cfg.Protocol), p, host, 0)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}
addr, err := url.Parse(connStr)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}

transport, err := makeTransport(addr.Scheme, cfg.Timeout, cfg.TLS)
if err != nil {
return nil, err
}
Expand All @@ -136,12 +145,8 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
Timeout: cfg.Timeout,
}

url, err := common.MakeURL(string(cfg.Protocol), p, host, 0)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}
clients[i] = &requestClient{
request: prefixRequestFactory(url),
request: prefixRequestFactory(connStr),
client: httpClient,
}
}
Expand Down Expand Up @@ -272,17 +277,18 @@ func prefixRequestFactory(URL string) requestFunc {
}

// makeTransport create a transport object based on the TLS configuration.
func makeTransport(timeout time.Duration, tls *tlscommon.Config) (*http.Transport, error) {
func makeTransport(scheme string, timeout time.Duration, tls *tlscommon.Config) (http.RoundTripper, error) {
dialer := transport.NetDialer(timeout)
if scheme == "http" {
return &http.Transport{Dial: dialer.Dial}, nil
}
tlsConfig, err := tlscommon.LoadTLSConfig(tls)
if err != nil {
return nil, errors.Wrap(err, "invalid TLS configuration")
}
dialer := transport.NetDialer(timeout)
tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout)
tlsDialer, err := transport.TLSDialerH2(dialer, tlsConfig, timeout)
if err != nil {
return nil, errors.Wrap(err, "fail to create TLS dialer")
}

// TODO: Dial is deprecated we need to move to DialContext.
return &http.Transport{Dial: dialer.Dial, DialTLS: tlsDialer.Dial}, nil
return &http2.Transport{DialTLS: tlsDialer.Dial}, nil
}

0 comments on commit 46f027f

Please sign in to comment.