Skip to content

Commit

Permalink
scheduling: prevent self-collision in dynamic port network offerings
Browse files Browse the repository at this point in the history
When the scheduler tries to find a placement for a new allocation, it iterates
over a subset of nodes. For each node, we populate a `NetworkIndex` bitmap with
the ports of all existing allocations and any other allocations already proposed
as part of this same evaluation via its `SetAllocs` method. Then we make an
"ask" of the `NetworkIndex` in `AssignPorts` for any ports we need and receive
an "offer" in return. The offer will include both static ports and any dynamic
port assignments.

The `AssignPorts` method was written to support group networks, and it shares
code that selects dynamic ports with the original `AssignTaskNetwork`
code. `AssignTaskNetwork` can request multiple ports from the bitmap at a
time. But `AssignPorts` requests them one at a time and does not account for
possible collisions, and doesn't return an error in that case.

What happens next varies:

1. If the scheduler doesn't place the allocation on that node, the port
   conflict is thrown away and there's no problem.
2. If the node is picked and this is the only allocation (or last allocation),
   the plan applier will reject the plan when it calls `SetAllocs`, as we'd expect.
3. If the node is picked and there are additional allocations in the same eval
   that iterate over the same node, their call to `SetAllocs` will detect the
   impossible state and the node will be rejected. This can have the puzzling
   behavior where a second task group for the job without any networking at all
   can hit a port collision error!

It looks like this bug has existed since we implemented group networks, but
there are several factors that add up to making the issue rare for many users
yet frustratingly frequent for others:

* You're more likely to hit this bug the more tightly packed your range for
  dynamic ports is. With 12000 ports in the range by default, many clusters can
  avoid this for a long time.
* You're more likely to hit case (3) for jobs with lots of allocations or if a
  scheduler has to iterate over a large number of nodes, such as with system jobs,
  jobs with `spread` blocks, or (sometimes) jobs using `unique` constraints.

For unlucky combinations of these factors, it's possible that case (3) happens
repeatedly, preventing scheduling of a given job until a client state
change (ex. restarting the agent so all its allocations are rescheduled
elsewhere) re-opens the range of dynamic ports available.

This changeset:

* Fixes the bug by accounting for collisions in dynamic port selection in
  `AssignPorts`.
* Adds test coverage for `AssignPorts`, expands coverage of this case for the
  deprecated `AssignTaskNetwork`, and tightens the dynamic port range in a
  scheduler test for spread scheduling to more easily detect this kind of problem
  in the future.
* Adds a `String()` method to `Bitmap` so that any future "screaming" log lines
  have a human-readable list of used ports.
  • Loading branch information
tgross committed Mar 8, 2023
1 parent fcd51dc commit cb8eb5e
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 37 deletions.
28 changes: 27 additions & 1 deletion nomad/structs/bitmap.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package structs

import "fmt"
import (
"fmt"

"golang.org/x/exp/slices"
)

// Bitmap is a simple uncompressed bitmap
type Bitmap []byte
Expand Down Expand Up @@ -76,3 +80,25 @@ func (b Bitmap) IndexesInRange(set bool, from, to uint) []int {

return indexes
}

// IndexesInRangeFiltered returns the indexes in which the values are either set
// or unset based on the passed parameter in the passed range, and do not appear
// in the filter slice
func (b Bitmap) IndexesInRangeFiltered(set bool, from, to uint, filter []int) []int {
var indexes []int
for i := from; i <= to && i < b.Size(); i++ {
c := b.Check(i)
if c && set || !c && !set {
if len(filter) < 1 || !slices.Contains(filter, int(i)) {
indexes = append(indexes, int(i))
}
}
}

return indexes
}

// String represents the Bitmap the same as slice of the Bitmap's set values
func (b Bitmap) String() string {
return fmt.Sprintf("%v", b.IndexesInRange(true, 0, b.Size()))
}
52 changes: 36 additions & 16 deletions nomad/structs/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -502,6 +503,7 @@ func incIP(ip net.IP) {
// AssignTaskNetwork supports the deprecated task.resources.network block.
func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) {
var offer AllocatedPorts
var portsInOffer []int

// index of host network name to slice of reserved ports, used during dynamic port assignment
reservedIdx := map[string][]Port{}
Expand Down Expand Up @@ -543,6 +545,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
}

offer = append(offer, *allocPort)
portsInOffer = append(portsInOffer, allocPort.Value)
}

