Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bind address for consul checks #1866

Merged
merged 8 commits into from
Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 148 additions & 105 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,92 +134,41 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
conf.EnabledSchedulers = a.config.Server.EnabledSchedulers
}

// Set up the advertise addrs
if addr := a.config.AdvertiseAddrs.Serf; addr != "" {
serfAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving serf advertise address: %s", err)
}
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port
}
if addr := a.config.AdvertiseAddrs.RPC; addr != "" {
rpcAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving rpc advertise address: %s", err)
}
conf.RPCAdvertise = rpcAddr
}

// Set up the bind addresses
if addr := a.config.BindAddr; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
if addr := a.config.Addresses.RPC; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
}

if addr := a.config.Addresses.Serf; addr != "" {
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}

// Set up the ports
if port := a.config.Ports.RPC; port != 0 {
conf.RPCAddr.Port = port
}
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}

// Resolve the Server's HTTP Address
if a.config.AdvertiseAddrs.HTTP != "" {
a.serverHTTPAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.serverHTTPAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
a.serverHTTPAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
a.serverHTTPAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
rpcAddr, err := a.getRPCAddr(true)
if err != nil {
return nil, err
}
addr, err := net.ResolveTCPAddr("tcp", a.serverHTTPAddr)
serfAddr, err := a.getSerfAddr(true)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", a.serverHTTPAddr, err)
return nil, err
}
a.serverHTTPAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
conf.RPCAddr.Port = rpcAddr.Port
conf.RPCAddr.IP = rpcAddr.IP
conf.SerfConfig.MemberlistConfig.BindPort = serfAddr.Port
conf.SerfConfig.MemberlistConfig.BindAddr = serfAddr.IP.String()

// Resolve the Server's RPC Address
if a.config.AdvertiseAddrs.RPC != "" {
a.serverRPCAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.serverRPCAddr = net.JoinHostPort(a.config.Addresses.RPC, strconv.Itoa(a.config.Ports.RPC))
} else if a.config.BindAddr != "" {
a.serverRPCAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.RPC))
} else {
a.serverRPCAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.RPC))
}
addr, err = net.ResolveTCPAddr("tcp", a.serverRPCAddr)
// Set up the advertise addresses
httpAddr, err := a.getHTTPAddr(false)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %+q: %v", a.serverRPCAddr, err)
return nil, err
}
a.serverRPCAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))

// Resolve the Server's Serf Address
if a.config.AdvertiseAddrs.Serf != "" {
a.serverSerfAddr = a.config.AdvertiseAddrs.Serf
} else if a.config.Addresses.Serf != "" {
a.serverSerfAddr = net.JoinHostPort(a.config.Addresses.Serf, strconv.Itoa(a.config.Ports.Serf))
} else if a.config.BindAddr != "" {
a.serverSerfAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.Serf))
} else {
a.serverSerfAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.Serf))
rpcAddr, err = a.getRPCAddr(false)
if err != nil {
return nil, err
}
addr, err = net.ResolveTCPAddr("tcp", a.serverSerfAddr)
serfAddr, err = a.getSerfAddr(false)
if err != nil {
return nil, fmt.Errorf("error resolving Serf addr %+q: %v", a.serverSerfAddr, err)
return nil, err
}
a.serverSerfAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
a.serverHTTPAddr = net.JoinHostPort(httpAddr.IP.String(), strconv.Itoa(httpAddr.Port))
a.serverRPCAddr = net.JoinHostPort(rpcAddr.IP.String(), strconv.Itoa(rpcAddr.Port))
a.serverSerfAddr = net.JoinHostPort(serfAddr.IP.String(), strconv.Itoa(serfAddr.Port))
conf.RPCAdvertise = rpcAddr
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port

// Set up gc threshold and heartbeat grace period
if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
Expand Down Expand Up @@ -317,22 +266,11 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.Node.Meta = a.config.Client.Meta
conf.Node.NodeClass = a.config.Client.NodeClass

// Resolve the Client's HTTP address
if a.config.AdvertiseAddrs.HTTP != "" {
a.clientHTTPAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.clientHTTPAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
a.clientHTTPAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
a.clientHTTPAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
}
addr, err := net.ResolveTCPAddr("tcp", a.clientHTTPAddr)
// Set up the HTTP advertise address
httpAddr, err := a.selectAddr(a.getHTTPAddr, false)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", a.clientHTTPAddr, err)
return nil, err
}
httpAddr := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))

conf.Node.HTTPAddr = httpAddr
a.clientHTTPAddr = httpAddr

Expand Down Expand Up @@ -392,6 +330,20 @@ func (a *Agent) setupServer() error {
}
a.server = server

// Resolve consul check addresses. Always use advertise address for services
httpCheckAddr, err := a.selectAddr(a.getHTTPAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}
rpcCheckAddr, err := a.selectAddr(a.getRPCAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}
serfCheckAddr, err := a.selectAddr(a.getSerfAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}

