diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a62f9c64231e..c3c928600fb8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7388,6 +7388,7 @@ func (a *Allocation) ComparableResources() *ComparableResources { Memory: AllocatedMemoryResources{ MemoryMB: int64(resources.MemoryMB), }, + Networks: resources.Networks, }, Shared: AllocatedSharedResources{ DiskMB: int64(resources.DiskMB), diff --git a/scheduler/preemption.go b/scheduler/preemption.go index bdd6428f0fe0..6ecf49cd8289 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -17,6 +17,82 @@ type groupedAllocs struct { allocs []*structs.Allocation } +// PreemptionResource interface is implemented by different +// types of resources. +type PreemptionResource interface { + // MeetsRequirements returns true if the available resources match needed resources + MeetsRequirements() bool + + // Distance returns values in the range [0, MaxFloat], lower is better + Distance() float64 +} + +// NetworkPreemptionResource implements PreemptionResource for network assignments +// It only looks at MBits needed +type NetworkPreemptionResource struct { + availableResources *structs.NetworkResource + resourceNeeded *structs.NetworkResource +} + +func (n *NetworkPreemptionResource) MeetsRequirements() bool { + mbitsAvailable := n.availableResources.MBits + mbitsNeeded := n.resourceNeeded.MBits + if mbitsAvailable == 0 || mbitsNeeded == 0 { + return false + } + return mbitsAvailable >= mbitsNeeded +} + +func (n *NetworkPreemptionResource) Distance() float64 { + networkCoord := math.MaxFloat64 + if n.availableResources != nil && n.resourceNeeded != nil { + networkCoord = float64(n.resourceNeeded.MBits-n.availableResources.MBits) / float64(n.resourceNeeded.MBits) + } + + originDist := math.Sqrt( + math.Pow(networkCoord, 2)) + return originDist +} + +// BasePreemptionResource implements PreemptionResource for CPU/Memory/Disk +type BasePreemptionResource struct { + 
availableResources *structs.ComparableResources + resourceNeeded *structs.ComparableResources +} + +func (b *BasePreemptionResource) MeetsRequirements() bool { + super, _ := b.availableResources.Superset(b.resourceNeeded) + return super +} + +func (b *BasePreemptionResource) Distance() float64 { + return basicResourceDistance(b.resourceNeeded, b.availableResources) +} + +// PreemptionResourceFactory returns a new PreemptionResource +type PreemptionResourceFactory func(availableResources *structs.ComparableResources, resourceAsk *structs.ComparableResources) PreemptionResource + +// GetNetworkPreemptionResourceFactory returns a preemption resource factory for network assignments +func GetNetworkPreemptionResourceFactory() PreemptionResourceFactory { + return func(availableResources *structs.ComparableResources, resourceNeeded *structs.ComparableResources) PreemptionResource { + available := availableResources.Flattened.Networks[0] + return &NetworkPreemptionResource{ + availableResources: available, + resourceNeeded: resourceNeeded.Flattened.Networks[0], + } + } +} + +// GetBasePreemptionResourceFactory returns a preemption resource factory for CPU/Memory/Disk +func GetBasePreemptionResourceFactory() PreemptionResourceFactory { + return func(availableResources *structs.ComparableResources, resourceNeeded *structs.ComparableResources) PreemptionResource { + return &BasePreemptionResource{ + availableResources: availableResources, + resourceNeeded: resourceNeeded, + } + } +} + // Preemptor is used to track existing allocations // and find suitable allocations to preempt type Preemptor struct { @@ -59,7 +135,7 @@ func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) { // SetPreemptions initializes a map tracking existing counts of preempted allocations // per job/task group. 
This is used while scoring preemption options func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) { - // Clear out existing values + // Clear out existing values since this can be called more than once for k := range p.currentPreemptions { delete(p.currentPreemptions, k) } @@ -88,10 +164,10 @@ func (p *Preemptor) getNumPreemptions(alloc *structs.Allocation) int { return numCurrentPreemptionsForJob } -// preemptForTaskGroup computes a list of allocations to preempt to accommodate +// PreemptForTaskGroup computes a list of allocations to preempt to accommodate // the resources asked for. Only allocs with a job priority < 10 of jobPriority are considered // This method is meant only for finding preemptible allocations based on CPU/Memory/Disk -func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources) []*structs.Allocation { +func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources) []*structs.Allocation { resourcesNeeded := resourceAsk.Comparable() // Subtract current allocations @@ -111,7 +187,7 @@ func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources) for len(allocGrp.allocs) > 0 && !allRequirementsMet { closestAllocIndex := -1 bestDistance := math.MaxFloat64 - // find the alloc with the closest distance + // Find the alloc with the closest distance for index, alloc := range allocGrp.allocs { currentPreemptionCount := p.getNumPreemptions(alloc) maxParallel := 0 @@ -143,7 +219,7 @@ func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources) allocGrp.allocs[closestAllocIndex] = allocGrp.allocs[len(allocGrp.allocs)-1] allocGrp.allocs = allocGrp.allocs[:len(allocGrp.allocs)-1] - // this is the remaining total of resources needed + // This is the remaining total of resources needed resourcesNeeded.Subtract(closestAlloc.ComparableResources()) } if allRequirementsMet { @@ -159,23 +235,17 @@ func (p *Preemptor) preemptForTaskGroup(resourceAsk 
*structs.AllocatedResources) resourcesNeeded = resourceAsk.Comparable() // We do another pass to eliminate unnecessary preemptions // This filters out allocs whose resources are already covered by another alloc + basePreemptionResource := GetBasePreemptionResourceFactory() - // Sort bestAllocs by distance descending (without penalty) - sort.Slice(bestAllocs, func(i, j int) bool { - distance1 := basicResourceDistance(resourcesNeeded, bestAllocs[i].ComparableResources()) - distance2 := basicResourceDistance(resourcesNeeded, bestAllocs[j].ComparableResources()) - return distance1 > distance2 - }) - - filteredBestAllocs := filterSupersetTaskGroup(bestAllocs, p.nodeRemainingResources, resourcesNeeded) + filteredBestAllocs := filterSuperset(bestAllocs, p.nodeRemainingResources, resourcesNeeded, basePreemptionResource) return filteredBestAllocs } -// preemptForNetwork tries to find allocations to preempt to meet network resources. +// PreemptForNetwork tries to find allocations to preempt to meet network resources. // This is called once per task when assigning a network to the task. While finding allocations // to preempt, this only considers allocations that share the same network device -func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResource, netIdx *structs.NetworkIndex) []*structs.Allocation { +func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResource, netIdx *structs.NetworkIndex) []*structs.Allocation { // Early return if there are no current allocs if len(p.currentAllocs) == 0 { @@ -189,7 +259,7 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc // Create a map from each device to allocs // We do this because to place a task we have to be able to // preempt allocations that are using the same device. 
- // + // This step also filters out high priority allocations and allocations // that are not using any network resources for _, alloc := range p.currentAllocs { @@ -200,7 +270,9 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc if p.jobPriority-alloc.Job.Priority < 10 { continue } - networks := alloc.CompatibleNetworkResources() + networks := alloc.ComparableResources().Flattened.Networks + + // Only include if the alloc has a network device if len(networks) > 0 { device := networks[0].Device allocsForDevice := deviceToAllocs[device] @@ -221,6 +293,7 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc for device, currentAllocs := range deviceToAllocs { totalBandwidth := netIdx.AvailBandwidth[device] + // If the device doesn't have enough total available bandwidth, skip if totalBandwidth < MbitsNeeded { continue @@ -272,13 +345,12 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc for _, allocsGrp := range allocsByPriority { allocs := allocsGrp.allocs - // Sort by distance function that takes into account needed MBits - // as well as penalty for preempting an allocation - // whose task group already has existing preemptions + // Sort by distance function sort.Slice(allocs, func(i, j int) bool { return p.distanceComparatorForNetwork(allocs, networkResourceAsk, i, j) }) + // Iterate over allocs until the end, or stop early once requirements have been met for _, alloc := range allocs { preemptedBandwidth += alloc.Resources.Networks[0].MBits allocsToPreempt = append(allocsToPreempt, alloc) @@ -287,12 +359,13 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc break } } - // If we met bandwidth needs we can break out of loop that's iterating by priority within a device + + // If we met bandwidth needs we can break out of iterating by priority within a device if met { break } } - // If we met bandwidth needs we can break out of loop that's iterating by allocs 
sharing the same network device + // If we met bandwidth needs we don't need to examine the next network device if met { break } @@ -315,32 +388,14 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc }, } - // Sort by distance reversed to surface any superset allocs first - // This sort only looks at mbits because we should still not prefer - // allocs that have a maxParallel penalty - sort.Slice(allocsToPreempt, func(i, j int) bool { - firstAlloc := allocsToPreempt[i] - secondAlloc := allocsToPreempt[j] - - firstAllocNetworks := firstAlloc.ComparableResources().Flattened.Networks - var firstAllocNetResourceUsed *structs.NetworkResource - if len(firstAllocNetworks) > 0 { - firstAllocNetResourceUsed = firstAllocNetworks[0] - } - - secondAllocNetworks := secondAlloc.ComparableResources().Flattened.Networks - var secondAllocNetResourceUsed *structs.NetworkResource - if len(secondAllocNetworks) > 0 { - secondAllocNetResourceUsed = secondAllocNetworks[0] - } - - distance1 := networkResourceDistance(firstAllocNetResourceUsed, networkResourceAsk) - distance2 := networkResourceDistance(secondAllocNetResourceUsed, networkResourceAsk) - return distance1 > distance2 - }) - // Do a final pass to eliminate any superset allocations - filteredBestAllocs := filterSupersetNetwork(allocsToPreempt, networkResourceAsk, nodeRemainingResources) + preemptionResourceFactory := GetNetworkPreemptionResourceFactory() + resourcesNeeded := &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Networks: []*structs.NetworkResource{networkResourceAsk}, + }, + } + filteredBestAllocs := filterSuperset(allocsToPreempt, nodeRemainingResources, resourcesNeeded, preemptionResourceFactory) return filteredBestAllocs } @@ -400,14 +455,6 @@ func scoreForNetwork(resourceUsed *structs.NetworkResource, resourceNeeded *stru return networkResourceDistance(resourceUsed, resourceNeeded) + maxParallelScorePenalty } -// meetsNetworkRequirements checks if the 
first resource meets or exceeds the second resource's network MBits requirements -func meetsNetworkRequirements(firstMbits int, secondMbits int) bool { - if firstMbits == 0 || secondMbits == 0 { - return false - } - return firstMbits >= secondMbits -} - // filterAndGroupPreemptibleAllocs groups allocations by priority after removing any from jobs of // a higher priority than jobPriority func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs { @@ -447,40 +494,20 @@ func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocat return groupedSortedAllocs } -// filterSupersetTaskGroup is used as a final step to remove +// filterSuperset is used as a final step to remove // any allocations that meet a superset of requirements from // the set of allocations to preempt -func filterSupersetTaskGroup(bestAllocs []*structs.Allocation, +func filterSuperset(bestAllocs []*structs.Allocation, nodeRemainingResources *structs.ComparableResources, - resourceAsk *structs.ComparableResources) []*structs.Allocation { - - var preemptedResources *structs.ComparableResources - var filteredBestAllocs []*structs.Allocation + resourceAsk *structs.ComparableResources, + preemptionResourceFactory PreemptionResourceFactory) []*structs.Allocation { - // Do another pass to eliminate allocations that are a superset of other allocations - // in the preemption set - for _, alloc := range bestAllocs { - if preemptedResources == nil { - preemptedResources = alloc.ComparableResources().Copy() - } else { - preemptedResources.Add(alloc.ComparableResources().Copy()) - } - filteredBestAllocs = append(filteredBestAllocs, alloc) - availableResources := preemptedResources.Copy() - availableResources.Add(nodeRemainingResources) - - requirementsMet, _ := availableResources.Superset(resourceAsk) - if requirementsMet { - break - } - } - return filteredBestAllocs -} - -// filterSupersetNetwork is similar to filterSupersetTaskGroup but only -// 
considers network Mbits -func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk *structs.NetworkResource, - nodeRemainingResources *structs.ComparableResources) []*structs.Allocation { + // Sort bestAllocs by distance descending (without penalty) + sort.Slice(bestAllocs, func(i, j int) bool { + distance1 := preemptionResourceFactory(bestAllocs[i].ComparableResources(), resourceAsk).Distance() + distance2 := preemptionResourceFactory(bestAllocs[j].ComparableResources(), resourceAsk).Distance() + return distance1 > distance2 + }) var preemptedResources *structs.ComparableResources var filteredBestAllocs []*structs.Allocation @@ -497,7 +524,8 @@ func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk availableResources := preemptedResources.Copy() availableResources.Add(nodeRemainingResources) - requirementsMet := meetsNetworkRequirements(availableResources.Flattened.Networks[0].MBits, networkResourcesAsk.MBits) + premptionResource := preemptionResourceFactory(availableResources, resourceAsk) + requirementsMet := premptionResource.MeetsRequirements() if requirementsMet { break } @@ -506,7 +534,7 @@ func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk } // distanceComparatorForNetwork is used as the sorting function when finding allocations to preempt. 
It uses -// both a coordinayte distance function based on Mbits needed, and a penalty if the allocation under consideration +// both a coordinate distance function based on Mbits needed, and a penalty if the allocation under consideration // belongs to a job that already has more preempted allocations func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, networkResourceAsk *structs.NetworkResource, i int, j int) bool { firstAlloc := allocs[i] @@ -518,7 +546,7 @@ func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, n maxParallel1 = tg1.Migrate.MaxParallel } // Dereference network usage on first alloc if its there - firstAllocNetworks := firstAlloc.CompatibleNetworkResources() + firstAllocNetworks := firstAlloc.ComparableResources().Flattened.Networks var firstAllocNetResourceUsed *structs.NetworkResource if len(firstAllocNetworks) > 0 { firstAllocNetResourceUsed = firstAllocNetworks[0] @@ -532,8 +560,8 @@ func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, n if tg2 != nil && tg2.Migrate != nil { maxParallel2 = tg2.Migrate.MaxParallel } - // Dereference network usage on first alloc if its there - secondAllocNetworks := secondAlloc.CompatibleNetworkResources() + // Dereference network usage on second alloc if its there + secondAllocNetworks := secondAlloc.ComparableResources().Flattened.Networks var secondAllocNetResourceUsed *structs.NetworkResource if len(secondAllocNetworks) > 0 { secondAllocNetResourceUsed = secondAllocNetworks[0] diff --git a/scheduler/rank.go b/scheduler/rank.go index 47cb8a89c8bf..402dd9c7081b 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -241,7 +241,7 @@ OUTER: // Look for preemptible allocations to satisfy the network resource for this task preemptor.SetCandidates(proposed) - netPreemptions := preemptor.preemptForNetwork(ask, netIdx) + netPreemptions := preemptor.PreemptForNetwork(ask, netIdx) if netPreemptions == nil { 
iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("unable to meet network resource %v after preemption", ask)) @@ -303,7 +303,7 @@ OUTER: // Initialize preemptor with candidate set preemptor.SetCandidates(current) - preemptedAllocs := preemptor.preemptForTaskGroup(total) + preemptedAllocs := preemptor.PreemptForTaskGroup(total) allocsToPreempt = append(allocsToPreempt, preemptedAllocs...) // If we were unable to find preempted allocs to meet these requirements