for _, port := range ask.DynamicPorts {
Expand All @@ -554,10 +557,14 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
// lower memory usage.
var dynPorts []int
// TODO: its more efficient to find multiple dynamic ports at once
dynPorts, addrErr = getDynamicPortsStochastic(used, idx.MinDynamicPort, idx.MaxDynamicPort, reservedIdx[port.HostNetwork], 1)
dynPorts, addrErr = getDynamicPortsStochastic(
used, portsInOffer, idx.MinDynamicPort, idx.MaxDynamicPort,
reservedIdx[port.HostNetwork], 1)
if addrErr != nil {
// Fall back to the precise method if the random sampling failed.
dynPorts, addrErr = getDynamicPortsPrecise(used, idx.MinDynamicPort, idx.MaxDynamicPort, reservedIdx[port.HostNetwork], 1)
dynPorts, addrErr = getDynamicPortsPrecise(used, portsInOffer,
idx.MinDynamicPort, idx.MaxDynamicPort,
reservedIdx[port.HostNetwork], 1)
if addrErr != nil {
continue
}
Expand All @@ -583,6 +590,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork)
}
offer = append(offer, *allocPort)
portsInOffer = append(portsInOffer, allocPort.Value)
}

return offer, nil
Expand Down Expand Up @@ -641,13 +649,15 @@ func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkRe
// lower memory usage.
var dynPorts []int
var dynErr error
dynPorts, dynErr = getDynamicPortsStochastic(used, idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
dynPorts, dynErr = getDynamicPortsStochastic(used, nil,
idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
if dynErr == nil {
goto BUILD_OFFER
}

// Fall back to the precise method if the random sampling failed.
dynPorts, dynErr = getDynamicPortsPrecise(used, idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
dynPorts, dynErr = getDynamicPortsPrecise(used, nil,
idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
if dynErr != nil {
err = dynErr
return
Expand All @@ -673,10 +683,11 @@ func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkRe
}

// getDynamicPortsPrecise takes the nodes used port bitmap which may be nil if
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fulfil the ask's DynamicPorts or an error if it failed. An error
// means the ask can not be satisfied as the method does a precise search.
func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, reserved []Port, numDyn int) ([]int, error) {
// no ports have been allocated yet, any ports already offered in the caller,
// and the network ask. It returns a set of unused ports to fulfil the ask's
// DynamicPorts or an error if it failed. An error means the ask can not be
// satisfied as the method does a precise search.
func getDynamicPortsPrecise(nodeUsed Bitmap, portsInOffer []int, minDynamicPort, maxDynamicPort int, reserved []Port, numDyn int) ([]int, error) {
// Create a copy of the used ports and apply the new reserves
var usedSet Bitmap
var err error
Expand All @@ -696,8 +707,10 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int,
usedSet.Set(uint(port.Value))
}

// Get the indexes of the unset
availablePorts := usedSet.IndexesInRange(false, uint(minDynamicPort), uint(maxDynamicPort))
// Get the indexes of the unset ports, less those which have already been
// picked as part of this offer
availablePorts := usedSet.IndexesInRangeFiltered(
false, uint(minDynamicPort), uint(maxDynamicPort), portsInOffer)

// Randomize the amount we need
if len(availablePorts) < numDyn {
Expand All @@ -713,12 +726,13 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int,
return availablePorts[:numDyn], nil
}

// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil if
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fulfil the ask's DynamicPorts or an error if it failed. An error
// does not mean the ask can not be satisfied as the method has a fixed amount
// of random probes and if these fail, the search is aborted.
func getDynamicPortsStochastic(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, reservedPorts []Port, count int) ([]int, error) {
// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil
// if no ports have been allocated yet, any ports already offered in the caller,
// and the network ask. It returns a set of unused ports to fulfil the ask's
// DynamicPorts or an error if it failed. An error does not mean the ask can not
// be satisfied as the method has a fixed amount of random probes and if these
// fail, the search is aborted.
func getDynamicPortsStochastic(nodeUsed Bitmap, portsInOffer []int, minDynamicPort, maxDynamicPort int, reservedPorts []Port, count int) ([]int, error) {
var reserved, dynamic []int
for _, port := range reservedPorts {
reserved = append(reserved, port.Value)
Expand All @@ -742,6 +756,12 @@ func getDynamicPortsStochastic(nodeUsed Bitmap, minDynamicPort, maxDynamicPort i
goto PICK
}
}
// the pick conflicted with a previous pick that hasn't been saved to
// the index yet
if portsInOffer != nil && slices.Contains(portsInOffer, randPort) {
goto PICK
}

dynamic = append(dynamic, randPort)
}

Expand Down
98 changes: 80 additions & 18 deletions nomad/structs/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -354,6 +355,72 @@ func TestNetworkIndex_yieldIP(t *testing.T) {
}
}

// TestNetworkIndex_AssignPorts exercises assigning ports on group networks.
func TestNetworkIndex_AssignPorts(t *testing.T) {
ci.Parallel(t)

// Create a node that only has one free port
idx := NewNetworkIndex()
n := &Node{
NodeResources: &NodeResources{
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
IP: "192.168.0.100",
MBits: 1000,
},
},
NodeNetworks: []*NodeNetworkResource{
{
Mode: "host",
Device: "eth0",
Speed: 1000,
Addresses: []NodeNetworkAddress{
{
Alias: "default",
Address: "192.168.0.100",
Family: NodeNetworkAF_IPv4,
},
},
},
},
},
ReservedResources: &NodeReservedResources{
Networks: NodeReservedNetworkResources{
ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-2),
},
},
}

idx.SetNode(n)

// Ask for 2 dynamic ports
ask := &NetworkResource{
ReservedPorts: []Port{{"static", 443, 443, "default"}},
DynamicPorts: []Port{{"http", 0, 80, "default"}, {"admin", 0, 8080, "default"}},
}
offer, err := idx.AssignPorts(ask)
must.NoError(t, err)
must.NotNil(t, offer, must.Sprint("did not get an offer"))

staticPortMapping, ok := offer.Get("static")
must.True(t, ok)

httpPortMapping, ok := offer.Get("http")
must.True(t, ok)

adminPortMapping, ok := offer.Get("admin")
must.True(t, ok)

must.NotEq(t, httpPortMapping.Value, adminPortMapping.Value,
must.Sprint("assigned dynamic ports must not conflict"))

must.Eq(t, 443, staticPortMapping.Value)
must.Between(t, idx.MaxDynamicPort-1, httpPortMapping.Value, idx.MaxDynamicPort)
must.Between(t, idx.MaxDynamicPort-1, adminPortMapping.Value, idx.MaxDynamicPort)
}

func TestNetworkIndex_AssignTaskNetwork(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()
Expand Down Expand Up @@ -476,32 +543,27 @@ func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention(t *testing.T) {
},
ReservedResources: &NodeReservedResources{
Networks: NodeReservedNetworkResources{
ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-1),
ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-2),
},
},
}