// Create the Nomad Server services for Consul
// TODO re-introduce HTTP/S checks when Consul 0.7.1 comes out
if a.config.Consul.AutoAdvertise {
Expand All @@ -401,10 +353,11 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagHTTP},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server HTTP Check",
Type: "tcp",
Interval: serverHttpCheckInterval,
Timeout: serverHttpCheckTimeout,
Name: "Nomad Server HTTP Check",
Type: "tcp",
Interval: serverHttpCheckInterval,
Timeout: serverHttpCheckTimeout,
PortLabel: httpCheckAddr,
},
},
}
Expand All @@ -414,10 +367,11 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagRPC},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
PortLabel: rpcCheckAddr,
},
},
}
Expand All @@ -427,13 +381,15 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagSerf},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
PortLabel: serfCheckAddr,
},
},
}

a.consulSyncer.SetServices(consul.ServerDomain, map[consul.ServiceKey]*structs.Service{
consul.GenerateServiceKey(httpServ): httpServ,
consul.GenerateServiceKey(rpcServ): rpcServ,
Expand Down Expand Up @@ -494,6 +450,12 @@ func (a *Agent) setupClient() error {
}
a.client = client

// Resolve the http check address
httpCheckAddr, err := a.selectAddr(a.getHTTPAddr, !a.config.Consul.ChecksUseAdvertise)
if err != nil {
return err
}

// Create the Nomad Client services for Consul
// TODO think how we can re-introduce HTTP/S checks when Consul 0.7.1 comes
// out
Expand All @@ -504,10 +466,11 @@ func (a *Agent) setupClient() error {
Tags: []string{consul.ServiceTagHTTP},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Client HTTP Check",
Type: "tcp",
Interval: clientHttpCheckInterval,
Timeout: clientHttpCheckTimeout,
Name: "Nomad Client HTTP Check",
Type: "tcp",
Interval: clientHttpCheckInterval,
Timeout: clientHttpCheckTimeout,
PortLabel: httpCheckAddr,
},
},
}
Expand All @@ -519,6 +482,86 @@ func (a *Agent) setupClient() error {
return nil
}

// Defines the selector interface
type addrSelector func(bool) (*net.TCPAddr, error)

// selectAddr returns the right address given a selector, and return it as a PortLabel
// preferBind is a weak preference, and will skip 0.0.0.0
func (a *Agent) selectAddr(selector addrSelector, preferBind bool) (string, error) {
addr, err := selector(preferBind)
if err != nil {
return "", err
}

if preferBind && addr.IP.String() == "0.0.0.0" {
addr, err = selector(false)
if err != nil {
return "", err
}
}

address := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
return address, nil
}

// getHTTPAddr returns the HTTP address to use based on the clients
// configuration. If bind is true, an address appropriate for binding is
// returned, otherwise an address for advertising is returned. Skip 0.0.0.0
// unless returning a bind address, since that's the only time it's useful.
func (a *Agent) getHTTPAddr(bind bool) (*net.TCPAddr, error) {
advertAddr := a.config.AdvertiseAddrs.HTTP
bindAddr := a.config.Addresses.HTTP
globalBindAddr := a.config.BindAddr
port := a.config.Ports.HTTP
return pickAddress(bind, globalBindAddr, advertAddr, bindAddr, port, "HTTP")
}

// getRPCAddr returns the HTTP address to use based on the clients
// configuration. If bind is true, an address appropriate for binding is
// returned, otherwise an address for advertising is returned. Skip 0.0.0.0
// unless returning a bind address, since that's the only time it's useful.
func (a *Agent) getRPCAddr(bind bool) (*net.TCPAddr, error) {
advertAddr := a.config.AdvertiseAddrs.RPC
bindAddr := a.config.Addresses.RPC
globalBindAddr := a.config.BindAddr
port := a.config.Ports.RPC
return pickAddress(bind, globalBindAddr, advertAddr, bindAddr, port, "RPC")
}

// getSerfAddr returns the Serf address to use based on the clients
// configuration. If bind is true, an address appropriate for binding is
// returned, otherwise an address for advertising is returned. Skip 0.0.0.0
// unless returning a bind address, since that's the only time it's useful.
func (a *Agent) getSerfAddr(bind bool) (*net.TCPAddr, error) {
advertAddr := a.config.AdvertiseAddrs.Serf
bindAddr := a.config.Addresses.Serf
globalBindAddr := a.config.BindAddr
port := a.config.Ports.Serf
return pickAddress(bind, globalBindAddr, advertAddr, bindAddr, port, "RPC")
}

// pickAddress is a shared helper to pick the address to either bind to or
// advertise.
func pickAddress(bind bool, globalBindAddr, advertiseAddr, bindAddr string, port int, service string) (*net.TCPAddr, error) {
portConverted := strconv.Itoa(port)
var serverAddr string
if advertiseAddr != "" && !bind {
serverAddr = advertiseAddr
} else if bindAddr != "" && !(bindAddr == "0.0.0.0" && !bind) {
serverAddr = net.JoinHostPort(bindAddr, portConverted)
} else if globalBindAddr != "" && !(globalBindAddr == "0.0.0.0" && !bind) {
serverAddr = net.JoinHostPort(globalBindAddr, portConverted)
} else {
serverAddr = net.JoinHostPort("127.0.0.1", portConverted)
}

addr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return nil, fmt.Errorf("error resolving %s addr %+q: %v", service, serverAddr, err)
}
return addr, nil
}

// reservePortsForClient reserves a range of ports for the client to use when
// it creates various plugins for log collection, executors, drivers, etc
func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {
Expand Down
Loading