diff --git a/.changelog/13651.txt b/.changelog/13651.txt new file mode 100644 index 000000000000..7cce7bb42f2e --- /dev/null +++ b/.changelog/13651.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where reserved ports on multiple node networks would be treated as a collision. `client.reserved.reserved_ports` is now merged into each `host_network`'s reserved ports instead of being treated as a collision. +``` diff --git a/command/agent/command.go b/command/agent/command.go index eef8e97d2e0c..33cd0ad09089 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -399,6 +399,7 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool { } for _, hn := range config.Client.HostNetworks { + // Ensure port range is valid if _, err := structs.ParsePortRanges(hn.ReservedPorts); err != nil { c.Ui.Error(fmt.Sprintf("host_network[%q].reserved_ports %q invalid: %v", hn.Name, hn.ReservedPorts, err)) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index a94524ec44dd..e6128fa6ab6e 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -207,8 +207,11 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi netIdx = NewNetworkIndex() defer netIdx.Release() - if collision, reason := netIdx.SetNode(node); collision { - return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil + if err := netIdx.SetNode(node); err != nil { + // To maintain backward compatibility with when SetNode + // returned collision+reason like AddAllocs, return + // this as a reason instead of an error. + return false, fmt.Sprintf("reserved node port collision: %v", err), used, nil } if collision, reason := netIdx.AddAllocs(allocs); collision { return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil @@ -530,6 +533,10 @@ func ParsePortRanges(spec string) ([]uint64, error) { if err != nil { return nil, err } + + if port > MaxValidPort { + return nil, fmt.Errorf("port must be < %d but found %d", MaxValidPort, port) + } ports[port] = struct{}{} } case 2: diff --git a/nomad/structs/network.go b/nomad/structs/network.go index 30e7693236be..c5ba35a830fd 100644 --- a/nomad/structs/network.go +++ b/nomad/structs/network.go @@ -36,13 +36,33 @@ var ( // NetworkIndex is used to index the available network resources // and the used network resources on a machine given allocations +// +// Fields are exported so they may be JSON serialized for debugging. +// Fields are *not* intended to be used directly. type NetworkIndex struct { - AvailNetworks []*NetworkResource // List of available networks - NodeNetworks []*NodeNetworkResource // List of available node networks - AvailAddresses map[string][]NodeNetworkAddress // Map of host network aliases to list of addresses - AvailBandwidth map[string]int // Bandwidth by device - UsedPorts map[string]Bitmap // Ports by IP - UsedBandwidth map[string]int // Bandwidth by device + // TaskNetworks are the node networks available for + // task.resources.network asks. + TaskNetworks []*NetworkResource + + // GroupNetworks are the node networks available for group.network + // asks. + GroupNetworks []*NodeNetworkResource + + // HostNetworks indexes addresses by host network alias + HostNetworks map[string][]NodeNetworkAddress + + // UsedPorts tracks which ports are used on a per-IP address basis. For + // example if a node has `network_interface=lo` and port 22 reserved, + // then on a dual stack loopback interface UsedPorts would contain: + // { + // "127.0.0.1": Bitmap{22}, + // "::1": Bitmap{22}, + // } + UsedPorts map[string]Bitmap + + // Deprecated bandwidth fields + AvailBandwidth map[string]int // Bandwidth by device + UsedBandwidth map[string]int // Bandwidth by device MinDynamicPort int // The smallest dynamic port generated MaxDynamicPort int // The largest dynamic port generated @@ -51,9 +71,9 @@ type NetworkIndex struct { // NewNetworkIndex is used to construct a new network index func NewNetworkIndex() *NetworkIndex { return &NetworkIndex{ - AvailAddresses: make(map[string][]NodeNetworkAddress), - AvailBandwidth: make(map[string]int), + HostNetworks: make(map[string][]NodeNetworkAddress), UsedPorts: make(map[string]Bitmap), + AvailBandwidth: make(map[string]int), UsedBandwidth: make(map[string]int), MinDynamicPort: DefaultMinDynamicPort, MaxDynamicPort: DefaultMaxDynamicPort, @@ -84,9 +104,9 @@ func (idx *NetworkIndex) Copy() *NetworkIndex { c := new(NetworkIndex) *c = *idx - c.AvailNetworks = copyNetworkResources(idx.AvailNetworks) - c.NodeNetworks = copyNodeNetworks(idx.NodeNetworks) - c.AvailAddresses = copyAvailAddresses(idx.AvailAddresses) + c.TaskNetworks = copyNetworkResources(idx.TaskNetworks) + c.GroupNetworks = copyNodeNetworks(idx.GroupNetworks) + c.HostNetworks = copyAvailAddresses(idx.HostNetworks) if idx.AvailBandwidth != nil && len(idx.AvailBandwidth) == 0 { c.AvailBandwidth = make(map[string]int) } else { @@ -171,61 +191,141 @@ func (idx *NetworkIndex) Overcommitted() bool { return false } -// SetNode is used to setup the available network resources. Returns -// true if there is a collision -func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) { - - // COMPAT(0.11): Remove in 0.11 - // Grab the network resources, handling both new and old - var networks []*NetworkResource +// SetNode is used to initialize a node's network index with available IPs, +// reserved ports, and other details from a node's configuration and +// fingerprinting. +// +// SetNode must be idempotent as preemption causes SetNode to be called +// multiple times on the same NetworkIndex, only clearing UsedPorts between +// calls. +// +// An error is returned if the Node cannot produce a consistent NetworkIndex +// such as if reserved_ports are unparseable. +// +// Any errors returned by SetNode indicate a bug! The bug may lie in client +// code not properly validating its configuration or it may lie in improper +// Node object handling by servers. Users should not be able to cause SetNode +// to error. Data that cause SetNode to error should be caught upstream such as +// a client agent refusing to start with an invalid configuration. +func (idx *NetworkIndex) SetNode(node *Node) error { + + // COMPAT(0.11): Deprecated. taskNetworks are only used for + // task.resources.network asks which have been deprecated since before + // 0.11. + // Grab the network resources, handling both new and old Node layouts + // from clients. + var taskNetworks []*NetworkResource if node.NodeResources != nil && len(node.NodeResources.Networks) != 0 { - networks = node.NodeResources.Networks + taskNetworks = node.NodeResources.Networks } else if node.Resources != nil { - networks = node.Resources.Networks + taskNetworks = node.Resources.Networks } - var nodeNetworks []*NodeNetworkResource - if node.NodeResources != nil && len(node.NodeResources.NodeNetworks) != 0 { - nodeNetworks = node.NodeResources.NodeNetworks + // Reserved ports get merged downward. For example given an agent + // config: + // + // client.reserved.reserved_ports = "22" + // client.host_network["eth0"] = {reserved_ports = "80,443"} + // client.host_network["eth1"] = {reserved_ports = "1-1000"} + // + // Addresses on taskNetworks reserve port 22 + // Addresses on eth0 reserve 22,80,443 (note 22 is also reserved!) + // Addresses on eth1 reserve 1-1000 + globalResPorts := []uint{} + + if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" { + resPorts, err := ParsePortRanges(node.ReservedResources.Networks.ReservedHostPorts) + if err != nil { + // This is a fatal error that should have been + // prevented by client validation. + return fmt.Errorf("error parsing reserved_ports: %w", err) + } + + globalResPorts = make([]uint, len(resPorts)) + for i, p := range resPorts { + globalResPorts[i] = uint(p) + } + } else if node.Reserved != nil { + // COMPAT(0.11): Remove after 0.11. Nodes stopped reporting + // reserved ports under Node.Reserved.Resources in #4750 / v0.9 + for _, n := range node.Reserved.Networks { + used := idx.getUsedPortsFor(n.IP) + for _, ports := range [][]Port{n.ReservedPorts, n.DynamicPorts} { + for _, p := range ports { + if p.Value > MaxValidPort || p.Value < 0 { + // This is a fatal error that + // should have been prevented + // by validation upstream. + return fmt.Errorf("invalid port %d for reserved_ports", p.Value) + } + + globalResPorts = append(globalResPorts, uint(p.Value)) + used.Set(uint(p.Value)) + } + } + + // Reserve mbits + if n.Device != "" { + idx.UsedBandwidth[n.Device] += n.MBits + } + } } - // Add the available CIDR blocks - for _, n := range networks { + // Filter task networks down to those with a device. For example + // taskNetworks may contain a "bridge" interface which has no device + // set and cannot be used to fulfill asks. + for _, n := range taskNetworks { if n.Device != "" { - idx.AvailNetworks = append(idx.AvailNetworks, n) + idx.TaskNetworks = append(idx.TaskNetworks, n) idx.AvailBandwidth[n.Device] = n.MBits + + // Reserve ports + used := idx.getUsedPortsFor(n.IP) + for _, p := range globalResPorts { + used.Set(p) + } } } - // TODO: upgrade path? - // is it possible to get duplicates here? + // nodeNetworks are used for group.network asks. + var nodeNetworks []*NodeNetworkResource + if node.NodeResources != nil && len(node.NodeResources.NodeNetworks) != 0 { + nodeNetworks = node.NodeResources.NodeNetworks + } + for _, n := range nodeNetworks { for _, a := range n.Addresses { - idx.AvailAddresses[a.Alias] = append(idx.AvailAddresses[a.Alias], a) - if c, r := idx.AddReservedPortsForIP(a.ReservedPorts, a.Address); c { - collide = true - reason = fmt.Sprintf("collision when reserving ports for node network %s in node %s: %v", a.Alias, node.ID, r) + // Index host networks by their unique alias for asks + // with group.network.port.host_network set. + idx.HostNetworks[a.Alias] = append(idx.HostNetworks[a.Alias], a) + + // Mark reserved ports as used without worrying about + // collisions. This effectively merges + // client.reserved.reserved_ports into each + // host_network. + used := idx.getUsedPortsFor(a.Address) + for _, p := range globalResPorts { + used.Set(p) } - } - } - // COMPAT(0.11): Remove in 0.11 - // Handle reserving ports, handling both new and old - if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" { - c, r := idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts) - collide = c - if collide { - reason = fmt.Sprintf("collision when reserving port range for node %s: %v", node.ID, r) - } - } else if node.Reserved != nil { - for _, n := range node.Reserved.Networks { - if c, r := idx.AddReserved(n); c { - collide = true - reason = fmt.Sprintf("collision when reserving network %s for node %s: %v", n.IP, node.ID, r) + // If ReservedPorts is set on the NodeNetwork, use it + // and the global reserved ports. + if a.ReservedPorts != "" { + rp, err := ParsePortRanges(a.ReservedPorts) + if err != nil { + // This is a fatal error that should + // have been prevented by validation + // upstream. + return fmt.Errorf("error parsing reserved_ports for network %q: %w", a.Alias, err) + } + for _, p := range rp { + used.Set(uint(p)) + } } } } + // Set dynamic port range (applies to all addresses) if node.NodeResources != nil && node.NodeResources.MinDynamicPort > 0 { idx.MinDynamicPort = node.NodeResources.MinDynamicPort } @@ -234,11 +334,16 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) { idx.MaxDynamicPort = node.NodeResources.MaxDynamicPort } - return + return nil } // AddAllocs is used to add the used network resources. Returns // true if there is a collision +// +// AddAllocs may be called multiple times for the same NetworkIndex with +// UsedPorts cleared between calls (by Release). Therefore AddAllocs must be +// determistic and must not manipulate state outside of UsedPorts as that state +// would persist between Release calls. func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool, reason string) { for _, alloc := range allocs { // Do not consider the resource impact of terminal allocations @@ -338,51 +443,11 @@ func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, r return } -// AddReservedPortRange marks the ports given as reserved on all network -// interfaces. The port format is comma delimited, with spans given as n1-n2 -// (80,100-200,205) -func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool, reasons []string) { - // Convert the ports into a slice of ints - resPorts, err := ParsePortRanges(ports) - if err != nil { - return - } - - // Ensure we create a bitmap for each available network - for _, n := range idx.AvailNetworks { - idx.getUsedPortsFor(n.IP) - } - - for _, used := range idx.UsedPorts { - for _, port := range resPorts { - // Guard against invalid port - if port >= MaxValidPort { - return true, []string{fmt.Sprintf("invalid port %d", port)} - } - if used.Check(uint(port)) { - collide = true - reason := fmt.Sprintf("port %d already in use", port) - reasons = append(reasons, reason) - } else { - used.Set(uint(port)) - } - } - } - - return -} - // AddReservedPortsForIP checks whether any reserved ports collide with those // in use for the IP address. -func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool, reasons []string) { - // Convert the ports into a slice of ints - resPorts, err := ParsePortRanges(ports) - if err != nil { - return - } - +func (idx *NetworkIndex) AddReservedPortsForIP(ports []uint64, ip string) (collide bool, reasons []string) { used := idx.getUsedPortsFor(ip) - for _, port := range resPorts { + for _, port := range ports { // Guard against invalid port if port >= MaxValidPort { return true, []string{fmt.Sprintf("invalid port %d", port)} @@ -401,22 +466,13 @@ func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide // yieldIP is used to iteratively invoke the callback with // an available IP -func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) { - inc := func(ip net.IP) { - for j := len(ip) - 1; j >= 0; j-- { - ip[j]++ - if ip[j] > 0 { - break - } - } - } - - for _, n := range idx.AvailNetworks { +func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, offerIP net.IP) bool) { + for _, n := range idx.TaskNetworks { ip, ipnet, err := net.ParseCIDR(n.CIDR) if err != nil { continue } - for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); incIP(ip) { if cb(n, ip) { return } @@ -424,6 +480,26 @@ func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) } } +func incIP(ip net.IP) { + // Iterate over IP octects from right to left + for j := len(ip) - 1; j >= 0; j-- { + + // Increment octect + ip[j]++ + + // If this octect did not wrap around to 0, it's the next IP to + // try. If it did wrap (p[j]==0), then the next octect is + // incremented. + if ip[j] > 0 { + break + } + } +} + +// AssignPorts based on an ask from the scheduler processing a group.network +// stanza. Supports multi-interfaces through node configured host_networks. +// +// AssignTaskNetwork supports the deprecated task.resources.network stanza. func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) { var offer AllocatedPorts @@ -437,7 +513,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro // if allocPort is still nil after the loop, the port wasn't available for reservation var allocPort *AllocatedPortMapping var addrErr error - for _, addr := range idx.AvailAddresses[port.HostNetwork] { + for _, addr := range idx.HostNetworks[port.HostNetwork] { used := idx.getUsedPortsFor(addr.Address) // Guard against invalid port if port.Value < 0 || port.Value >= MaxValidPort { @@ -472,7 +548,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro for _, port := range ask.DynamicPorts { var allocPort *AllocatedPortMapping var addrErr error - for _, addr := range idx.AvailAddresses[port.HostNetwork] { + for _, addr := range idx.HostNetworks[port.HostNetwork] { used := idx.getUsedPortsFor(addr.Address) // Try to stochastically pick the dynamic ports as it is faster and // lower memory usage. @@ -512,13 +588,18 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return offer, nil } -// AssignNetwork is used to assign network resources given an ask. -// If the ask cannot be satisfied, returns nil -func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) { +// AssignTaskNetwork is used to offer network resources given a +// task.resources.network ask. If the ask cannot be satisfied, returns nil +// +// AssignTaskNetwork and task.resources.network are deprecated in favor of +// AssignPorts and group.network. AssignTaskNetwork does not support multiple +// interfaces and only uses the node's default interface. AssignPorts is the +// method that is used for group.network asks. +func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkResource, err error) { err = fmt.Errorf("no networks available") - idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) { + idx.yieldIP(func(n *NetworkResource, offerIP net.IP) (stop bool) { // Convert the IP to a string - ipStr := ip.String() + offerIPStr := offerIP.String() // Check if we would exceed the bandwidth cap availBandwidth := idx.AvailBandwidth[n.Device] @@ -528,7 +609,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour return } - used := idx.UsedPorts[ipStr] + used := idx.UsedPorts[offerIPStr] // Check if any of the reserved ports are in use for _, port := range ask.ReservedPorts { @@ -549,7 +630,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour offer := &NetworkResource{ Mode: ask.Mode, Device: n.Device, - IP: ipStr, + IP: offerIPStr, MBits: ask.MBits, DNS: ask.DNS, ReservedPorts: ask.ReservedPorts, diff --git a/nomad/structs/network_test.go b/nomad/structs/network_test.go index ccb2900c10bd..9dcfd91d7478 100644 --- a/nomad/structs/network_test.go +++ b/nomad/structs/network_test.go @@ -189,20 +189,10 @@ func TestNetworkIndex_SetNode(t *testing.T) { }, }, } - collide, reason := idx.SetNode(n) - if collide || reason != "" { - t.Fatalf("bad") - } - - if len(idx.AvailNetworks) != 1 { - t.Fatalf("Bad") - } - if idx.AvailBandwidth["eth0"] != 1000 { - t.Fatalf("Bad") - } - if !idx.UsedPorts["192.168.0.100"].Check(22) { - t.Fatalf("Bad") - } + require.NoError(t, idx.SetNode(n)) + require.Len(t, idx.TaskNetworks, 1) + require.Equal(t, 1000, idx.AvailBandwidth["eth0"]) + require.True(t, idx.UsedPorts["192.168.0.100"].Check(22)) } func TestNetworkIndex_AddAllocs(t *testing.T) { @@ -327,7 +317,7 @@ func TestNetworkIndex_yieldIP(t *testing.T) { } } -func TestNetworkIndex_AssignNetwork(t *testing.T) { +func TestNetworkIndex_AssignTaskNetwork(t *testing.T) { ci.Parallel(t) idx := NewNetworkIndex() n := &Node{ @@ -379,7 +369,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) { ask := &NetworkResource{ ReservedPorts: []Port{{"main", 8000, 0, ""}}, } - offer, err := idx.AssignNetwork(ask) + offer, err := idx.AssignTaskNetwork(ask) require.NoError(t, err) require.NotNil(t, offer) require.Equal(t, "192.168.0.101", offer.IP) @@ -391,7 +381,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) { ask = &NetworkResource{ DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, -1, ""}}, } - offer, err = idx.AssignNetwork(ask) + offer, err = idx.AssignTaskNetwork(ask) require.NoError(t, err) require.NotNil(t, offer) require.Equal(t, "192.168.0.100", offer.IP) @@ -410,7 +400,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) { ReservedPorts: []Port{{"main", 2345, 0, ""}}, DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}}, } - offer, err = idx.AssignNetwork(ask) + offer, err = idx.AssignTaskNetwork(ask) require.NoError(t, err) require.NotNil(t, offer) require.Equal(t, "192.168.0.100", offer.IP) @@ -423,7 +413,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) { ask = &NetworkResource{ MBits: 1000, } - offer, err = idx.AssignNetwork(ask) + offer, err = idx.AssignTaskNetwork(ask) require.Error(t, err) require.Equal(t, "bandwidth exceeded", err.Error()) require.Nil(t, offer) @@ -431,7 +421,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) { // This test ensures that even with a small domain of available ports we are // able to make a dynamic port allocation. -func TestNetworkIndex_AssignNetwork_Dynamic_Contention(t *testing.T) { +func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention(t *testing.T) { ci.Parallel(t) // Create a node that only has one free port @@ -459,7 +449,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention(t *testing.T) { ask := &NetworkResource{ DynamicPorts: []Port{{"http", 0, 80, ""}}, } - offer, err := idx.AssignNetwork(ask) + offer, err := idx.AssignTaskNetwork(ask) if err != nil { t.Fatalf("err: %v", err) } @@ -503,23 +493,11 @@ func TestNetworkIndex_SetNode_Old(t *testing.T) { }, }, } - collide, reason := idx.SetNode(n) - if collide || reason != "" { - t.Fatalf("bad") - } - - if len(idx.AvailNetworks) != 1 { - t.Fatalf("Bad") - } - if idx.AvailBandwidth["eth0"] != 1000 { - t.Fatalf("Bad") - } - if idx.UsedBandwidth["eth0"] != 1 { - t.Fatalf("Bad") - } - if !idx.UsedPorts["192.168.0.100"].Check(22) { - t.Fatalf("Bad") - } + require.NoError(t, idx.SetNode(n)) + require.Len(t, idx.TaskNetworks, 1) + require.Equal(t, 1000, idx.AvailBandwidth["eth0"]) + require.Equal(t, 1, idx.UsedBandwidth["eth0"]) + require.True(t, idx.UsedPorts["192.168.0.100"].Check(22)) } // COMPAT(0.11): Remove in 0.11 @@ -618,7 +596,7 @@ func TestNetworkIndex_yieldIP_Old(t *testing.T) { } // COMPAT(0.11): Remove in 0.11 -func TestNetworkIndex_AssignNetwork_Old(t *testing.T) { +func TestNetworkIndex_AssignTaskNetwork_Old(t *testing.T) { ci.Parallel(t) idx := NewNetworkIndex() @@ -681,7 +659,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) { ask := &NetworkResource{ ReservedPorts: []Port{{"main", 8000, 0, ""}}, } - offer, err := idx.AssignNetwork(ask) + offer, err := idx.AssignTaskNetwork(ask) if err != nil { t.Fatalf("err: %v", err) } @@ -700,7 +678,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) { ask = &NetworkResource{ DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}}, } - offer, err = idx.AssignNetwork(ask) + offer, err = idx.AssignTaskNetwork(ask) if err != nil { t.Fatalf("err: %v", err) } @@ -724,7 +702,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) { ReservedPorts: []Port{{"main", 2345, 0, ""}}, DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}}, } - offer, err = idx.AssignNetwork(ask) + offer, err = idx.AssignTaskNetwork(ask) if err != nil { t.Fatalf("err: %v", err) } @@ -744,7 +722,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) { ask = &NetworkResource{ MBits: 1000, } - offer, err = idx.AssignNetwork(ask) + offer, err = idx.AssignTaskNetwork(ask) if err.Error() != "bandwidth exceeded" { t.Fatalf("err: %v", err) } @@ -756,7 +734,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) { // COMPAT(0.11): Remove in 0.11 // This test ensures that even with a small domain of available ports we are // able to make a dynamic port allocation. -func TestNetworkIndex_AssignNetwork_Dynamic_Contention_Old(t *testing.T) { +func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention_Old(t *testing.T) { ci.Parallel(t) // Create a node that only has one free port @@ -791,7 +769,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention_Old(t *testing.T) { ask := &NetworkResource{ DynamicPorts: []Port{{"http", 0, 80, ""}}, } - offer, err := idx.AssignNetwork(ask) + offer, err := idx.AssignTaskNetwork(ask) if err != nil { t.Fatalf("err: %v", err) } @@ -811,7 +789,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention_Old(t *testing.T) { func TestIntContains(t *testing.T) { ci.Parallel(t) - + l := []int{1, 2, 10, 20} if isPortReserved(l, 50) { t.Fatalf("bad") @@ -823,3 +801,116 @@ func TestIntContains(t *testing.T) { t.Fatalf("bad") } } + +func TestNetworkIndex_SetNode_HostNets(t *testing.T) { + ci.Parallel(t) + + idx := NewNetworkIndex() + n := &Node{ + NodeResources: &NodeResources{ + Networks: []*NetworkResource{ + // As of Nomad v1.3 bridge networks get + // registered with only their mode set. + { + Mode: "bridge", + }, + + // Localhost (agent interface) + { + CIDR: "127.0.0.1/32", + Device: "lo", + IP: "127.0.0.1", + MBits: 1000, + Mode: "host", + }, + { + CIDR: "::1/128", + Device: "lo", + IP: "::1", + MBits: 1000, + Mode: "host", + }, + + // Node.NodeResources.Networks does *not* + // contain host_networks. + }, + NodeNetworks: []*NodeNetworkResource{ + // As of Nomad v1.3 bridge networks get + // registered with only their mode set. + { + Mode: "bridge", + }, + { + Addresses: []NodeNetworkAddress{ + { + Address: "127.0.0.1", + Alias: "default", + Family: "ipv4", + }, + { + Address: "::1", + Alias: "default", + Family: "ipv6", + }, + }, + Device: "lo", + Mode: "host", + Speed: 1000, + }, + { + Addresses: []NodeNetworkAddress{ + { + Address: "192.168.0.1", + Alias: "eth0", + Family: "ipv4", + ReservedPorts: "22", + }, + }, + Device: "enxaaaaaaaaaaaa", + MacAddress: "aa:aa:aa:aa:aa:aa", + Mode: "host", + Speed: 1000, + }, + { + Addresses: []NodeNetworkAddress{ + { + Address: "192.168.1.1", + Alias: "eth1", + Family: "ipv4", + ReservedPorts: "80", + }, + }, + Device: "enxbbbbbbbbbbbb", + MacAddress: "bb:bb:bb:bb:bb:bb", + Mode: "host", + Speed: 1000, + }, + }, + }, + ReservedResources: &NodeReservedResources{ + Networks: NodeReservedNetworkResources{ + ReservedHostPorts: "22", + }, + }, + } + + require.NoError(t, idx.SetNode(n)) + + // TaskNetworks should only contain the bridge and agent network + require.Len(t, idx.TaskNetworks, 2) + + // Ports should be used across all 4 IPs + require.Equal(t, 4, len(idx.UsedPorts)) + + // 22 should be reserved on all IPs + require.True(t, idx.UsedPorts["127.0.0.1"].Check(22)) + require.True(t, idx.UsedPorts["::1"].Check(22)) + require.True(t, idx.UsedPorts["192.168.0.1"].Check(22)) + require.True(t, idx.UsedPorts["192.168.1.1"].Check(22)) + + // 80 should only be reserved on eth1's address + require.False(t, idx.UsedPorts["127.0.0.1"].Check(80)) + require.False(t, idx.UsedPorts["::1"].Check(80)) + require.False(t, idx.UsedPorts["192.168.0.1"].Check(80)) + require.True(t, idx.UsedPorts["192.168.1.1"].Check(80)) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9ce2fd72d45e..75ed14609ffe 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2910,13 +2910,23 @@ func (r *RequestedDevice) Validate() error { // NodeResources is used to define the resources available on a client node. type NodeResources struct { - Cpu NodeCpuResources - Memory NodeMemoryResources - Disk NodeDiskResources - Networks Networks + Cpu NodeCpuResources + Memory NodeMemoryResources + Disk NodeDiskResources + Devices []*NodeDeviceResource + + // NodeNetworks was added in Nomad 0.12 to support multiple interfaces. + // It is the superset of host_networks, fingerprinted networks, and the + // node's default interface. NodeNetworks []*NodeNetworkResource - Devices []*NodeDeviceResource + // Networks is the node's bridge network and default interface. It is + // only used when scheduling jobs with a deprecated + // task.resources.network stanza. + Networks Networks + + // MinDynamicPort and MaxDynamicPort represent the inclusive port range + // to select dynamic ports from across all networks. MinDynamicPort int MaxDynamicPort int } @@ -2993,17 +3003,8 @@ func (n *NodeResources) Merge(o *NodeResources) { } if len(o.NodeNetworks) != 0 { - lookupNetwork := func(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) { - for i, nw := range nets { - if nw.Device == name { - return i, nw - } - } - return 0, nil - } - for _, nw := range o.NodeNetworks { - if i, nnw := lookupNetwork(n.NodeNetworks, nw.Device); nnw != nil { + if i, nnw := lookupNetworkByDevice(n.NodeNetworks, nw.Device); nnw != nil { n.NodeNetworks[i] = nw } else { n.NodeNetworks = append(n.NodeNetworks, nw) @@ -3012,6 +3013,15 @@ func (n *NodeResources) Merge(o *NodeResources) { } } +func lookupNetworkByDevice(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) { + for i, nw := range nets { + if nw.Device == name { + return i, nw + } + } + return 0, nil +} + func (n *NodeResources) Equals(o *NodeResources) bool { if o == nil && n == nil { return true diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 37afc149a4f8..0bd0c5432235 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -435,7 +435,7 @@ func TestPortCollisionEvent_Copy(t *testing.T) { evCopy.Allocations = append(evCopy.Allocations, mock.Alloc()) require.NotEqual(t, ev.Allocations, evCopy.Allocations) - evCopy.NetIndex.AddReservedPortRange("1000-2000") + evCopy.NetIndex.AddAllocs(evCopy.Allocations) require.NotEqual(t, ev.NetIndex, evCopy.NetIndex) } diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 18021d3edb68..152aea7f6edd 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -417,13 +417,13 @@ func (c *NetworkChecker) hasHostNetworks(option *structs.Node) bool { } found := false for _, net := range option.NodeResources.NodeNetworks { - if net.HasAlias(hostNetworkValue.(string)) { + if net.HasAlias(hostNetworkValue) { found = true break } } if !found { - c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue.(string), port.Label)) + c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue, port.Label)) return false } } @@ -766,7 +766,7 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti } // resolveTarget is used to resolve the LTarget and RTarget of a Constraint. -func resolveTarget(target string, node *structs.Node) (interface{}, bool) { +func resolveTarget(target string, node *structs.Node) (string, bool) { // If no prefix, this must be a literal value if !strings.HasPrefix(target, "${") { return target, true @@ -797,7 +797,7 @@ func resolveTarget(target string, node *structs.Node) (interface{}, bool) { return val, ok default: - return nil, false + return "", false } } diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index d0483b781335..f6d48fe28607 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -342,14 +342,5 @@ func getProperty(n *structs.Node, property string) (string, bool) { return "", false } - val, ok := resolveTarget(property, n) - if !ok { - return "", false - } - nodeValue, ok := val.(string) - if !ok { - return "", false - } - - return nodeValue, true + return resolveTarget(property, n) } diff --git a/scheduler/rank.go b/scheduler/rank.go index fa3223d114ff..6aa32afcfd6c 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -211,13 +211,13 @@ OUTER: // the node. If it does collide though, it means we found a bug! So // collect as much information as possible. netIdx := structs.NewNetworkIndex() - if collide, reason := netIdx.SetNode(option.Node); collide { + if err := netIdx.SetNode(option.Node); err != nil { iter.ctx.SendEvent(&PortCollisionEvent{ - Reason: reason, + Reason: err.Error(), NetIndex: netIdx.Copy(), Node: option.Node, }) - iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision") + iter.ctx.Metrics().ExhaustedNode(option.Node, "network: invalid node") continue } if collide, reason := netIdx.AddAllocs(proposed); collide { @@ -274,7 +274,7 @@ OUTER: for i, port := range ask.DynamicPorts { if port.HostNetwork != "" { if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk { - ask.DynamicPorts[i].HostNetwork = hostNetworkValue.(string) + ask.DynamicPorts[i].HostNetwork = hostNetworkValue } else { iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label)) netIdx.Release() @@ -285,7 +285,7 @@ OUTER: for i, port := range ask.ReservedPorts { if port.HostNetwork != "" { if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk { - ask.ReservedPorts[i].HostNetwork = hostNetworkValue.(string) + ask.ReservedPorts[i].HostNetwork = hostNetworkValue } else { iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label)) netIdx.Release() @@ -363,7 +363,7 @@ OUTER: // Check if we need a network resource if len(task.Resources.Networks) > 0 { ask := task.Resources.Networks[0].Copy() - offer, err := netIdx.AssignNetwork(ask) + offer, err := netIdx.AssignTaskNetwork(ask) if offer == nil { // If eviction is not enabled, mark this node as exhausted and continue if !iter.evict { @@ -393,7 +393,7 @@ OUTER: netIdx.SetNode(option.Node) netIdx.AddAllocs(proposed) - offer, err = netIdx.AssignNetwork(ask) + offer, err = netIdx.AssignTaskNetwork(ask) if offer == nil { iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err) netIdx.Release() diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index c953fdfd5ec0..d5a0276f2e85 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -492,12 +492,13 @@ func TestBinPackIterator_Network_Failure(t *testing.T) { require.Equal(1, ctx.metrics.DimensionExhausted["network: bandwidth exceeded"]) } -func TestBinPackIterator_Network_PortCollision_Node(t *testing.T) { +func TestBinPackIterator_Network_NoCollision_Node(t *testing.T) { _, ctx := testContext(t) eventsCh := make(chan interface{}) ctx.eventsCh = eventsCh - // Collide on host with duplicate IPs. + // Host networks can have overlapping addresses in which case their + // reserved ports are merged. nodes := []*RankedNode{ { Node: &structs.Node{ @@ -577,9 +578,110 @@ func TestBinPackIterator_Network_PortCollision_Node(t *testing.T) { scoreNorm := NewScoreNormalizationIterator(ctx, binp) out := collectRanked(scoreNorm) - // We expect a placement failure due to port collision. + // Placement should succeed since reserved ports are merged instead of + // treating them as a collision + require.Len(t, out, 1) +} + +// TestBinPackIterator_Network_NodeError asserts that NetworkIndex.SetNode can +// return an error and cause a node to be infeasible. +// +// This should never happen as it indicates "bad" configuration was either not +// caught by validation or caused by bugs in serverside Node handling. +func TestBinPackIterator_Network_NodeError(t *testing.T) { + _, ctx := testContext(t) + eventsCh := make(chan interface{}) + ctx.eventsCh = eventsCh + + nodes := []*RankedNode{ + { + Node: &structs.Node{ + ID: uuid.Generate(), + Resources: &structs.Resources{ + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.158.0.100", + }, + }, + }, + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 4096, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 4096, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.158.0.100", + }, + }, + NodeNetworks: []*structs.NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + Addresses: []structs.NodeNetworkAddress{ + { + Alias: "default", + Address: "192.168.0.100", + ReservedPorts: "22,80", + }, + { + Alias: "private", + Address: "192.168.0.100", + ReservedPorts: "22", + }, + }, + }, + }, + }, + ReservedResources: &structs.NodeReservedResources{ + Networks: structs.NodeReservedNetworkResources{ + ReservedHostPorts: "not-valid-ports", + }, + }, + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + }, + }, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + } + binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp.SetTaskGroup(taskGroup) + + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + out := collectRanked(scoreNorm) + + // We expect a placement failure because the node has invalid reserved + // ports require.Len(t, out, 0) - require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: port collision"]) + require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: invalid node"], + ctx.metrics.DimensionExhausted) } func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) { diff --git a/website/content/docs/configuration/client.mdx b/website/content/docs/configuration/client.mdx index a4a6ea8e5a3d..cfec4d515e26 100644 --- a/website/content/docs/configuration/client.mdx +++ b/website/content/docs/configuration/client.mdx @@ -206,9 +206,12 @@ chroot as doing so would cause infinite recursion. - `disk` `(int: 0)` - Specifies the amount of disk to reserve, in MB. -- `reserved_ports` `(string: "")` - Specifies a comma-separated list of ports to - reserve on all fingerprinted network devices. Ranges can be specified by using - a hyphen separated the two inclusive ends. +- `reserved_ports` `(string: "")` - Specifies a comma-separated list of ports + to reserve on all fingerprinted network devices. Ranges can be specified by + using a hyphen separating the two inclusive ends. See also + [`host_network`](#host_network-stanza) for reserving ports on specific host + networks. + ### `artifact` Parameters @@ -396,8 +399,10 @@ client { - `interface` `(string: "")` - Filters searching of addresses to a specific interface. - `reserved_ports` `(string: "")` - Specifies a comma-separated list of ports to - reserve on all fingerprinted network devices. Ranges can be specified by using + reserve on all addresses associated with this network. Ranges can be specified by using a hyphen separating the two inclusive ends. + [`reserved.reserved_ports`](#reserved_ports) are also reserved on each host + network. ## `client` Examples