idx.SetNode(n)

// Ask for dynamic ports
// Ask for 2 dynamic ports
ask := &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80, ""}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"admin", 0, 443, ""}},
}
offer, err := idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.100" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.DynamicPorts) != 1 {
t.Fatalf("There should be one dynamic ports")
}
if p := offer.DynamicPorts[0].Value; p != idx.MaxDynamicPort {
t.Fatalf("Dynamic Port: should have been assigned %d; got %d", p, idx.MaxDynamicPort)
}
must.NoError(t, err)
must.NotNil(t, offer, must.Sprint("did not get an offer"))
must.Eq(t, "192.168.0.100", offer.IP)
must.Len(t, 2, offer.DynamicPorts, must.Sprint("There should be one dynamic ports"))

must.NotEq(t, offer.DynamicPorts[0].Value, offer.DynamicPorts[1].Value,
must.Sprint("assigned dynamic ports must not conflict"))
must.Between(t, idx.MaxDynamicPort-1, offer.DynamicPorts[0].Value, idx.MaxDynamicPort)
must.Between(t, idx.MaxDynamicPort-1, offer.DynamicPorts[1].Value, idx.MaxDynamicPort)
}

// COMPAT(0.11): Remove in 0.11
Expand Down
4 changes: 4 additions & 0 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,10 @@ func TestServiceSched_Spread(t *testing.T) {
if i%2 == 0 {
node.Datacenter = "dc2"
}
// setting a narrow range makes it more likely for this test to
// hit bugs in NetworkIndex
node.NodeResources.MinDynamicPort = 20000
node.NodeResources.MaxDynamicPort = 20005
nodes = append(nodes, node)
assert.Nil(h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node), "UpsertNode")
nodeMap[node.ID] = node
Expand Down
4 changes: 2 additions & 2 deletions scheduler/rank.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ OUTER:
return nil
}

// Get the proposed allocations
// Get the allocations that already exist on the node + those allocs
// that have been placed as part of this same evaluation
proposed, err := option.ProposedAllocs(iter.ctx)
if err != nil {
iter.ctx.Logger().Named("binpack").Error("failed retrieving proposed allocations", "error", err)
Expand Down Expand Up @@ -400,7 +401,6 @@ OUTER:
continue OUTER
}
}

// Reserve this to prevent another task from colliding
netIdx.AddReserved(offer)

Expand Down

0 comments on commit cb8eb5e

Please sign in to comment.