Skip to content

Commit

Permalink
Backport of scheduling: prevent self-collision in dynamic port networ…
Browse files Browse the repository at this point in the history
…k offerings into release/1.5.x (#16410)

This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core authored Mar 9, 2023
1 parent 4522aab commit 5c9a035
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 37 deletions.
3 changes: 3 additions & 0 deletions .changelog/16401.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where collisions in dynamic port offerings would result in spurious plan-for-node-rejected errors
```
28 changes: 27 additions & 1 deletion nomad/structs/bitmap.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package structs

import "fmt"
import (
"fmt"

"golang.org/x/exp/slices"
)

// Bitmap is a simple uncompressed bitmap
type Bitmap []byte
Expand Down Expand Up @@ -76,3 +80,25 @@ func (b Bitmap) IndexesInRange(set bool, from, to uint) []int {

return indexes
}

// IndexesInRangeFiltered returns the indexes in which the values are either set
// or unset based on the passed parameter in the passed range, and do not appear
// in the filter slice
func (b Bitmap) IndexesInRangeFiltered(set bool, from, to uint, filter []int) []int {
var indexes []int
for i := from; i <= to && i < b.Size(); i++ {
c := b.Check(i)
if c == set {
if len(filter) < 1 || !slices.Contains(filter, int(i)) {
indexes = append(indexes, int(i))
}
}
}

return indexes
}

// String represents the Bitmap the same as slice of the Bitmap's set values
func (b Bitmap) String() string {
return fmt.Sprintf("%v", b.IndexesInRange(true, 0, b.Size()))
}
52 changes: 36 additions & 16 deletions nomad/structs/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -502,6 +503,7 @@ func incIP(ip net.IP) {
// AssignTaskNetwork supports the deprecated task.resources.network block.
func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) {
var offer AllocatedPorts
var portsInOffer []int

// index of host network name to slice of reserved ports, used during dynamic port assignment
reservedIdx := map[string][]Port{}
Expand Down Expand Up @@ -543,6 +545,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
}

offer = append(offer, *allocPort)
portsInOffer = append(portsInOffer, allocPort.Value)
}

for _, port := range ask.DynamicPorts {
Expand All @@ -554,10 +557,14 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
// lower memory usage.
var dynPorts []int
// TODO: its more efficient to find multiple dynamic ports at once
dynPorts, addrErr = getDynamicPortsStochastic(used, idx.MinDynamicPort, idx.MaxDynamicPort, reservedIdx[port.HostNetwork], 1)
dynPorts, addrErr = getDynamicPortsStochastic(
used, portsInOffer, idx.MinDynamicPort, idx.MaxDynamicPort,
reservedIdx[port.HostNetwork], 1)
if addrErr != nil {
// Fall back to the precise method if the random sampling failed.
dynPorts, addrErr = getDynamicPortsPrecise(used, idx.MinDynamicPort, idx.MaxDynamicPort, reservedIdx[port.HostNetwork], 1)
dynPorts, addrErr = getDynamicPortsPrecise(used, portsInOffer,
idx.MinDynamicPort, idx.MaxDynamicPort,
reservedIdx[port.HostNetwork], 1)
if addrErr != nil {
continue
}
Expand All @@ -583,6 +590,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork)
}
offer = append(offer, *allocPort)
portsInOffer = append(portsInOffer, allocPort.Value)
}

return offer, nil
Expand Down Expand Up @@ -641,13 +649,15 @@ func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkRe
// lower memory usage.
var dynPorts []int
var dynErr error
dynPorts, dynErr = getDynamicPortsStochastic(used, idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
dynPorts, dynErr = getDynamicPortsStochastic(used, nil,
idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
if dynErr == nil {
goto BUILD_OFFER
}

// Fall back to the precise method if the random sampling failed.
dynPorts, dynErr = getDynamicPortsPrecise(used, idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
dynPorts, dynErr = getDynamicPortsPrecise(used, nil,
idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts))
if dynErr != nil {
err = dynErr
return
Expand All @@ -673,10 +683,11 @@ func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkRe
}

// getDynamicPortsPrecise takes the nodes used port bitmap which may be nil if
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fulfil the ask's DynamicPorts or an error if it failed. An error
// means the ask can not be satisfied as the method does a precise search.
func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, reserved []Port, numDyn int) ([]int, error) {
// no ports have been allocated yet, any ports already offered in the caller,
// and the network ask. It returns a set of unused ports to fulfil the ask's
// DynamicPorts or an error if it failed. An error means the ask can not be
// satisfied as the method does a precise search.
func getDynamicPortsPrecise(nodeUsed Bitmap, portsInOffer []int, minDynamicPort, maxDynamicPort int, reserved []Port, numDyn int) ([]int, error) {
// Create a copy of the used ports and apply the new reserves
var usedSet Bitmap
var err error
Expand All @@ -696,8 +707,10 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int,
usedSet.Set(uint(port.Value))
}

// Get the indexes of the unset
availablePorts := usedSet.IndexesInRange(false, uint(minDynamicPort), uint(maxDynamicPort))
// Get the indexes of the unset ports, less those which have already been
// picked as part of this offer
availablePorts := usedSet.IndexesInRangeFiltered(
false, uint(minDynamicPort), uint(maxDynamicPort), portsInOffer)

// Randomize the amount we need
if len(availablePorts) < numDyn {
Expand All @@ -713,12 +726,13 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int,
return availablePorts[:numDyn], nil
}

// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil if
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fulfil the ask's DynamicPorts or an error if it failed. An error
// does not mean the ask can not be satisfied as the method has a fixed amount
// of random probes and if these fail, the search is aborted.
func getDynamicPortsStochastic(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, reservedPorts []Port, count int) ([]int, error) {
// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil
// if no ports have been allocated yet, any ports already offered in the caller,
// and the network ask. It returns a set of unused ports to fulfil the ask's
// DynamicPorts or an error if it failed. An error does not mean the ask can not
// be satisfied as the method has a fixed amount of random probes and if these
// fail, the search is aborted.
func getDynamicPortsStochastic(nodeUsed Bitmap, portsInOffer []int, minDynamicPort, maxDynamicPort int, reservedPorts []Port, count int) ([]int, error) {
var reserved, dynamic []int
for _, port := range reservedPorts {
reserved = append(reserved, port.Value)
Expand All @@ -742,6 +756,12 @@ func getDynamicPortsStochastic(nodeUsed Bitmap, minDynamicPort, maxDynamicPort i
goto PICK
}
}
// the pick conflicted with a previous pick that hasn't been saved to
// the index yet
if slices.Contains(portsInOffer, randPort) {
goto PICK
}

dynamic = append(dynamic, randPort)
}

Expand Down
98 changes: 80 additions & 18 deletions nomad/structs/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -354,6 +355,72 @@ func TestNetworkIndex_yieldIP(t *testing.T) {
}
}

// TestNetworkIndex_AssignPorts exercises assigning ports on group networks.
func TestNetworkIndex_AssignPorts(t *testing.T) {
ci.Parallel(t)

// Create a node that only has one free port
idx := NewNetworkIndex()
n := &Node{
NodeResources: &NodeResources{
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
IP: "192.168.0.100",
MBits: 1000,
},
},
NodeNetworks: []*NodeNetworkResource{
{
Mode: "host",
Device: "eth0",
Speed: 1000,
Addresses: []NodeNetworkAddress{
{
Alias: "default",
Address: "192.168.0.100",
Family: NodeNetworkAF_IPv4,
},
},
},
},
},
ReservedResources: &NodeReservedResources{
Networks: NodeReservedNetworkResources{
ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-2),
},
},
}

idx.SetNode(n)

// Ask for 2 dynamic ports
ask := &NetworkResource{
ReservedPorts: []Port{{"static", 443, 443, "default"}},
DynamicPorts: []Port{{"http", 0, 80, "default"}, {"admin", 0, 8080, "default"}},
}
offer, err := idx.AssignPorts(ask)
must.NoError(t, err)
must.NotNil(t, offer, must.Sprint("did not get an offer"))

staticPortMapping, ok := offer.Get("static")
must.True(t, ok)

httpPortMapping, ok := offer.Get("http")
must.True(t, ok)

adminPortMapping, ok := offer.Get("admin")
must.True(t, ok)

must.NotEq(t, httpPortMapping.Value, adminPortMapping.Value,
must.Sprint("assigned dynamic ports must not conflict"))

must.Eq(t, 443, staticPortMapping.Value)
must.Between(t, idx.MaxDynamicPort-1, httpPortMapping.Value, idx.MaxDynamicPort)
must.Between(t, idx.MaxDynamicPort-1, adminPortMapping.Value, idx.MaxDynamicPort)
}

func TestNetworkIndex_AssignTaskNetwork(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()
Expand Down Expand Up @@ -476,32 +543,27 @@ func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention(t *testing.T) {
},
ReservedResources: &NodeReservedResources{
Networks: NodeReservedNetworkResources{
ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-1),
ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-2),
},
},
}

idx.SetNode(n)

// Ask for dynamic ports
// Ask for 2 dynamic ports
ask := &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80, ""}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"admin", 0, 443, ""}},
}
offer, err := idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.100" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.DynamicPorts) != 1 {
t.Fatalf("There should be one dynamic ports")
}
if p := offer.DynamicPorts[0].Value; p != idx.MaxDynamicPort {
t.Fatalf("Dynamic Port: should have been assigned %d; got %d", p, idx.MaxDynamicPort)
}
must.NoError(t, err)
must.NotNil(t, offer, must.Sprint("did not get an offer"))
must.Eq(t, "192.168.0.100", offer.IP)
must.Len(t, 2, offer.DynamicPorts, must.Sprint("There should be one dynamic ports"))

must.NotEq(t, offer.DynamicPorts[0].Value, offer.DynamicPorts[1].Value,
must.Sprint("assigned dynamic ports must not conflict"))
must.Between(t, idx.MaxDynamicPort-1, offer.DynamicPorts[0].Value, idx.MaxDynamicPort)
must.Between(t, idx.MaxDynamicPort-1, offer.DynamicPorts[1].Value, idx.MaxDynamicPort)
}

// COMPAT(0.11): Remove in 0.11
Expand Down
4 changes: 4 additions & 0 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,10 @@ func TestServiceSched_Spread(t *testing.T) {
if i%2 == 0 {
node.Datacenter = "dc2"
}
// setting a narrow range makes it more likely for this test to
// hit bugs in NetworkIndex
node.NodeResources.MinDynamicPort = 20000
node.NodeResources.MaxDynamicPort = 20005
nodes = append(nodes, node)
assert.Nil(h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node), "UpsertNode")
nodeMap[node.ID] = node
Expand Down
4 changes: 2 additions & 2 deletions scheduler/rank.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ OUTER:
return nil
}

// Get the proposed allocations
// Get the allocations that already exist on the node + those allocs
// that have been placed as part of this same evaluation
proposed, err := option.ProposedAllocs(iter.ctx)
if err != nil {
iter.ctx.Logger().Named("binpack").Error("failed retrieving proposed allocations", "error", err)
Expand Down Expand Up @@ -400,7 +401,6 @@ OUTER:
continue OUTER
}
}

// Reserve this to prevent another task from colliding
netIdx.AddReserved(offer)

Expand Down

0 comments on commit 5c9a035

Please sign in to comment.