Skip to content

Commit

Permalink
Merge pull request #1866 from hashicorp/f-bind-addr
Browse files Browse the repository at this point in the history
Use bind address for consul checks
  • Loading branch information
dadgar committed Oct 27, 2016
2 parents 6821210 + 0e47a66 commit 4fc7e30
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 132 deletions.
253 changes: 148 additions & 105 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,92 +134,41 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
conf.EnabledSchedulers = a.config.Server.EnabledSchedulers
}

// Set up the advertise addrs
if addr := a.config.AdvertiseAddrs.Serf; addr != "" {
serfAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving serf advertise address: %s", err)
}
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port
}
if addr := a.config.AdvertiseAddrs.RPC; addr != "" {
rpcAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving rpc advertise address: %s", err)
}
conf.RPCAdvertise = rpcAddr
}

// Set up the bind addresses
if addr := a.config.BindAddr; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
if addr := a.config.Addresses.RPC; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
}

if addr := a.config.Addresses.Serf; addr != "" {
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}

// Set up the ports
if port := a.config.Ports.RPC; port != 0 {
conf.RPCAddr.Port = port
}
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}

// Resolve the Server's HTTP Address
if a.config.AdvertiseAddrs.HTTP != "" {
a.serverHTTPAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.serverHTTPAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
a.serverHTTPAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
a.serverHTTPAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
rpcAddr, err := a.getRPCAddr(true)
if err != nil {
return nil, err
}
addr, err := net.ResolveTCPAddr("tcp", a.serverHTTPAddr)
serfAddr, err := a.getSerfAddr(true)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", a.serverHTTPAddr, err)
return nil, err
}
a.serverHTTPAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
conf.RPCAddr.Port = rpcAddr.Port
conf.RPCAddr.IP = rpcAddr.IP
conf.SerfConfig.MemberlistConfig.BindPort = serfAddr.Port
conf.SerfConfig.MemberlistConfig.BindAddr = serfAddr.IP.String()

// Resolve the Server's RPC Address
if a.config.AdvertiseAddrs.RPC != "" {
a.serverRPCAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.serverRPCAddr = net.JoinHostPort(a.config.Addresses.RPC, strconv.Itoa(a.config.Ports.RPC))
} else if a.config.BindAddr != "" {
a.serverRPCAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.RPC))
} else {
a.serverRPCAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.RPC))
}
addr, err = net.ResolveTCPAddr("tcp", a.serverRPCAddr)
// Set up the advertise addresses
httpAddr, err := a.getHTTPAddr(false)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %+q: %v", a.serverRPCAddr, err)
return nil, err
}
a.serverRPCAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))

// Resolve the Server's Serf Address
if a.config.AdvertiseAddrs.Serf != "" {
a.serverSerfAddr = a.config.AdvertiseAddrs.Serf
} else if a.config.Addresses.Serf != "" {
a.serverSerfAddr = net.JoinHostPort(a.config.Addresses.Serf, strconv.Itoa(a.config.Ports.Serf))
} else if a.config.BindAddr != "" {
a.serverSerfAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.Serf))
} else {
a.serverSerfAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.Serf))
rpcAddr, err = a.getRPCAddr(false)
if err != nil {
return nil, err
}
addr, err = net.ResolveTCPAddr("tcp", a.serverSerfAddr)
serfAddr, err = a.getSerfAddr(false)
if err != nil {
return nil, fmt.Errorf("error resolving Serf addr %+q: %v", a.serverSerfAddr, err)
return nil, err
}
a.serverSerfAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
a.serverHTTPAddr = net.JoinHostPort(httpAddr.IP.String(), strconv.Itoa(httpAddr.Port))
a.serverRPCAddr = net.JoinHostPort(rpcAddr.IP.String(), strconv.Itoa(rpcAddr.Port))
a.serverSerfAddr = net.JoinHostPort(serfAddr.IP.String(), strconv.Itoa(serfAddr.Port))
conf.RPCAdvertise = rpcAddr
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port

// Set up gc threshold and heartbeat grace period
if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
Expand Down Expand Up @@ -317,22 +266,11 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.Node.Meta = a.config.Client.Meta
conf.Node.NodeClass = a.config.Client.NodeClass

// Resolve the Client's HTTP address
if a.config.AdvertiseAddrs.HTTP != "" {
a.clientHTTPAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.clientHTTPAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
a.clientHTTPAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
a.clientHTTPAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
}
addr, err := net.ResolveTCPAddr("tcp", a.clientHTTPAddr)
// Set up the HTTP advertise address
httpAddr, err := a.selectAddr(a.getHTTPAddr, false)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", a.clientHTTPAddr, err)
return nil, err
}
httpAddr := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))

conf.Node.HTTPAddr = httpAddr
a.clientHTTPAddr = httpAddr

Expand Down Expand Up @@ -392,6 +330,20 @@ func (a *Agent) setupServer() error {
}
a.server = server

// Resolve consul check addresses. Always use advertise address for services
httpCheckAddr, err := a.selectAddr(a.getHTTPAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}
rpcCheckAddr, err := a.selectAddr(a.getRPCAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}
serfCheckAddr, err := a.selectAddr(a.getSerfAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}

