From d1b652c9047a59759a23c65e28605fe634847ccf Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 7 Jul 2022 17:06:20 -0700 Subject: [PATCH] core: merge reserved_ports into host_networks Fixes #13505 WIP --- nomad/structs/funcs.go | 11 +- nomad/structs/network.go | 268 +++++++++++++++++++++++---------------- nomad/structs/structs.go | 40 +++--- scheduler/feasible.go | 8 +- scheduler/propertyset.go | 6 +- scheduler/rank.go | 14 +- 6 files changed, 208 insertions(+), 139 deletions(-) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index a94524ec44dd..e6128fa6ab6e 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -207,8 +207,11 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi netIdx = NewNetworkIndex() defer netIdx.Release() - if collision, reason := netIdx.SetNode(node); collision { - return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil + if err := netIdx.SetNode(node); err != nil { + // To maintain backward compatibility with when SetNode + // returned collision+reason like AddAllocs, return + // this as a reason instead of an error. + return false, fmt.Sprintf("reserved node port collision: %v", err), used, nil } if collision, reason := netIdx.AddAllocs(allocs); collision { return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil @@ -530,6 +533,10 @@ func ParsePortRanges(spec string) ([]uint64, error) { if err != nil { return nil, err } + + if port > MaxValidPort { + return nil, fmt.Errorf("port must be < %d but found %d", MaxValidPort, port) + } ports[port] = struct{}{} } case 2: diff --git a/nomad/structs/network.go b/nomad/structs/network.go index 30e7693236be..530f8e6ba173 100644 --- a/nomad/structs/network.go +++ b/nomad/structs/network.go @@ -36,13 +36,18 @@ var ( // NetworkIndex is used to index the available network resources // and the used network resources on a machine given allocations +// +// Fields are exported so they may be JSON serialized for debugging. +// Fields are *not* intended to be used directly. type NetworkIndex struct { - AvailNetworks []*NetworkResource // List of available networks - NodeNetworks []*NodeNetworkResource // List of available node networks - AvailAddresses map[string][]NodeNetworkAddress // Map of host network aliases to list of addresses - AvailBandwidth map[string]int // Bandwidth by device - UsedPorts map[string]Bitmap // Ports by IP - UsedBandwidth map[string]int // Bandwidth by device + TaskNetworks []*NetworkResource // List of available networks + GroupNetworks []*NodeNetworkResource // List of available node networks + HostNetworks map[string][]NodeNetworkAddress // Map of host network aliases to list of addresses + UsedPorts map[string]Bitmap // Ports by IP + + // Deprecated bandwidth fields + AvailBandwidth map[string]int // Bandwidth by device + UsedBandwidth map[string]int // Bandwidth by device MinDynamicPort int // The smallest dynamic port generated MaxDynamicPort int // The largest dynamic port generated @@ -51,9 +56,9 @@ type NetworkIndex struct { // NewNetworkIndex is used to construct a new network index func NewNetworkIndex() *NetworkIndex { return &NetworkIndex{ - AvailAddresses: make(map[string][]NodeNetworkAddress), - AvailBandwidth: make(map[string]int), + HostNetworks: make(map[string][]NodeNetworkAddress), UsedPorts: make(map[string]Bitmap), + AvailBandwidth: make(map[string]int), UsedBandwidth: make(map[string]int), MinDynamicPort: DefaultMinDynamicPort, MaxDynamicPort: DefaultMaxDynamicPort, @@ -84,9 +89,9 @@ func (idx *NetworkIndex) Copy() *NetworkIndex { c := new(NetworkIndex) *c = *idx - c.AvailNetworks = copyNetworkResources(idx.AvailNetworks) - c.NodeNetworks = copyNodeNetworks(idx.NodeNetworks) - c.AvailAddresses = copyAvailAddresses(idx.AvailAddresses) + c.TaskNetworks = copyNetworkResources(idx.TaskNetworks) + c.GroupNetworks = copyNodeNetworks(idx.GroupNetworks) + c.HostNetworks = copyAvailAddresses(idx.HostNetworks) if idx.AvailBandwidth != nil && len(idx.AvailBandwidth) == 0 { c.AvailBandwidth = make(map[string]int) } else { @@ -171,61 +176,131 @@ func (idx *NetworkIndex) Overcommitted() bool { return false } -// SetNode is used to setup the available network resources. Returns -// true if there is a collision -func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) { - - // COMPAT(0.11): Remove in 0.11 - // Grab the network resources, handling both new and old - var networks []*NetworkResource +// SetNode is used to initialize a node's network index with available IPs, +// reserved ports, and other details from a node's configuration and +// fingerprinting. +// +// SetNode must be idempotent as preemption causes SetNode to be called +// multiple times on the same NetworkIndex, only clearing UsedPorts being +// calls. +// +// An error is returned if the Node cannot produce a consistent NetworkIndex +// such as if reserved_ports are unparseable. Any such Node is effectively +// unschedulable for workloads which require networks. +// +// Any errors returned by SetNode indicate a bug! The bug may lie in client +// code not properly validating its configuration or it may lie in improper +// Node object handling by servers. Users should not be able to cause SetNode +// to error. Data that cause SetNode to error should be caught upstream such as +// a client agent refusing to start with an invalid configuration. +func (idx *NetworkIndex) SetNode(node *Node) error { + + // COMPAT(0.11): Deprecated. taskNetworks are only used for + // task.resources.network asks which have been deprecated since before + // 0.11. + // Grab the network resources, handling both new and old Node layouts + // from clients. + var taskNetworks []*NetworkResource if node.NodeResources != nil && len(node.NodeResources.Networks) != 0 { - networks = node.NodeResources.Networks + taskNetworks = node.NodeResources.Networks } else if node.Resources != nil { - networks = node.Resources.Networks + taskNetworks = node.Resources.Networks } + // nodeNetworks are used for group.network asks. var nodeNetworks []*NodeNetworkResource if node.NodeResources != nil && len(node.NodeResources.NodeNetworks) != 0 { nodeNetworks = node.NodeResources.NodeNetworks } - // Add the available CIDR blocks - for _, n := range networks { + // Filter task networks down to those with a device. For example + // taskNetworks may contain a "bridge" interface which has no device + // set and cannot be used to fulfill asks. + for _, n := range taskNetworks { if n.Device != "" { - idx.AvailNetworks = append(idx.AvailNetworks, n) + idx.TaskNetworks = append(idx.TaskNetworks, n) idx.AvailBandwidth[n.Device] = n.MBits } } - // TODO: upgrade path? - // is it possible to get duplicates here? - for _, n := range nodeNetworks { - for _, a := range n.Addresses { - idx.AvailAddresses[a.Alias] = append(idx.AvailAddresses[a.Alias], a) - if c, r := idx.AddReservedPortsForIP(a.ReservedPorts, a.Address); c { - collide = true - reason = fmt.Sprintf("collision when reserving ports for node network %s in node %s: %v", a.Alias, node.ID, r) - } - } - } + // Reserved ports get merged downward. For example given an agent + // config: + // + // client.reserved.reserved_ports = "22" + // client.host_network["eth0"] = {reserved_ports = "80,443"} + // client.host_network["eth1"] = {reserved_ports = "1-1000"} + // + // Addresses on taskNetworks reserve port 22 + // Addresses on eth0 reserve 22,80,443 (note that 22 is included!) + // Addresses on eth1 reserve 1-1000 (22 already included in this range) + globalResPorts := []uint{} // COMPAT(0.11): Remove in 0.11 - // Handle reserving ports, handling both new and old + // Handle reserving ports, handling both new and old client Node + // layouts if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" { - c, r := idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts) - collide = c - if collide { - reason = fmt.Sprintf("collision when reserving port range for node %s: %v", node.ID, r) + resPorts, err := ParsePortRanges(node.ReservedResources.Networks.ReservedHostPorts) + if err != nil { + // This is a fatal error that should have been + // prevented by client validation. + return fmt.Errorf("error parsing reserved_ports: %w", err) + } + + globalResPorts = make([]uint, len(resPorts)) + for i, p := range resPorts { + globalResPorts[i] = uint(p) } } else if node.Reserved != nil { for _, n := range node.Reserved.Networks { - if c, r := idx.AddReserved(n); c { - collide = true - reason = fmt.Sprintf("collision when reserving network %s for node %s: %v", n.IP, node.ID, r) + for _, ports := range [][]Port{n.ReservedPorts, n.DynamicPorts} { + for _, p := range ports { + if p.Value > MaxValidPort || p.Value < 0 { + // This is a fatal error that + // should have been prevented + // by validation upstream. + return fmt.Errorf("invalid port %d for reserved_ports", p.Value) + } + globalResPorts = append(globalResPorts, uint(p.Value)) + } + } + } + } + + // TODO: upgrade path? + // is it possible to get duplicates here? + for _, n := range nodeNetworks { + for _, a := range n.Addresses { + // Index host networks by their unique alias for asks + // with group.network.port.host_network set. + idx.HostNetworks[a.Alias] = append(idx.HostNetworks[a.Alias], a) + + // Mark reserved ports as used without worrying about + // collisions. This effectively merges + // client.reserved.reserved_ports into each + // host_network. + used := idx.getUsedPortsFor(a.Address) + for _, p := range globalResPorts { + used.Set(p) + } + + // If ReservedPorts is set on the NodeNetwork, use it + // and the global reserved ports. + if a.ReservedPorts != "" { + rp, err := ParsePortRanges(a.ReservedPorts) + if err != nil { + // This is a fatal error that should + // have been prevented by validation + // upstream. + return fmt.Errorf("error parsing reserved_ports for network %q: %w", a.Alias, err) + } + for _, p := range rp { + used.Set(uint(p)) + } } } } + // Set dynamic port range (applies to all addresses) if node.NodeResources != nil && node.NodeResources.MinDynamicPort > 0 { idx.MinDynamicPort = node.NodeResources.MinDynamicPort } @@ -234,11 +309,16 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) { idx.MaxDynamicPort = node.NodeResources.MaxDynamicPort } - return + return nil } // AddAllocs is used to add the used network resources. Returns // true if there is a collision +// +// AddAllocs may be called multiple times for the same NetworkIndex with +// UsedPorts cleared between calls (by Release). Therefore AddAllocs must be +// determistic and must not manipulate state outside of UsedPorts as that state +// would persist between Release calls. func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool, reason string) { for _, alloc := range allocs { // Do not consider the resource impact of terminal allocations @@ -338,51 +418,11 @@ func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, r return } -// AddReservedPortRange marks the ports given as reserved on all network -// interfaces. The port format is comma delimited, with spans given as n1-n2 -// (80,100-200,205) -func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool, reasons []string) { - // Convert the ports into a slice of ints - resPorts, err := ParsePortRanges(ports) - if err != nil { - return - } - - // Ensure we create a bitmap for each available network - for _, n := range idx.AvailNetworks { - idx.getUsedPortsFor(n.IP) - } - - for _, used := range idx.UsedPorts { - for _, port := range resPorts { - // Guard against invalid port - if port >= MaxValidPort { - return true, []string{fmt.Sprintf("invalid port %d", port)} - } - if used.Check(uint(port)) { - collide = true - reason := fmt.Sprintf("port %d already in use", port) - reasons = append(reasons, reason) - } else { - used.Set(uint(port)) - } - } - } - - return -} - // AddReservedPortsForIP checks whether any reserved ports collide with those // in use for the IP address. -func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool, reasons []string) { - // Convert the ports into a slice of ints - resPorts, err := ParsePortRanges(ports) - if err != nil { - return - } - +func (idx *NetworkIndex) AddReservedPortsForIP(ports []uint64, ip string) (collide bool, reasons []string) { used := idx.getUsedPortsFor(ip) - for _, port := range resPorts { + for _, port := range ports { // Guard against invalid port if port >= MaxValidPort { return true, []string{fmt.Sprintf("invalid port %d", port)} @@ -401,22 +441,13 @@ func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide // yieldIP is used to iteratively invoke the callback with // an available IP -func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) { - inc := func(ip net.IP) { - for j := len(ip) - 1; j >= 0; j-- { - ip[j]++ - if ip[j] > 0 { - break - } - } - } - - for _, n := range idx.AvailNetworks { +func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, offerIP net.IP) bool) { + for _, n := range idx.TaskNetworks { ip, ipnet, err := net.ParseCIDR(n.CIDR) if err != nil { continue } - for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); incIP(ip) { if cb(n, ip) { return } @@ -424,6 +455,26 @@ func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) } } +func incIP(ip net.IP) { + // Iterate over IP octects from right to left + for j := len(ip) - 1; j >= 0; j-- { + + // Increment octect + ip[j]++ + + // If this octect did not wrap around to 0, it's the next IP to + // try. If it did wrap (p[j]==0), then the next octect is + // incremented. + if ip[j] > 0 { + break + } + } +} + +// AssignPorts based on an ask from the scheduler processing a group.network +// stanza. Supports multi-interfaces through node configured host_networks. +// +// AssignNetwork supports the deprecated task.network stanza. func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) { var offer AllocatedPorts @@ -437,7 +488,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro // if allocPort is still nil after the loop, the port wasn't available for reservation var allocPort *AllocatedPortMapping var addrErr error - for _, addr := range idx.AvailAddresses[port.HostNetwork] { + for _, addr := range idx.HostNetworks[port.HostNetwork] { used := idx.getUsedPortsFor(addr.Address) // Guard against invalid port if port.Value < 0 || port.Value >= MaxValidPort { @@ -472,7 +523,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro for _, port := range ask.DynamicPorts { var allocPort *AllocatedPortMapping var addrErr error - for _, addr := range idx.AvailAddresses[port.HostNetwork] { + for _, addr := range idx.HostNetworks[port.HostNetwork] { used := idx.getUsedPortsFor(addr.Address) // Try to stochastically pick the dynamic ports as it is faster and // lower memory usage. @@ -512,13 +563,18 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return offer, nil } -// AssignNetwork is used to assign network resources given an ask. -// If the ask cannot be satisfied, returns nil -func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) { +// AssignTaskNetwork is used to offer network resources given a +// task.resources.network ask. If the ask cannot be satisfied, returns nil +// +// AssignTaskNetwork and task.resources.network are deprecated in favor of +// AssignPorts and group.network. AssignTaskNetwork does not support multiple +// interfaces and only uses the node's default interface. AssignPorts is the +// method that is used for group.network asks. +func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkResource, err error) { err = fmt.Errorf("no networks available") - idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) { + idx.yieldIP(func(n *NetworkResource, offerIP net.IP) (stop bool) { // Convert the IP to a string - ipStr := ip.String() + offerIPStr := offerIP.String() // Check if we would exceed the bandwidth cap availBandwidth := idx.AvailBandwidth[n.Device] @@ -528,7 +584,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour return } - used := idx.UsedPorts[ipStr] + used := idx.UsedPorts[offerIPStr] // Check if any of the reserved ports are in use for _, port := range ask.ReservedPorts { @@ -549,7 +605,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour offer := &NetworkResource{ Mode: ask.Mode, Device: n.Device, - IP: ipStr, + IP: offerIPStr, MBits: ask.MBits, DNS: ask.DNS, ReservedPorts: ask.ReservedPorts, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 723865d2d930..38323966b313 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2905,13 +2905,23 @@ func (r *RequestedDevice) Validate() error { // NodeResources is used to define the resources available on a client node. type NodeResources struct { - Cpu NodeCpuResources - Memory NodeMemoryResources - Disk NodeDiskResources - Networks Networks + Cpu NodeCpuResources + Memory NodeMemoryResources + Disk NodeDiskResources + Devices []*NodeDeviceResource + + // NodeNetworks was added in Nomad 0.12 to support multiple interfaces. + // It is the superset of host_networks, fingerprinted networks, and the + // node's default interface. NodeNetworks []*NodeNetworkResource - Devices []*NodeDeviceResource + // Networks is the node's bridge network and default interface. It is + // only used when scheduling jobs with a deprecated + // task.resources.network stanza. + Networks Networks + + // MinDynamicPort and MaxDynamicPort represent the inclusive port range + // to select dynamic ports from across all networks. MinDynamicPort int MaxDynamicPort int } @@ -2988,17 +2998,8 @@ func (n *NodeResources) Merge(o *NodeResources) { } if len(o.NodeNetworks) != 0 { - lookupNetwork := func(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) { - for i, nw := range nets { - if nw.Device == name { - return i, nw - } - } - return 0, nil - } - for _, nw := range o.NodeNetworks { - if i, nnw := lookupNetwork(n.NodeNetworks, nw.Device); nnw != nil { + if i, nnw := lookupNetworkByDevice(n.NodeNetworks, nw.Device); nnw != nil { n.NodeNetworks[i] = nw } else { n.NodeNetworks = append(n.NodeNetworks, nw) @@ -3007,6 +3008,15 @@ func (n *NodeResources) Merge(o *NodeResources) { } } +func lookupNetworkByDevice(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) { + for i, nw := range nets { + if nw.Device == name { + return i, nw + } + } + return 0, nil +} + func (n *NodeResources) Equals(o *NodeResources) bool { if o == nil && n == nil { return true diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 18021d3edb68..152aea7f6edd 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -417,13 +417,13 @@ func (c *NetworkChecker) hasHostNetworks(option *structs.Node) bool { } found := false for _, net := range option.NodeResources.NodeNetworks { - if net.HasAlias(hostNetworkValue.(string)) { + if net.HasAlias(hostNetworkValue) { found = true break } } if !found { - c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue.(string), port.Label)) + c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue, port.Label)) return false } } @@ -766,7 +766,7 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti } // resolveTarget is used to resolve the LTarget and RTarget of a Constraint. -func resolveTarget(target string, node *structs.Node) (interface{}, bool) { +func resolveTarget(target string, node *structs.Node) (string, bool) { // If no prefix, this must be a literal value if !strings.HasPrefix(target, "${") { return target, true @@ -797,7 +797,7 @@ func resolveTarget(target string, node *structs.Node) (interface{}, bool) { return val, ok default: - return nil, false + return "", false } } diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index d0483b781335..b5c4319cf564 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -346,10 +346,6 @@ func getProperty(n *structs.Node, property string) (string, bool) { if !ok { return "", false } - nodeValue, ok := val.(string) - if !ok { - return "", false - } - return nodeValue, true + return val, true } diff --git a/scheduler/rank.go b/scheduler/rank.go index fa3223d114ff..6aa32afcfd6c 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -211,13 +211,13 @@ OUTER: // the node. If it does collide though, it means we found a bug! So // collect as much information as possible. netIdx := structs.NewNetworkIndex() - if collide, reason := netIdx.SetNode(option.Node); collide { + if err := netIdx.SetNode(option.Node); err != nil { iter.ctx.SendEvent(&PortCollisionEvent{ - Reason: reason, + Reason: err.Error(), NetIndex: netIdx.Copy(), Node: option.Node, }) - iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision") + iter.ctx.Metrics().ExhaustedNode(option.Node, "network: invalid node") continue } if collide, reason := netIdx.AddAllocs(proposed); collide { @@ -274,7 +274,7 @@ OUTER: for i, port := range ask.DynamicPorts { if port.HostNetwork != "" { if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk { - ask.DynamicPorts[i].HostNetwork = hostNetworkValue.(string) + ask.DynamicPorts[i].HostNetwork = hostNetworkValue } else { iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label)) netIdx.Release() @@ -285,7 +285,7 @@ OUTER: for i, port := range ask.ReservedPorts { if port.HostNetwork != "" { if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk { - ask.ReservedPorts[i].HostNetwork = hostNetworkValue.(string) + ask.ReservedPorts[i].HostNetwork = hostNetworkValue } else { iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label)) netIdx.Release() @@ -363,7 +363,7 @@ OUTER: // Check if we need a network resource if len(task.Resources.Networks) > 0 { ask := task.Resources.Networks[0].Copy() - offer, err := netIdx.AssignNetwork(ask) + offer, err := netIdx.AssignTaskNetwork(ask) if offer == nil { // If eviction is not enabled, mark this node as exhausted and continue if !iter.evict { @@ -393,7 +393,7 @@ OUTER: netIdx.SetNode(option.Node) netIdx.AddAllocs(proposed) - offer, err = netIdx.AssignNetwork(ask) + offer, err = netIdx.AssignTaskNetwork(ask) if offer == nil { iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err) netIdx.Release()