From 21cdd93db1cc7b5e64b46c4d31fda159742f0817 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 14 Jan 2022 20:09:14 -0500 Subject: [PATCH] scheduler: detect and log unexpected scheduling collisions (#11793) --- nomad/fsm.go | 2 +- nomad/job_endpoint.go | 2 +- nomad/mock/mock.go | 53 ++++++++ nomad/plan_apply.go | 2 + nomad/server.go | 50 +++++++- nomad/structs/funcs.go | 8 +- nomad/structs/network.go | 147 ++++++++++++++++++---- nomad/structs/network_test.go | 138 +++++++++++++++++--- nomad/structs/structs.go | 23 ++++ nomad/structs/structs_test.go | 71 +++++++++++ nomad/worker.go | 2 +- nomad/worker_test.go | 11 +- scheduler/context.go | 74 ++++++++++- scheduler/context_test.go | 47 ++++++- scheduler/generic_sched.go | 33 ++--- scheduler/generic_sched_test.go | 12 +- scheduler/rank.go | 30 ++++- scheduler/rank_test.go | 215 ++++++++++++++++++++++++++++++++ scheduler/scheduler.go | 6 +- scheduler/system_sched.go | 18 +-- scheduler/system_sched_test.go | 17 ++- scheduler/testing.go | 14 ++- 22 files changed, 871 insertions(+), 104 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 59f007202f53..7a962b9a0e36 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1729,7 +1729,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { // Ignore eval event creation during snapshot restore snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval}) // Create the scheduler and run it - sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner) + sched, err := scheduler.NewScheduler(eval.Type, n.logger, nil, snap, planner) if err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 83f517dfc938..f8cf505de487 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1770,7 +1770,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) } // Create the scheduler and run it - sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner) + sched, err := scheduler.NewScheduler(eval.Type, j.logger, j.srv.workersEventCh, snap, planner) if err != nil { return err } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index a3fe090542cc..5eda923b6fe9 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -2,6 +2,7 @@ package mock import ( "fmt" + "math/rand" "time" "github.com/hashicorp/nomad/helper" @@ -1171,6 +1172,58 @@ func Alloc() *structs.Allocation { return alloc } +func AllocWithoutReservedPort() *structs.Allocation { + alloc := Alloc() + alloc.Resources.Networks[0].ReservedPorts = nil + alloc.TaskResources["web"].Networks[0].ReservedPorts = nil + alloc.AllocatedResources.Tasks["web"].Networks[0].ReservedPorts = nil + + return alloc +} + +func AllocForNode(n *structs.Node) *structs.Allocation { + nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address + + dynamicPortRange := structs.MaxDynamicPort - structs.MinDynamicPort + randomDynamicPort := rand.Intn(dynamicPortRange) + structs.MinDynamicPort + + alloc := Alloc() + alloc.NodeID = n.ID + + // Set node IP address. + alloc.Resources.Networks[0].IP = nodeIP + alloc.TaskResources["web"].Networks[0].IP = nodeIP + alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP + + // Set dynamic port to a random value. 
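+	// The value is drawn from [MinDynamicPort, MaxDynamicPort) so the mock
+	// alloc resembles one whose dynamic port was assigned by the scheduler.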
+ alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + + return alloc + +} + +func AllocForNodeWithoutReservedPort(n *structs.Node) *structs.Allocation { + nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address + + dynamicPortRange := structs.MaxDynamicPort - structs.MinDynamicPort + randomDynamicPort := rand.Intn(dynamicPortRange) + structs.MinDynamicPort + + alloc := AllocWithoutReservedPort() + alloc.NodeID = n.ID + + // Set node IP address. + alloc.Resources.Networks[0].IP = nodeIP + alloc.TaskResources["web"].Networks[0].IP = nodeIP + alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP + + // Set dynamic port to a random value. + alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + + return alloc +} + // ConnectJob adds a Connect proxy sidecar group service to mock.Alloc. func ConnectAlloc() *structs.Allocation { alloc := Alloc() diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 916af7f291f6..c2c529e85937 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -473,6 +473,8 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan return true } if !fit { + metrics.IncrCounterWithLabels([]string{"nomad", "plan", "node_rejected"}, 1, []metrics.Label{{Name: "node_id", Value: nodeID}}) + // Log the reason why the node's allocations could not be made if reason != "" { //TODO This was debug level and should return diff --git a/nomad/server.go b/nomad/server.go index 5fabdc385ddb..79d15371d731 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -226,7 +227,8 @@ type Server struct { vault VaultClient // Worker used for processing - workers []*Worker + workers []*Worker + workersEventCh chan interface{} // aclCache is used to maintain the parsed ACL objects aclCache *lru.TwoQueueCache @@ -342,6 +344,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr blockedEvals: NewBlockedEvals(evalBroker, logger), rpcTLS: incomingTLS, aclCache: aclCache, + workersEventCh: make(chan interface{}, 1), } s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background()) @@ -1450,6 +1453,8 @@ func (s *Server) setupWorkers() error { return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore) } + go s.listenWorkerEvents() + // Start the workers for i := 0; i < s.config.NumSchedulers; i++ { if w, err := NewWorker(s); err != nil { @@ -1462,6 +1467,49 @@ func (s *Server) setupWorkers() error { return nil } +// listenWorkerEvents listens for events emitted by scheduler workers and log +// them if necessary. Some events may be skipped to avoid polluting logs with +// duplicates. 
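+// Logging is deduplicated per node: once a collision is logged for a node,
+// later events for that node are skipped until its entry ages out and is
+// garbage collected (see gcDeadline below).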
+func (s *Server) listenWorkerEvents() { + loggedAt := make(map[string]time.Time) + + gcDeadline := 4 * time.Hour + gcTicker := time.NewTicker(10 * time.Second) + defer gcTicker.Stop() + + for { + select { + case <-gcTicker.C: + for k, v := range loggedAt { + if time.Since(v) >= gcDeadline { + delete(loggedAt, k) + } + } + case e := <-s.workersEventCh: + switch event := e.(type) { + case *scheduler.PortCollisionEvent: + if event == nil || event.Node == nil { + continue + } + + if _, ok := loggedAt[event.Node.ID]; ok { + continue + } + + eventJson, err := json.Marshal(event.Sanitize()) + if err != nil { + s.logger.Debug("failed to encode event to JSON", "error", err) + } + s.logger.Warn("unexpected node port collision, refer to https://www.nomadproject.io/s/port-plan-failure for more information", + "node_id", event.Node.ID, "reason", event.Reason, "event", string(eventJson)) + loggedAt[event.Node.ID] = time.Now() + } + case <-s.shutdownCh: + return + } + } +} + // numPeers is used to check on the number of known peers, including the local // node. func (s *Server) numPeers() (int, error) { diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 7907b027b0f6..b78bff46adb8 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -119,8 +119,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi if netIdx == nil { netIdx = NewNetworkIndex() defer netIdx.Release() - if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) { - return false, "reserved port collision", used, nil + + if collision, reason := netIdx.SetNode(node); collision { + return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil + } + if collision, reason := netIdx.AddAllocs(allocs); collision { + return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil } } diff --git a/nomad/structs/network.go b/nomad/structs/network.go index 88a629abf8b4..812a174f2f7c 100644 --- a/nomad/structs/network.go +++ b/nomad/structs/network.go @@ -5,6 +5,8 @@ import ( "math/rand" "net" "sync" + + "github.com/hashicorp/nomad/helper" ) const ( @@ -67,6 +69,83 @@ func (idx *NetworkIndex) getUsedPortsFor(ip string) Bitmap { return used } +func (idx *NetworkIndex) Copy() *NetworkIndex { + if idx == nil { + return nil + } + + c := new(NetworkIndex) + *c = *idx + + c.AvailNetworks = copyNetworkResources(idx.AvailNetworks) + c.NodeNetworks = copyNodeNetworks(idx.NodeNetworks) + c.AvailAddresses = copyAvailAddresses(idx.AvailAddresses) + if idx.AvailBandwidth != nil && len(idx.AvailBandwidth) == 0 { + c.AvailBandwidth = make(map[string]int) + } else { + c.AvailBandwidth = helper.CopyMapStringInt(idx.AvailBandwidth) + } + if len(idx.UsedPorts) > 0 { + c.UsedPorts = make(map[string]Bitmap, len(idx.UsedPorts)) + for k, v := range idx.UsedPorts { + c.UsedPorts[k], _ = v.Copy() + } + } + if idx.UsedBandwidth != nil && len(idx.UsedBandwidth) == 0 { + c.UsedBandwidth = make(map[string]int) + } else { + c.UsedBandwidth = helper.CopyMapStringInt(idx.UsedBandwidth) + } + + return c +} + +func copyNetworkResources(resources []*NetworkResource) []*NetworkResource { + l := len(resources) + if l == 0 { + return nil + } + + c := make([]*NetworkResource, l) + for i, resource := range resources { + c[i] = resource.Copy() + } + return c +} + +func copyNodeNetworks(resources []*NodeNetworkResource) []*NodeNetworkResource { + l := len(resources) + if l == 0 { + return nil + } + + c := make([]*NodeNetworkResource, l) + for i, resource := range resources { + c[i] = resource.Copy() + } 
+ return c +} + +func copyAvailAddresses(a map[string][]NodeNetworkAddress) map[string][]NodeNetworkAddress { + l := len(a) + if l == 0 { + return nil + } + + c := make(map[string][]NodeNetworkAddress, l) + for k, v := range a { + if len(v) == 0 { + continue + } + c[k] = make([]NodeNetworkAddress, len(v)) + for i, a := range v { + c[k][i] = a + } + } + + return c +} + // Release is called when the network index is no longer needed // to attempt to re-use some of the memory it has allocated func (idx *NetworkIndex) Release() { @@ -89,7 +168,7 @@ func (idx *NetworkIndex) Overcommitted() bool { // SetNode is used to setup the available network resources. Returns // true if there is a collision -func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { +func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) { // COMPAT(0.11): Remove in 0.11 // Grab the network resources, handling both new and old @@ -118,8 +197,9 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { for _, n := range nodeNetworks { for _, a := range n.Addresses { idx.AvailAddresses[a.Alias] = append(idx.AvailAddresses[a.Alias], a) - if idx.AddReservedPortsForIP(a.ReservedPorts, a.Address) { + if c, r := idx.AddReservedPortsForIP(a.ReservedPorts, a.Address); c { collide = true + reason = fmt.Sprintf("collision when reserving ports for node network %s in node %s: %v", a.Alias, node.ID, r) } } } @@ -127,11 +207,16 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { // COMPAT(0.11): Remove in 0.11 // Handle reserving ports, handling both new and old if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" { - collide = idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts) + c, r := idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts) + collide = c + if collide { + reason = fmt.Sprintf("collision when reserving port range for node %s: %v", node.ID, r) + } } else if node.Reserved != nil { for _, n := range node.Reserved.Networks { - if idx.AddReserved(n) { + if c, r := idx.AddReserved(n); c { collide = true + reason = fmt.Sprintf("collision when reserving network %s for node %s: %v", n.IP, node.ID, r) } } } @@ -141,7 +226,7 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { // AddAllocs is used to add the used network resources. Returns // true if there is a collision -func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) { +func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool, reason string) { for _, alloc := range allocs { // Do not consider the resource impact of terminal allocations if alloc.TerminalStatus() { @@ -152,38 +237,42 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) { // Only look at AllocatedPorts if populated, otherwise use pre 0.12 logic // COMPAT(1.0): Remove when network resources struct is removed. 
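+		// Both branches record a human-readable reason for any port
+		// collision so callers such as AllocsFit can surface it in logs.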
if len(alloc.AllocatedResources.Shared.Ports) > 0 { - if idx.AddReservedPorts(alloc.AllocatedResources.Shared.Ports) { + if c, r := idx.AddReservedPorts(alloc.AllocatedResources.Shared.Ports); c { collide = true + reason = fmt.Sprintf("collision when reserving port for alloc %s: %v", alloc.ID, r) } } else { // Add network resources that are at the task group level if len(alloc.AllocatedResources.Shared.Networks) > 0 { for _, network := range alloc.AllocatedResources.Shared.Networks { - if idx.AddReserved(network) { + if c, r := idx.AddReserved(network); c { collide = true + reason = fmt.Sprintf("collision when reserving port for network %s in alloc %s: %v", network.IP, alloc.ID, r) } } } - for _, task := range alloc.AllocatedResources.Tasks { - if len(task.Networks) == 0 { + for task, resources := range alloc.AllocatedResources.Tasks { + if len(resources.Networks) == 0 { continue } - n := task.Networks[0] - if idx.AddReserved(n) { + n := resources.Networks[0] + if c, r := idx.AddReserved(n); c { collide = true + reason = fmt.Sprintf("collision when reserving port for network %s in task %s of alloc %s: %v", n.IP, task, alloc.ID, r) } } } } else { // COMPAT(0.11): Remove in 0.11 - for _, task := range alloc.TaskResources { - if len(task.Networks) == 0 { + for task, resources := range alloc.TaskResources { + if len(resources.Networks) == 0 { continue } - n := task.Networks[0] - if idx.AddReserved(n) { + n := resources.Networks[0] + if c, r := idx.AddReserved(n); c { collide = true + reason = fmt.Sprintf("(deprecated) collision when reserving port for network %s in task %s of alloc %s: %v", n.IP, task, alloc.ID, r) } } } @@ -193,7 +282,7 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) { // AddReserved is used to add a reserved network usage, returns true // if there is a port collision -func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) { +func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool, reasons []string) { // Add the port usage used := idx.getUsedPortsFor(n.IP) @@ -201,10 +290,12 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) { for _, port := range ports { // Guard against invalid port if port.Value < 0 || port.Value >= maxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port.Value)} } if used.Check(uint(port.Value)) { collide = true + reason := fmt.Sprintf("port %d already in use", port.Value) + reasons = append(reasons, reason) } else { used.Set(uint(port.Value)) } @@ -216,14 +307,16 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) { return } -func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool) { +func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, reasons []string) { for _, port := range ports { used := idx.getUsedPortsFor(port.HostIP) if port.Value < 0 || port.Value >= maxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port.Value)} } if used.Check(uint(port.Value)) { collide = true + reason := fmt.Sprintf("port %d already in use", port.Value) + reasons = append(reasons, reason) } else { used.Set(uint(port.Value)) } @@ -235,7 +328,7 @@ func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool) { // AddReservedPortRange marks the ports given as reserved on all network // interfaces. 
The port format is comma delimited, with spans given as n1-n2 // (80,100-200,205) -func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) { +func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool, reasons []string) { // Convert the ports into a slice of ints resPorts, err := ParsePortRanges(ports) if err != nil { @@ -251,10 +344,12 @@ func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) { for _, port := range resPorts { // Guard against invalid port if port >= maxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port)} } if used.Check(uint(port)) { collide = true + reason := fmt.Sprintf("port %d already in use", port) + reasons = append(reasons, reason) } else { used.Set(uint(port)) } @@ -265,7 +360,7 @@ func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) { } // AddReservedPortsForIP -func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool) { +func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool, reasons []string) { // Convert the ports into a slice of ints resPorts, err := ParsePortRanges(ports) if err != nil { @@ -276,10 +371,12 @@ func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide for _, port := range resPorts { // Guard against invalid port if port >= maxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port)} } if used.Check(uint(port)) { collide = true + reason := fmt.Sprintf("port %d already in use", port) + reasons = append(reasons, reason) } else { used.Set(uint(port)) } @@ -352,7 +449,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return nil, addrErr } - return nil, fmt.Errorf("no addresses available for %q network", port.HostNetwork) + return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork) } offer = append(offer, *allocPort) @@ -393,7 +490,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return nil, addrErr } - return nil, fmt.Errorf("no addresses available for %q network", port.HostNetwork) + return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork) } offer = append(offer, *allocPort) } diff --git a/nomad/structs/network_test.go b/nomad/structs/network_test.go index ea80f1169764..8ef380c5ca5a 100644 --- a/nomad/structs/network_test.go +++ b/nomad/structs/network_test.go @@ -9,6 +9,116 @@ import ( "github.com/stretchr/testify/require" ) +func TestNetworkIndex_Copy(t *testing.T) { + n := &Node{ + NodeResources: &NodeResources{ + Networks: []*NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.168.0.100", + MBits: 1000, + }, + }, + NodeNetworks: []*NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Alias: "default", + Address: "192.168.0.100", + Family: NodeNetworkAF_IPv4, + }, + }, + }, + }, + }, + Reserved: &Resources{ + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + }, + ReservedResources: &NodeReservedResources{ + Networks: NodeReservedNetworkResources{ + ReservedHostPorts: "22", + }, + }, + } + + allocs := []*Allocation{ + { + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 20, + ReservedPorts: 
[]Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}}, + }, + }, + }, + }, + }, + }, + { + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "api": { + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + ReservedPorts: []Port{{"one", 10000, 0, ""}}, + }, + }, + }, + }, + }, + }, + } + + netIdx := NewNetworkIndex() + netIdx.SetNode(n) + netIdx.AddAllocs(allocs) + + // Copy must be equal. + netIdxCopy := netIdx.Copy() + require.Equal(t, netIdx, netIdxCopy) + + // Modifying copy should not affect original value. + n.NodeResources.Networks[0].Device = "eth1" + n.ReservedResources.Networks.ReservedHostPorts = "22,80" + allocs = append(allocs, &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "db": { + Networks: []*NetworkResource{ + { + Device: "eth1", + IP: "192.168.0.104", + MBits: 50, + ReservedPorts: []Port{{"one", 4567, 0, ""}}, + }, + }, + }, + }, + }, + }) + netIdxCopy.SetNode(n) + netIdxCopy.AddAllocs(allocs) + require.NotEqual(t, netIdx, netIdxCopy) +} + func TestNetworkIndex_Overcommitted(t *testing.T) { t.Skip() idx := NewNetworkIndex() @@ -20,8 +130,8 @@ func TestNetworkIndex_Overcommitted(t *testing.T) { MBits: 505, ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}}, } - collide := idx.AddReserved(reserved) - if collide { + collide, reasons := idx.AddReserved(reserved) + if collide || len(reasons) != 0 { t.Fatalf("bad") } if !idx.Overcommitted() { @@ -71,8 +181,8 @@ func TestNetworkIndex_SetNode(t *testing.T) { }, }, } - collide := idx.SetNode(n) - if collide { + collide, reason := idx.SetNode(n) + if collide || reason != "" { t.Fatalf("bad") } @@ -123,8 +233,8 @@ func TestNetworkIndex_AddAllocs(t *testing.T) { }, }, } - collide := idx.AddAllocs(allocs) - if collide { + collide, reason := idx.AddAllocs(allocs) + if collide || reason != "" { t.Fatalf("bad") } @@ -151,8 +261,8 @@ func TestNetworkIndex_AddReserved(t *testing.T) { MBits: 20, ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}}, } - collide := idx.AddReserved(reserved) - if collide { + collide, reasons := idx.AddReserved(reserved) + if collide || len(reasons) > 0 { t.Fatalf("bad") } @@ -167,8 +277,8 @@ func TestNetworkIndex_AddReserved(t *testing.T) { } // Try to reserve the same network - collide = idx.AddReserved(reserved) - if !collide { + collide, reasons = idx.AddReserved(reserved) + if !collide || len(reasons) == 0 { t.Fatalf("bad") } } @@ -375,8 +485,8 @@ func TestNetworkIndex_SetNode_Old(t *testing.T) { }, }, } - collide := idx.SetNode(n) - if collide { + collide, reason := idx.SetNode(n) + if collide || reason != "" { t.Fatalf("bad") } @@ -427,8 +537,8 @@ func TestNetworkIndex_AddAllocs_Old(t *testing.T) { }, }, } - collide := idx.AddAllocs(allocs) - if collide { + collide, reason := idx.AddAllocs(allocs) + if collide || reason != "" { t.Fatalf("bad") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 240c524e3ef1..2caeebdffa5d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2421,6 +2421,22 @@ func (n *NodeNetworkResource) Equals(o *NodeNetworkResource) bool { return reflect.DeepEqual(n, o) } +func (n *NodeNetworkResource) Copy() *NodeNetworkResource { + if n == nil { + return nil + } + + c := new(NodeNetworkResource) + *c = *n + + if n.Addresses != nil { + c.Addresses = make([]NodeNetworkAddress, len(n.Addresses)) + copy(c.Addresses, n.Addresses) + } + + return c +} + func (n *NodeNetworkResource) HasAlias(alias 
string) bool { for _, addr := range n.Addresses { if addr.Alias == alias { @@ -2782,6 +2798,13 @@ func (n *NodeResources) Copy() *NodeResources { // Copy the networks newN.Networks = n.Networks.Copy() + if n.NodeNetworks != nil { + newN.NodeNetworks = make([]*NodeNetworkResource, len(n.NodeNetworks)) + for i, nn := range n.NodeNetworks { + newN.NodeNetworks[i] = nn.Copy() + } + } + // Copy the devices if n.Devices != nil { devices := len(n.Devices) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index b9bce1aea532..f6715f22b8b5 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2809,6 +2809,36 @@ func TestComparableResources_Subtract(t *testing.T) { require.Equal(expect, r1) } +func TestNodeNetworkResource_Copy(t *testing.T) { + netResource := &NodeNetworkResource{ + Mode: "host", + Device: "eth0", + MacAddress: "00:00:00:00:00:00", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Family: NodeNetworkAF_IPv4, + Alias: "default", + Address: "192.168.0.2", + ReservedPorts: "22", + Gateway: "192.168.0.1", + }, + }, + } + + // Copy must be equal. + netResourceCopy := netResource.Copy() + require.Equal(t, netResource, netResourceCopy) + + // Modifying copy should not modify original value. + netResourceCopy.Mode = "alloc" + netResourceCopy.Device = "eth1" + netResourceCopy.MacAddress = "11:11:11:11:11:11" + netResourceCopy.Speed = 500 + netResourceCopy.Addresses[0].Alias = "copy" + require.NotEqual(t, netResource, netResourceCopy) +} + func TestEncodeDecode(t *testing.T) { type FooRequest struct { Foo string @@ -5811,6 +5841,47 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) { require.False(old.Diff(nonEmptyOld)) } +func TestNodeResources_Copy(t *testing.T) { + orig := &NodeResources{ + Cpu: NodeCpuResources{ + CpuShares: int64(32000), + }, + Memory: NodeMemoryResources{ + MemoryMB: int64(64000), + }, + Networks: Networks{ + { + Device: "foo", + }, + }, + NodeNetworks: []*NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + MacAddress: "00:00:00:00:00:00", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Family: NodeNetworkAF_IPv4, + Alias: "private", + Address: "192.168.0.100", + ReservedPorts: "22,80", + Gateway: "192.168.0.1", + }, + }, + }, + }, + } + + kopy := orig.Copy() + assert.Equal(t, orig, kopy) + + // Make sure slices aren't shared + kopy.NodeNetworks[0].MacAddress = "11:11:11:11:11:11" + kopy.NodeNetworks[0].Addresses[0].Alias = "public" + assert.NotEqual(t, orig.NodeNetworks[0], kopy.NodeNetworks[0]) +} + func TestNodeResources_Merge(t *testing.T) { res := &NodeResources{ Cpu: NodeCpuResources{ diff --git a/nomad/worker.go b/nomad/worker.go index 2e0d36b25cf5..9223546b9781 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -258,7 +258,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua if eval.Type == structs.JobTypeCore { sched = NewCoreScheduler(w.srv, snap) } else { - sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w) + sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, snap, w) if err != nil { return fmt.Errorf("failed to instantiate scheduler: %v", err) } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index cde8d4886bb4..98ffcb8fdf26 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -20,10 +20,11 @@ import ( ) type NoopScheduler struct { - state scheduler.State - planner scheduler.Planner - eval *structs.Evaluation - err error + state scheduler.State + planner scheduler.Planner + eval 
*structs.Evaluation + eventsCh chan<- interface{} + err error } func (n *NoopScheduler) Process(eval *structs.Evaluation) error { @@ -38,7 +39,7 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error { } func init() { - scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner) scheduler.Scheduler { + scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, eventsCh chan<- interface{}, s scheduler.State, p scheduler.Planner) scheduler.Scheduler { n := &NoopScheduler{ state: s, planner: p, diff --git a/scheduler/context.go b/scheduler/context.go index fa7b51648703..b73f1356471c 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -42,6 +42,10 @@ type Context interface { // Eligibility returns a tracker for node eligibility in the context of the // eval. Eligibility() *EvalEligibility + + // SendEvent provides best-effort delivery of scheduling and placement + // events. + SendEvent(event interface{}) } // EvalCache is used to cache certain things during an evaluation @@ -72,9 +76,57 @@ func (e *EvalCache) SemverConstraintCache() map[string]VerConstraints { return e.semverCache } +// PortCollisionEvent is an event that can happen during scheduling when +// an unexpected port collision is detected. +type PortCollisionEvent struct { + Reason string + Node *structs.Node + Allocations []*structs.Allocation + + // TODO: this is a large struct, but may be required to debug unexpected + // port collisions. Re-evaluate its need in the future if the bug is fixed + // or not caused by this field. + NetIndex *structs.NetworkIndex +} + +func (ev *PortCollisionEvent) Copy() *PortCollisionEvent { + if ev == nil { + return nil + } + c := new(PortCollisionEvent) + *c = *ev + c.Node = ev.Node.Copy() + if len(ev.Allocations) > 0 { + for i, a := range ev.Allocations { + c.Allocations[i] = a.Copy() + } + + } + c.NetIndex = ev.NetIndex.Copy() + return c +} + +func (ev *PortCollisionEvent) Sanitize() *PortCollisionEvent { + if ev == nil { + return nil + } + clean := ev.Copy() + + clean.Node.SecretID = "" + clean.Node.Meta = make(map[string]string) + + for i, alloc := range ev.Allocations { + clean.Allocations[i] = alloc.CopySkipJob() + clean.Allocations[i].Job = nil + } + + return clean +} + // EvalContext is a Context used during an Evaluation type EvalContext struct { EvalCache + eventsCh chan<- interface{} state State plan *structs.Plan logger log.Logger @@ -83,12 +135,13 @@ type EvalContext struct { } // NewEvalContext constructs a new EvalContext -func NewEvalContext(s State, p *structs.Plan, log log.Logger) *EvalContext { +func NewEvalContext(eventsCh chan<- interface{}, s State, p *structs.Plan, log log.Logger) *EvalContext { ctx := &EvalContext{ - state: s, - plan: p, - logger: log, - metrics: new(structs.AllocMetric), + eventsCh: eventsCh, + state: s, + plan: p, + logger: log, + metrics: new(structs.AllocMetric), } return ctx } @@ -164,6 +217,17 @@ func (e *EvalContext) Eligibility() *EvalEligibility { return e.eligibility } +func (e *EvalContext) SendEvent(event interface{}) { + if e == nil || e.eventsCh == nil { + return + } + + select { + case e.eventsCh <- event: + default: + } +} + type ComputedClassFeasibility byte const ( diff --git a/scheduler/context_test.go b/scheduler/context_test.go index e14290ee36ed..75bc7ed36868 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -21,7 +21,7 @@ func testContext(t testing.TB) (*state.StateStore, *EvalContext) { logger := testlog.HCLogger(t) - ctx := 
NewEvalContext(state, plan, logger) + ctx := NewEvalContext(nil, state, plan, logger) return state, ctx } @@ -392,3 +392,48 @@ func TestEvalEligibility_GetClasses_JobEligible_TaskGroupIneligible(t *testing.T actClasses := e.GetClasses() require.Equal(t, expClasses, actClasses) } + +func TestPortCollisionEvent_Copy(t *testing.T) { + ev := &PortCollisionEvent{ + Reason: "original", + Node: mock.Node(), + Allocations: []*structs.Allocation{ + mock.Alloc(), + mock.Alloc(), + }, + NetIndex: structs.NewNetworkIndex(), + } + ev.NetIndex.SetNode(ev.Node) + + // Copy must be equal + evCopy := ev.Copy() + require.Equal(t, ev, evCopy) + + // Modifying the copy should not affect the original value + evCopy.Reason = "copy" + require.NotEqual(t, ev.Reason, evCopy.Reason) + + evCopy.Node.Attributes["test"] = "true" + require.NotEqual(t, ev.Node, evCopy.Node) + + evCopy.Allocations = append(evCopy.Allocations, mock.Alloc()) + require.NotEqual(t, ev.Allocations, evCopy.Allocations) + + evCopy.NetIndex.AddReservedPortRange("1000-2000") + require.NotEqual(t, ev.NetIndex, evCopy.NetIndex) +} + +func TestPortCollisionEvent_Sanitize(t *testing.T) { + ev := &PortCollisionEvent{ + Reason: "original", + Node: mock.Node(), + Allocations: []*structs.Allocation{ + mock.Alloc(), + }, + NetIndex: structs.NewNetworkIndex(), + } + + cleanEv := ev.Sanitize() + require.Empty(t, cleanEv.Node.SecretID) + require.Nil(t, cleanEv.Allocations[0].Job) +} diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 516fbf49c1b2..b34e04b4664a 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -76,10 +76,11 @@ func (s *SetStatusError) Error() string { // most workloads. It also supports a 'batch' mode to optimize for fast decision // making at the cost of quality. 
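+// Schedulers publish events such as PortCollisionEvent on eventsCh; delivery
+// is best-effort and never blocks (see EvalContext.SendEvent).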
type GenericScheduler struct { - logger log.Logger - state State - planner Planner - batch bool + logger log.Logger + eventsCh chan<- interface{} + state State + planner Planner + batch bool eval *structs.Evaluation job *structs.Job @@ -100,23 +101,25 @@ type GenericScheduler struct { } // NewServiceScheduler is a factory function to instantiate a new service scheduler -func NewServiceScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { s := &GenericScheduler{ - logger: logger.Named("service_sched"), - state: state, - planner: planner, - batch: false, + logger: logger.Named("service_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, + batch: false, } return s } // NewBatchScheduler is a factory function to instantiate a new batch scheduler -func NewBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { s := &GenericScheduler{ - logger: logger.Named("batch_sched"), - state: state, - planner: planner, - batch: true, + logger: logger.Named("batch_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, + batch: true, } return s } @@ -245,7 +248,7 @@ func (s *GenericScheduler) process() (bool, error) { s.failedTGAllocs = nil // Create an evaluation context - s.ctx = NewEvalContext(s.state, s.plan, s.logger) + s.ctx = NewEvalContext(s.eventsCh, s.state, s.plan, s.logger) // Construct the placement stack s.stack = NewGenericStack(s.batch, s.ctx) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index ada9c5ec31a0..e316ffe11a50 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2165,10 +2165,9 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { // Create allocs that are part of the old deployment var allocs []*structs.Allocation for i := 0; i < 10; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNode(nodes[i]) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = nodes[i].ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) alloc.DeploymentID = d.ID alloc.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} @@ -4071,10 +4070,9 @@ func TestBatchSched_Run_LostAlloc(t *testing.T) { // Create two running allocations var allocs []*structs.Allocation for i := 0; i <= 1; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNodeWithoutReservedPort(node) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node.ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) alloc.ClientStatus = structs.AllocClientStatusRunning allocs = append(allocs, alloc) @@ -4566,10 +4564,9 @@ func TestBatchSched_ScaleDown_SameName(t *testing.T) { // Create a few running alloc var allocs []*structs.Allocation for i := 0; i < 5; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNodeWithoutReservedPort(node) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" alloc.ClientStatus = structs.AllocClientStatusRunning alloc.Metrics = scoreMetric @@ -5535,10 +5532,9 @@ func TestServiceSched_Migrate_CanaryStatus(t *testing.T) { var allocs []*structs.Allocation for i := 0; i < 3; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNodeWithoutReservedPort(node1) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node1.ID alloc.DeploymentID = deployment.ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) 
allocs = append(allocs, alloc) diff --git a/scheduler/rank.go b/scheduler/rank.go index 799a29c4692f..29b6503e8472 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -201,10 +201,34 @@ OUTER: continue } - // Index the existing network usage + // Index the existing network usage. + // This should never collide, since it represents the current state of + // the node. If it does collide though, it means we found a bug! So + // collect as much information as possible. netIdx := structs.NewNetworkIndex() - netIdx.SetNode(option.Node) - netIdx.AddAllocs(proposed) + if collide, reason := netIdx.SetNode(option.Node); collide { + iter.ctx.SendEvent(&PortCollisionEvent{ + Reason: reason, + NetIndex: netIdx.Copy(), + Node: option.Node, + }) + iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision") + continue + } + if collide, reason := netIdx.AddAllocs(proposed); collide { + event := &PortCollisionEvent{ + Reason: reason, + NetIndex: netIdx.Copy(), + Node: option.Node, + Allocations: make([]*structs.Allocation, len(proposed)), + } + for i, alloc := range proposed { + event.Allocations[i] = alloc.Copy() + } + iter.ctx.SendEvent(event) + iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision") + continue + } // Create a device allocator devAllocator := newDeviceAllocator(iter.ctx, option.Node) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 2cdad54858d2..a6258013983b 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -487,6 +487,221 @@ func TestBinPackIterator_Network_Failure(t *testing.T) { require.Equal(1, ctx.metrics.DimensionExhausted["network: bandwidth exceeded"]) } +func TestBinPackIterator_Network_PortCollision_Node(t *testing.T) { + _, ctx := testContext(t) + eventsCh := make(chan interface{}) + ctx.eventsCh = eventsCh + + // Collide on host with duplicate IPs. + nodes := []*RankedNode{ + { + Node: &structs.Node{ + ID: uuid.Generate(), + Resources: &structs.Resources{ + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.158.0.100", + }, + }, + }, + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 4096, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 4096, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.158.0.100", + }, + }, + NodeNetworks: []*structs.NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + Addresses: []structs.NodeNetworkAddress{ + { + Alias: "default", + Address: "192.168.0.100", + ReservedPorts: "22,80", + }, + { + Alias: "private", + Address: "192.168.0.100", + ReservedPorts: "22", + }, + }, + }, + }, + }, + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + }, + }, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + } + binp := NewBinPackIterator(ctx, static, false, 0, structs.SchedulerAlgorithmBinpack) + binp.SetTaskGroup(taskGroup) + + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + out := collectRanked(scoreNorm) + + // We expect a placement failure due to port collision. 
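+	// Both host network aliases share address 192.168.0.100 and both reserve
+	// port 22, so indexing the node itself already reports a collision.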
+ require.Len(t, out, 0) + require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: port collision"]) +} + +func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) { + state, ctx := testContext(t) + eventsCh := make(chan interface{}) + ctx.eventsCh = eventsCh + + nodes := []*RankedNode{ + { + Node: &structs.Node{ + ID: uuid.Generate(), + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 2048, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 2048, + }, + }, + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + // Add allocations with port collision. + j := mock.Job() + alloc1 := &structs.Allocation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + JobID: j.ID, + Job: j, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []structs.Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", + } + alloc2 := &structs.Allocation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + JobID: j.ID, + Job: j, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []structs.Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", + } + require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))) + require.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + }, + }, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + } + binp := NewBinPackIterator(ctx, static, false, 0, structs.SchedulerAlgorithmBinpack) + binp.SetTaskGroup(taskGroup) + + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + out := collectRanked(scoreNorm) + + // We expect a placement failure due to port collision. 
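+	// alloc1 and alloc2 both reserve port 5000 and dynamic port 9876 on
+	// 192.168.0.100, so indexing the proposed allocations collides.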
+ require.Len(t, out, 0) + require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: port collision"]) +} + func TestBinPackIterator_PlannedAlloc(t *testing.T) { _, ctx := testContext(t) nodes := []*RankedNode{ diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 95877569db30..f74289f76efc 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -28,7 +28,7 @@ var BuiltinSchedulers = map[string]Factory{ // NewScheduler is used to instantiate and return a new scheduler // given the scheduler name, initial state, and planner. -func NewScheduler(name string, logger log.Logger, state State, planner Planner) (Scheduler, error) { +func NewScheduler(name string, logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) (Scheduler, error) { // Lookup the factory function factory, ok := BuiltinSchedulers[name] if !ok { @@ -36,12 +36,12 @@ func NewScheduler(name string, logger log.Logger, state State, planner Planner) } // Instantiate the scheduler - sched := factory(logger, state, planner) + sched := factory(logger, eventsCh, state, planner) return sched, nil } // Factory is used to instantiate a new Scheduler -type Factory func(log.Logger, State, Planner) Scheduler +type Factory func(log.Logger, chan<- interface{}, State, Planner) Scheduler // Scheduler is the top level instance for a scheduler. A scheduler is // meant to only encapsulate business logic, pushing the various plumbing diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 4b1e5c8cbfaa..de7111125cfa 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -20,9 +20,10 @@ const ( // designed for services that should be run on every client. // One for each job, containing an allocation for each node type SystemScheduler struct { - logger log.Logger - state State - planner Planner + logger log.Logger + eventsCh chan<- interface{} + state State + planner Planner eval *structs.Evaluation job *structs.Job @@ -42,11 +43,12 @@ type SystemScheduler struct { // NewSystemScheduler is a factory function to instantiate a new system // scheduler. 
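+// Events emitted while scheduling (e.g. PortCollisionEvent) are sent on
+// eventsCh on a best-effort basis.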
-func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { return &SystemScheduler{ - logger: logger.Named("system_sched"), - state: state, - planner: planner, + logger: logger.Named("system_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, } } @@ -118,7 +120,7 @@ func (s *SystemScheduler) process() (bool, error) { s.failedTGAllocs = nil // Create an evaluation context - s.ctx = NewEvalContext(s.state, s.plan, s.logger) + s.ctx = NewEvalContext(s.eventsCh, s.state, s.plan, s.logger) // Construct the placement stack s.stack = NewSystemStack(s.ctx) diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 35ed1ce5189d..cd15a2b2c5de 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -752,10 +752,9 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { var allocs []*structs.Allocation for _, node := range nodes { - alloc := mock.Alloc() + alloc := mock.AllocForNode(node) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } @@ -1956,10 +1955,9 @@ func TestSystemSched_Preemption(t *testing.T) { }, Networks: []*structs.NetworkResource{ { - Device: "eth0", - IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, - MBits: 200, + Device: "eth0", + IP: "192.168.0.100", + MBits: 200, }, }, }, @@ -2045,10 +2043,9 @@ func TestSystemSched_Preemption(t *testing.T) { }, Networks: []*structs.NetworkResource{ { - Device: "eth0", - IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, - MBits: 400, + Device: "eth0", + IP: "192.168.0.100", + MBits: 400, }, }, }, diff --git a/scheduler/testing.go b/scheduler/testing.go index dada4d105a93..bbd7c8c07976 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -262,7 +262,19 @@ func (h *Harness) Snapshot() State { // a snapshot of current state using the harness for planning. func (h *Harness) Scheduler(factory Factory) Scheduler { logger := testlog.HCLogger(h.t) - return factory(logger, h.Snapshot(), h) + eventsCh := make(chan interface{}) + + // Listen for and log events from the scheduler. + go func() { + for e := range eventsCh { + switch event := e.(type) { + case *PortCollisionEvent: + h.t.Errorf("unexpected worker eval event: %v", event.Reason) + } + } + }() + + return factory(logger, eventsCh, h.Snapshot(), h) } // Process is used to process an evaluation given a factory