From 0873e08af1be2517764849a65e73df72ee54a1a8 Mon Sep 17 00:00:00 2001 From: Kevin Schoonover Date: Mon, 3 Jan 2022 06:33:53 -0800 Subject: [PATCH] agent: support multiple http address in addresses.http (#11582) --- command/agent/agent.go | 2 +- command/agent/agent_test.go | 8 +- command/agent/command.go | 20 ++- command/agent/config.go | 73 ++++++++- command/agent/config_test.go | 74 ++++++++- command/agent/http.go | 140 ++++++++++-------- command/agent/http_test.go | 24 ++- command/agent/testagent.go | 18 ++- website/content/docs/configuration/consul.mdx | 6 +- 9 files changed, 275 insertions(+), 90 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index b070de555968..c608e1d8ba20 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -935,7 +935,7 @@ func (a *Agent) setupClient() error { // If no HTTP health check can be supported nil is returned. func (a *Agent) agentHTTPCheck(server bool) *structs.ServiceCheck { // Resolve the http check address - httpCheckAddr := a.config.normalizedAddrs.HTTP + httpCheckAddr := a.config.normalizedAddrs.HTTP[0] if *a.config.Consul.ChecksUseAdvertise { httpCheckAddr = a.config.AdvertiseAddrs.HTTP } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index f1a834cd4edb..f61e47c049c4 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -105,7 +105,7 @@ func TestAgent_ServerConfig(t *testing.T) { require.Equal(t, "127.0.0.2", conf.Addresses.HTTP) require.Equal(t, "127.0.0.2", conf.Addresses.RPC) require.Equal(t, "127.0.0.2", conf.Addresses.Serf) - require.Equal(t, "127.0.0.2:4646", conf.normalizedAddrs.HTTP) + require.Equal(t, []string{"127.0.0.2:4646"}, conf.normalizedAddrs.HTTP) require.Equal(t, "127.0.0.2:4003", conf.normalizedAddrs.RPC) require.Equal(t, "127.0.0.2:4004", conf.normalizedAddrs.Serf) require.Equal(t, "10.0.0.10:4646", conf.AdvertiseAddrs.HTTP) @@ -166,7 +166,7 @@ func TestAgent_ServerConfig(t *testing.T) { require.Equal(t, "127.0.0.3", conf.Addresses.HTTP) require.Equal(t, "127.0.0.3", conf.Addresses.RPC) require.Equal(t, "127.0.0.3", conf.Addresses.Serf) - require.Equal(t, "127.0.0.3:4646", conf.normalizedAddrs.HTTP) + require.Equal(t, []string{"127.0.0.3:4646"}, conf.normalizedAddrs.HTTP) require.Equal(t, "127.0.0.3:4647", conf.normalizedAddrs.RPC) require.Equal(t, "127.0.0.3:4648", conf.normalizedAddrs.Serf) @@ -563,7 +563,7 @@ func TestAgent_HTTPCheck(t *testing.T) { logger: logger, config: &Config{ AdvertiseAddrs: &AdvertiseAddrs{HTTP: "advertise:4646"}, - normalizedAddrs: &Addresses{HTTP: "normalized:4646"}, + normalizedAddrs: &NormalizedAddrs{HTTP: []string{"normalized:4646"}}, Consul: &config.ConsulConfig{ ChecksUseAdvertise: helper.BoolToPtr(false), }, @@ -587,7 +587,7 @@ func TestAgent_HTTPCheck(t *testing.T) { if check.Protocol != "http" { t.Errorf("expected http proto not: %q", check.Protocol) } - if expected := a.config.normalizedAddrs.HTTP; check.PortLabel != expected { + if expected := a.config.normalizedAddrs.HTTP[0]; check.PortLabel != expected { t.Errorf("expected normalized addr not %q", check.PortLabel) } }) diff --git a/command/agent/command.go b/command/agent/command.go index 558e63ee120e..0cdfc76aab82 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -51,7 +51,7 @@ type Command struct { args []string agent *Agent - httpServer *HTTPServer + httpServers []*HTTPServer logFilter *logutils.LevelFilter logOutput io.Writer retryJoinErrCh chan struct{} @@ -500,13 +500,13 @@ func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOu c.agent = agent // Setup the HTTP server - http, err := NewHTTPServer(agent, config) + httpServers, err := NewHTTPServers(agent, config) if err != nil { agent.Shutdown() c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err)) return err } - c.httpServer = http + c.httpServers = httpServers // If DisableUpdateCheck is not enabled, set up update checking // (DisableUpdateCheck is false by default) @@ -700,8 +700,10 @@ func (c *Command) Run(args []string) int { // Shutdown the http server at the end, to ease debugging if // the agent takes long to shutdown - if c.httpServer != nil { - c.httpServer.Shutdown() + if len(c.httpServers) > 0 { + for _, srv := range c.httpServers { + srv.Shutdown() + } } }() @@ -902,13 +904,15 @@ WAIT: func (c *Command) reloadHTTPServer() error { c.agent.logger.Info("reloading HTTP server with new TLS configuration") - c.httpServer.Shutdown() + for _, srv := range c.httpServers { + srv.Shutdown() + } - http, err := NewHTTPServer(c.agent, c.agent.config) + httpServers, err := NewHTTPServers(c.agent, c.agent.config) if err != nil { return err } - c.httpServer = http + c.httpServers = httpServers return nil } diff --git a/command/agent/config.go b/command/agent/config.go index 740da6cb6b18..48573396c1f2 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -85,7 +85,7 @@ type Config struct { Addresses *Addresses `hcl:"addresses"` // normalizedAddr is set to the Address+Port by normalizeAddrs() - normalizedAddrs *Addresses + normalizedAddrs *NormalizedAddrs // AdvertiseAddrs is used to control the addresses we advertise. AdvertiseAddrs *AdvertiseAddrs `hcl:"advertise"` @@ -774,6 +774,15 @@ type Addresses struct { ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` } +// AdvertiseAddrs is used to control the addresses we advertise out for +// different network services. All are optional and default to BindAddr and +// their default Port. +type NormalizedAddrs struct { + HTTP []string + RPC string + Serf string +} + // AdvertiseAddrs is used to control the addresses we advertise out for // different network services. All are optional and default to BindAddr and // their default Port. @@ -1228,13 +1237,13 @@ func (c *Config) normalizeAddrs() error { c.BindAddr = ipStr } - addr, err := normalizeBind(c.Addresses.HTTP, c.BindAddr) + httpAddrs, err := normalizeMultipleBind(c.Addresses.HTTP, c.BindAddr) if err != nil { return fmt.Errorf("Failed to parse HTTP address: %v", err) } - c.Addresses.HTTP = addr + c.Addresses.HTTP = strings.Join(httpAddrs, " ") - addr, err = normalizeBind(c.Addresses.RPC, c.BindAddr) + addr, err := normalizeBind(c.Addresses.RPC, c.BindAddr) if err != nil { return fmt.Errorf("Failed to parse RPC address: %v", err) } @@ -1246,13 +1255,13 @@ func (c *Config) normalizeAddrs() error { } c.Addresses.Serf = addr - c.normalizedAddrs = &Addresses{ - HTTP: net.JoinHostPort(c.Addresses.HTTP, strconv.Itoa(c.Ports.HTTP)), + c.normalizedAddrs = &NormalizedAddrs{ + HTTP: joinHostPorts(httpAddrs, strconv.Itoa(c.Ports.HTTP)), RPC: net.JoinHostPort(c.Addresses.RPC, strconv.Itoa(c.Ports.RPC)), Serf: net.JoinHostPort(c.Addresses.Serf, strconv.Itoa(c.Ports.Serf)), } - addr, err = normalizeAdvertise(c.AdvertiseAddrs.HTTP, c.Addresses.HTTP, c.Ports.HTTP, c.DevMode) + addr, err = normalizeAdvertise(c.AdvertiseAddrs.HTTP, httpAddrs[0], c.Ports.HTTP, c.DevMode) if err != nil { return fmt.Errorf("Failed to parse HTTP advertise address (%v, %v, %v, %v): %v", c.AdvertiseAddrs.HTTP, c.Addresses.HTTP, c.Ports.HTTP, c.DevMode, err) } @@ -1335,6 +1344,22 @@ func parseSingleIPTemplate(ipTmpl string) (string, error) { } } +// parseMultipleIPTemplate is used as a helper function to parse out a multiple IP +// addresses from a config parameter. +func parseMultipleIPTemplate(ipTmpl string) ([]string, error) { + out, err := template.Parse(ipTmpl) + if err != nil { + return []string{}, fmt.Errorf("Unable to parse address template %q: %v", ipTmpl, err) + } + + ips := strings.Split(out, " ") + if len(ips) == 0 { + return []string{}, errors.New("No addresses found, please configure one.") + } + + return deduplicateAddrs(ips), nil +} + // normalizeBind returns a normalized bind address. // // If addr is set it is used, if not the default bind address is used. @@ -1345,6 +1370,16 @@ func normalizeBind(addr, bind string) (string, error) { return parseSingleIPTemplate(addr) } +// normalizeMultipleBind returns normalized bind addresses. +// +// If addr is set it is used, if not the default bind address is used. +func normalizeMultipleBind(addr, bind string) ([]string, error) { + if addr == "" { + return []string{bind}, nil + } + return parseMultipleIPTemplate(addr) +} + // normalizeAdvertise returns a normalized advertise address. // // If addr is set, it is used and the default port is appended if no port is @@ -1996,6 +2031,17 @@ func LoadConfigDir(dir string) (*Config, error) { return result, nil } +// joinHostPorts joins every addr in addrs with the specified port +func joinHostPorts(addrs []string, port string) []string { + localAddrs := make([]string, len(addrs)) + for i, k := range addrs { + localAddrs[i] = net.JoinHostPort(k, port) + + } + + return localAddrs +} + // isTemporaryFile returns true or false depending on whether the // provided file name is a temporary file for the following editors: // emacs or vim. @@ -2004,3 +2050,16 @@ func isTemporaryFile(name string) bool { strings.HasPrefix(name, ".#") || // emacs (strings.HasPrefix(name, "#") && strings.HasSuffix(name, "#")) // emacs } + +func deduplicateAddrs(addrs []string) []string { + keys := make(map[string]bool) + list := []string{} + + for _, entry := range addrs { + if _, value := keys[entry]; !value { + keys[entry] = true + list = append(list, entry) + } + } + return list +} diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 8bb39a98fed6..9dc7f9b8cab4 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -747,7 +747,7 @@ func TestConfig_normalizeAddrs_DevMode(t *testing.T) { t.Fatalf("expected BindAddr 127.0.0.1, got %s", c.BindAddr) } - if c.normalizedAddrs.HTTP != "127.0.0.1:4646" { + if c.normalizedAddrs.HTTP[0] != "127.0.0.1:4646" { t.Fatalf("expected HTTP address 127.0.0.1:4646, got %s", c.normalizedAddrs.HTTP) } @@ -880,6 +880,55 @@ func TestConfig_normalizeAddrs_IPv6Loopback(t *testing.T) { } } +// TestConfig_normalizeAddrs_MultipleInterface asserts that normalizeAddrs will +// handle normalizing multiple interfaces in a single protocol. +func TestConfig_normalizeAddrs_MultipleInterfaces(t *testing.T) { + testCases := []struct { + name string + addressConfig *Addresses + expectedNormalizedAddrs *NormalizedAddrs + expectErr bool + }{ + { + name: "multiple http addresses", + addressConfig: &Addresses{ + HTTP: "127.0.0.1 127.0.0.2", + }, + expectedNormalizedAddrs: &NormalizedAddrs{ + HTTP: []string{"127.0.0.1:4646", "127.0.0.2:4646"}, + RPC: "127.0.0.1:4647", + Serf: "127.0.0.1:4648", + }, + expectErr: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c := &Config{ + BindAddr: "127.0.0.1", + Ports: &Ports{ + HTTP: 4646, + RPC: 4647, + Serf: 4648, + }, + Addresses: tc.addressConfig, + AdvertiseAddrs: &AdvertiseAddrs{ + HTTP: "127.0.0.1", + RPC: "127.0.0.1", + Serf: "127.0.0.1", + }, + } + err := c.normalizeAddrs() + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedNormalizedAddrs, c.normalizedAddrs) + }) + } +} + func TestConfig_normalizeAddrs(t *testing.T) { c := &Config{ BindAddr: "169.254.1.5", @@ -1315,3 +1364,26 @@ func TestEventBroker_Parse(t *testing.T) { require.Equal(20000, *result.EventBufferSize) } } + +func TestParseMultipleIPTemplates(t *testing.T) { + testCases := []struct { + name string + tmpl string + expectedOut []string + expectErr bool + }{ + { + name: "deduplicates same ip", + tmpl: "127.0.0.1 127.0.0.1", + expectedOut: []string{"127.0.0.1"}, + expectErr: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + out, err := parseMultipleIPTemplate(tc.tmpl) + require.NoError(t, err) + require.Equal(t, tc.expectedOut, out) + }) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 0b656f959e59..142f3a6ef90c 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/go-connlimit" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + multierror "github.com/hashicorp/go-multierror" "github.com/rs/cors" "github.com/hashicorp/nomad/helper/noxssrw" @@ -75,64 +76,24 @@ type HTTPServer struct { wsUpgrader *websocket.Upgrader } -// NewHTTPServer starts new HTTP server over the agent -func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) { - // Start the listener - lnAddr, err := net.ResolveTCPAddr("tcp", config.normalizedAddrs.HTTP) - if err != nil { - return nil, err - } - ln, err := config.Listener("tcp", lnAddr.IP.String(), lnAddr.Port) - if err != nil { - return nil, fmt.Errorf("failed to start HTTP listener: %v", err) - } - - // If TLS is enabled, wrap the listener with a TLS listener - if config.TLSConfig.EnableHTTP { - tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, config.TLSConfig.VerifyHTTPSClient, true) - if err != nil { - return nil, err - } - - tlsConfig, err := tlsConf.IncomingTLSConfig() - if err != nil { - return nil, err - } - ln = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig) - } - - // Create the mux - mux := http.NewServeMux() - - wsUpgrader := &websocket.Upgrader{ - ReadBufferSize: 2048, - WriteBufferSize: 2048, - } - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: ln, - listenerCh: make(chan struct{}), - logger: agent.httpLogger, - Addr: ln.Addr().String(), - wsUpgrader: wsUpgrader, - } - srv.registerHandlers(config.EnableDebug) +// NewHTTPServers starts an HTTP server for every address.http configured in +// the agent. +func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { + var srvs []*HTTPServer + var serverInitializationErrors error // Handle requests with gzip compression gzip, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(0)) if err != nil { - return nil, err + return srvs, err } // Get connection handshake timeout limit handshakeTimeout, err := time.ParseDuration(config.Limits.HTTPSHandshakeTimeout) if err != nil { - return nil, fmt.Errorf("error parsing https_handshake_timeout: %v", err) + return srvs, fmt.Errorf("error parsing https_handshake_timeout: %v", err) } else if handshakeTimeout < 0 { - return nil, fmt.Errorf("https_handshake_timeout must be >= 0") + return srvs, fmt.Errorf("https_handshake_timeout must be >= 0") } // Get max connection limit @@ -141,23 +102,80 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) { maxConns = *mc } if maxConns < 0 { - return nil, fmt.Errorf("http_max_conns_per_client must be >= 0") + return srvs, fmt.Errorf("http_max_conns_per_client must be >= 0") } - // Create HTTP server with timeouts - httpServer := http.Server{ - Addr: srv.Addr, - Handler: gzip(mux), - ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns), - ErrorLog: newHTTPServerLogger(srv.logger), + tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, config.TLSConfig.VerifyHTTPSClient, true) + if err != nil && config.TLSConfig.EnableHTTP { + return srvs, fmt.Errorf("failed to initialize HTTP server TLS configuration: %s", err) } - go func() { - defer close(srv.listenerCh) - httpServer.Serve(ln) - }() + wsUpgrader := &websocket.Upgrader{ + ReadBufferSize: 2048, + WriteBufferSize: 2048, + } + + // Start the listener + for _, addr := range config.normalizedAddrs.HTTP { + // Create the mux + mux := http.NewServeMux() + + lnAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + serverInitializationErrors = multierror.Append(serverInitializationErrors, err) + continue + } + ln, err := config.Listener("tcp", lnAddr.IP.String(), lnAddr.Port) + if err != nil { + serverInitializationErrors = multierror.Append(serverInitializationErrors, fmt.Errorf("failed to start HTTP listener: %v", err)) + continue + } + + // If TLS is enabled, wrap the listener with a TLS listener + if config.TLSConfig.EnableHTTP { + tlsConfig, err := tlsConf.IncomingTLSConfig() + if err != nil { + serverInitializationErrors = multierror.Append(serverInitializationErrors, err) + continue + } + ln = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig) + } + + // Create the server + srv := &HTTPServer{ + agent: agent, + mux: mux, + listener: ln, + listenerCh: make(chan struct{}), + logger: agent.httpLogger, + Addr: ln.Addr().String(), + wsUpgrader: wsUpgrader, + } + srv.registerHandlers(config.EnableDebug) + + // Create HTTP server with timeouts + httpServer := http.Server{ + Addr: srv.Addr, + Handler: gzip(mux), + ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns), + ErrorLog: newHTTPServerLogger(srv.logger), + } + + go func() { + defer close(srv.listenerCh) + httpServer.Serve(ln) + }() + + srvs = append(srvs, srv) + } + + if serverInitializationErrors != nil { + for _, srv := range srvs { + srv.Shutdown() + } + } - return srv, nil + return srvs, serverInitializationErrors } // makeConnState returns a ConnState func for use in an http.Server. If @@ -249,7 +267,7 @@ func (s *HTTPServer) Shutdown() { } // registerHandlers is used to attach our handlers to the mux -func (s *HTTPServer) registerHandlers(enableDebug bool) { +func (s HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest)) s.mux.HandleFunc("/v1/jobs/parse", s.wrap(s.JobsParseRequest)) s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest)) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 6083779d2a41..596c3a58c7b6 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -69,6 +69,24 @@ func BenchmarkHTTPRequests(b *testing.B) { }) } +func TestMultipleInterfaces(t *testing.T) { + httpIps := []string{"127.0.0.1", "127.0.0.2"} + + s := makeHTTPServer(t, func(c *Config) { + c.Addresses.HTTP = strings.Join(httpIps, " ") + c.ACL.Enabled = true + }) + defer s.Shutdown() + + httpPort := s.ports[0] + for _, ip := range httpIps { + resp, err := http.Get(fmt.Sprintf("http://%s:%d/", ip, httpPort)) + + assert.Nil(t, err) + assert.Equal(t, resp.StatusCode, 200) + } +} + // TestRootFallthrough tests rootFallthrough handler to // verify redirect and 404 behavior func TestRootFallthrough(t *testing.T) { @@ -945,8 +963,8 @@ func TestHTTPServer_Limits_Error(t *testing.T) { t.Parallel() conf := &Config{ - normalizedAddrs: &Addresses{ - HTTP: "localhost:0", // port is never used + normalizedAddrs: &NormalizedAddrs{ + HTTP: []string{"localhost:0"}, // port is never used }, TLSConfig: &config.TLSConfig{ EnableHTTP: tc.tls, @@ -964,7 +982,7 @@ func TestHTTPServer_Limits_Error(t *testing.T) { config: conf, } - srv, err := NewHTTPServer(agent, conf) + srv, err := NewHTTPServers(agent, conf) require.Error(t, err) require.Nil(t, srv) require.Contains(t, err.Error(), tc.expectedErr) diff --git a/command/agent/testagent.go b/command/agent/testagent.go index 698a8efb00b8..18c463213ae6 100644 --- a/command/agent/testagent.go +++ b/command/agent/testagent.go @@ -66,7 +66,11 @@ type TestAgent struct { // Key is the optional encryption key for the keyring. Key string - // Server is a reference to the started HTTP endpoint. + // All HTTP servers started. Used to prevent server leaks and preserve + // backwards compability. + Servers []*HTTPServer + + // Server is a reference to the primary, started HTTP endpoint. // It is valid after Start(). Server *HTTPServer @@ -255,12 +259,15 @@ func (a *TestAgent) start() (*Agent, error) { } // Setup the HTTP server - http, err := NewHTTPServer(agent, a.Config) + httpServers, err := NewHTTPServers(agent, a.Config) if err != nil { return agent, err } - a.Server = http + // TODO: investigate if there is a way to remove the requirement by updating test. + // Initial pass at implementing this is https://github.com/kevinschoonover/nomad/tree/tests. + a.Servers = httpServers + a.Server = httpServers[0] return agent, nil } @@ -284,7 +291,10 @@ func (a *TestAgent) Shutdown() error { ch := make(chan error, 1) go func() { defer close(ch) - a.Server.Shutdown() + for _, srv := range a.Servers { + srv.Shutdown() + } + ch <- a.Agent.Shutdown() }() diff --git a/website/content/docs/configuration/consul.mdx b/website/content/docs/configuration/consul.mdx index 80a2dc3de41a..a53cf6d89bb5 100644 --- a/website/content/docs/configuration/consul.mdx +++ b/website/content/docs/configuration/consul.mdx @@ -68,7 +68,11 @@ configuring Nomad to talk to Consul via DNS such as consul.service.consul Consul communication. If this is set then you need to also set `key_file`. - `checks_use_advertise` `(bool: false)` - Specifies if Consul health checks - should bind to the advertise address. By default, this is the bind address. + should bind to the advertise address. By default, this is the first [HTTP + address](https://www.nomadproject.io/docs/configuration#http). If no + [HTTP address](https://www.nomadproject.io/docs/configuration#http) is + specified, it will fall back to the + [bind_addr](https://www.nomadproject.io/docs/configuration#bind_addr). - `client_auto_join` `(bool: true)` - Specifies if the Nomad clients should automatically discover servers in the same region by searching for the Consul