// Create the Nomad Server services for Consul
// TODO re-introduce HTTP/S checks when Consul 0.7.1 comes out
if a.config.Consul.AutoAdvertise {
Expand All @@ -401,10 +353,11 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagHTTP},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server HTTP Check",
Type: "tcp",
Interval: serverHttpCheckInterval,
Timeout: serverHttpCheckTimeout,
Name: "Nomad Server HTTP Check",
Type: "tcp",
Interval: serverHttpCheckInterval,
Timeout: serverHttpCheckTimeout,
PortLabel: httpCheckAddr,
},
},
}
Expand All @@ -414,10 +367,11 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagRPC},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
PortLabel: rpcCheckAddr,
},
},
}
Expand All @@ -427,13 +381,15 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagSerf},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
PortLabel: serfCheckAddr,
},
},
}

a.consulSyncer.SetServices(consul.ServerDomain, map[consul.ServiceKey]*structs.Service{
consul.GenerateServiceKey(httpServ): httpServ,
consul.GenerateServiceKey(rpcServ): rpcServ,
Expand Down Expand Up @@ -494,6 +450,12 @@ func (a *Agent) setupClient() error {
}
a.client = client

// Resolve the http check address
httpCheckAddr, err := a.selectAddr(a.getHTTPAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}

// Create the Nomad Client services for Consul
// TODO think how we can re-introduce HTTP/S checks when Consul 0.7.1 comes
// out
Expand All @@ -504,10 +466,11 @@ func (a *Agent) setupClient() error {
Tags: []string{consul.ServiceTagHTTP},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Client HTTP Check",
Type: "tcp",
Interval: clientHttpCheckInterval,
Timeout: clientHttpCheckTimeout,
Name: "Nomad Client HTTP Check",
Type: "tcp",
Interval: clientHttpCheckInterval,
Timeout: clientHttpCheckTimeout,
PortLabel: httpCheckAddr,
},
},
}
Expand All @@ -519,6 +482,86 @@ func (a *Agent) setupClient() error {
return nil
}

// Defines the selector interface
type addrSelector func(bool) (*net.TCPAddr, error)

// selectAddr returns the right address given a selector, and return it as a PortLabel
// preferBind is a weak preference, and will skip 0.0.0.0
func (a *Agent) selectAddr(selector addrSelector, preferBind bool) (string, error) {
addr, err := selector(preferBind)
if err != nil {
return "", err
}

if preferBind && addr.IP.String() == "0.0.0.0" {
addr, err = selector(false)
if err != nil {
return "", err
}
}

address := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
return address, nil
}

// getHTTPAddr returns the HTTP address to use based on the clients
// configuration. If bind is true, an address appropriate for binding is
// returned, otherwise an address for advertising is returned. Skip 0.0.0.0
// unless returning a bind address, since that's the only time it's useful.
func (a *Agent) getHTTPAddr(bind bool) (*net.TCPAddr, error) {
advertAddr := a.config.AdvertiseAddrs.HTTP
bindAddr := a.config.Addresses.HTTP
globalBindAddr := a.config.BindAddr
port := a.config.Ports.HTTP
return pickAddress(bind, globalBindAddr, advertAddr, bindAddr, port, "HTTP")
}

// getRPCAddr returns the HTTP address to use based on the clients
// configuration. If bind is true, an address appropriate for binding is
// returned, otherwise an address for advertising is returned. Skip 0.0.0.0
// unless returning a bind address, since that's the only time it's useful.
func (a *Agent) getRPCAddr(bind bool) (*net.TCPAddr, error) {
advertAddr := a.config.AdvertiseAddrs.RPC
bindAddr := a.config.Addresses.RPC
globalBindAddr := a.config.BindAddr
port := a.config.Ports.RPC
return pickAddress(bind, globalBindAddr, advertAddr, bindAddr, port, "RPC")
}

// getSerfAddr returns the Serf address to use based on the clients
// configuration. If bind is true, an address appropriate for binding is
// returned, otherwise an address for advertising is returned. Skip 0.0.0.0
// unless returning a bind address, since that's the only time it's useful.
func (a *Agent) getSerfAddr(bind bool) (*net.TCPAddr, error) {
advertAddr := a.config.AdvertiseAddrs.Serf
bindAddr := a.config.Addresses.Serf
globalBindAddr := a.config.BindAddr
port := a.config.Ports.Serf
return pickAddress(bind, globalBindAddr, advertAddr, bindAddr, port, "RPC")
}

// pickAddress is a shared helper to pick the address to either bind to or
// advertise.
func pickAddress(bind bool, globalBindAddr, advertiseAddr, bindAddr string, port int, service string) (*net.TCPAddr, error) {
portConverted := strconv.Itoa(port)
var serverAddr string
if advertiseAddr != "" && !bind {
serverAddr = advertiseAddr
} else if bindAddr != "" && !(bindAddr == "0.0.0.0" && !bind) {
serverAddr = net.JoinHostPort(bindAddr, portConverted)
} else if globalBindAddr != "" && !(globalBindAddr == "0.0.0.0" && !bind) {
serverAddr = net.JoinHostPort(globalBindAddr, portConverted)
} else {
serverAddr = net.JoinHostPort("127.0.0.1", portConverted)
}

addr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return nil, fmt.Errorf("error resolving %s addr %+q: %v", service, serverAddr, err)
}
return addr, nil
}

// reservePortsForClient reserves a range of ports for the client to use when
// it creates various plugins for log collection, executors, drivers, etc
func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {
Expand Down
Loading

0 comments on commit 4fc7e30

Please sign in to comment.