Skip to content

Commit

Permalink
Introduce interface with multiple implementations for resource distance
Browse files Browse the repository at this point in the history
  • Loading branch information
preetapan committed Oct 30, 2018
1 parent 8321d37 commit 21a9b8b
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 92 deletions.
1 change: 1 addition & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7388,6 +7388,7 @@ func (a *Allocation) ComparableResources() *ComparableResources {
Memory: AllocatedMemoryResources{
MemoryMB: int64(resources.MemoryMB),
},
Networks: resources.Networks,
},
Shared: AllocatedSharedResources{
DiskMB: int64(resources.DiskMB),
Expand Down
208 changes: 118 additions & 90 deletions scheduler/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,82 @@ type groupedAllocs struct {
allocs []*structs.Allocation
}

// PreemptionResource interface is implemented by different
// types of resources. It lets the preemption logic ask two questions about
// a resource without knowing which kind of resource it is: whether the
// requirement is satisfied, and how far the candidate is from the requirement.
type PreemptionResource interface {
	// MeetsRequirements returns true if the available resources match needed resources
	MeetsRequirements() bool

	// Distance returns values in the range [0, MaxFloat], lower is better
	Distance() float64
}

// NetworkPreemptionResource is the PreemptionResource implementation used for
// network assignments. Only the MBits of the two network resources are
// consulted by its methods.
type NetworkPreemptionResource struct {
	// availableResources is the network resource on offer; resourceNeeded is
	// the network resource being asked for.
	availableResources, resourceNeeded *structs.NetworkResource
}

// MeetsRequirements reports whether the available network bandwidth covers the
// bandwidth that is needed. Only MBits are compared.
//
// It returns false when either network resource is nil, or when either side
// has zero MBits.
func (n *NetworkPreemptionResource) MeetsRequirements() bool {
	// Distance already treats a nil network resource as a valid state, so do
	// the same here instead of dereferencing a nil pointer.
	if n.availableResources == nil || n.resourceNeeded == nil {
		return false
	}

	mbitsAvailable := n.availableResources.MBits
	mbitsNeeded := n.resourceNeeded.MBits
	if mbitsAvailable == 0 || mbitsNeeded == 0 {
		return false
	}
	return mbitsAvailable >= mbitsNeeded
}

// Distance returns how far the available MBits are from the needed MBits,
// relative to the needed MBits: |needed - available| / needed. Lower is better
// and the result is always in the range [0, math.MaxFloat64].
//
// math.MaxFloat64 is returned when the distance cannot be computed: either
// network resource is nil, or the needed MBits are zero.
func (n *NetworkPreemptionResource) Distance() float64 {
	if n.availableResources == nil || n.resourceNeeded == nil {
		return math.MaxFloat64
	}

	mbitsNeeded := n.resourceNeeded.MBits
	if mbitsNeeded == 0 {
		// Dividing by zero would yield +Inf or NaN; NaN in particular makes
		// the comparisons used when sorting by distance inconsistent.
		return math.MaxFloat64
	}

	// math.Abs rather than Sqrt(Pow(x, 2)): it is exact and cannot overflow.
	return math.Abs(float64(mbitsNeeded-n.availableResources.MBits) / float64(mbitsNeeded))
}

// BasePreemptionResource is the PreemptionResource implementation used for
// CPU/Memory/Disk.
type BasePreemptionResource struct {
	// availableResources is the set of resources on offer; resourceNeeded is
	// the set of resources being asked for.
	availableResources, resourceNeeded *structs.ComparableResources
}

// MeetsRequirements reports whether the available resources are a superset of
// the needed resources, as decided by ComparableResources.Superset. The second
// value returned by Superset is not used here.
func (b *BasePreemptionResource) MeetsRequirements() bool {
	isSuperset, _ := b.availableResources.Superset(b.resourceNeeded)
	return isSuperset
}

// Distance returns the value computed by basicResourceDistance, called with
// the needed resources as the first argument and the available resources as
// the second.
func (b *BasePreemptionResource) Distance() float64 {
	dist := basicResourceDistance(b.resourceNeeded, b.availableResources)
	return dist
}

// PreemptionResourceFactory returns a new PreemptionResource
// built from the resources that are available and the resources being asked
// for. Callers pick a factory to choose which kind of resource is compared.
type PreemptionResourceFactory func(availableResources *structs.ComparableResources, resourceAsk *structs.ComparableResources) PreemptionResource

// GetNetworkPreemptionResourceFactory returns a preemption resource factory for network assignments.
//
// The returned factory compares only the first network of each argument
// (Flattened.Networks[0]). If an argument is nil or has no networks, the
// corresponding side of the NetworkPreemptionResource is left nil instead of
// panicking on an out-of-range index; Distance reports such a resource as
// maximally distant.
func GetNetworkPreemptionResourceFactory() PreemptionResourceFactory {
	// firstNetwork returns the first network of res, or nil when there is none.
	firstNetwork := func(res *structs.ComparableResources) *structs.NetworkResource {
		if res == nil || len(res.Flattened.Networks) == 0 {
			return nil
		}
		return res.Flattened.Networks[0]
	}

	return func(availableResources *structs.ComparableResources, resourceNeeded *structs.ComparableResources) PreemptionResource {
		return &NetworkPreemptionResource{
			availableResources: firstNetwork(availableResources),
			resourceNeeded:     firstNetwork(resourceNeeded),
		}
	}
}

// GetBasePreemptionResourceFactory returns a preemption resource factory for CPU/Memory/Disk.
// The returned factory stores both arguments, unmodified, in a new
// BasePreemptionResource.
func GetBasePreemptionResourceFactory() PreemptionResourceFactory {
	factory := func(available *structs.ComparableResources, needed *structs.ComparableResources) PreemptionResource {
		res := &BasePreemptionResource{
			availableResources: available,
			resourceNeeded:     needed,
		}
		return res
	}
	return factory
}

// Preemptor is used to track existing allocations
// and find suitable allocations to preempt
type Preemptor struct {
Expand Down Expand Up @@ -59,7 +135,7 @@ func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) {
// SetPreemptions initializes a map tracking existing counts of preempted allocations
// per job/task group. This is used while scoring preemption options
func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) {
// Clear out existing values
// Clear out existing values since this can be called more than once
for k := range p.currentPreemptions {
delete(p.currentPreemptions, k)
}
Expand Down Expand Up @@ -88,10 +164,10 @@ func (p *Preemptor) getNumPreemptions(alloc *structs.Allocation) int {
return numCurrentPreemptionsForJob
}

// preemptForTaskGroup computes a list of allocations to preempt to accommodate
// PreemptForTaskGroup computes a list of allocations to preempt to accommodate
// the resources asked for. Only allocs with a job priority < 10 of jobPriority are considered
// This method is meant only for finding preemptible allocations based on CPU/Memory/Disk
func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources) []*structs.Allocation {
func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources) []*structs.Allocation {
resourcesNeeded := resourceAsk.Comparable()

// Subtract current allocations
Expand All @@ -111,7 +187,7 @@ func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources)
for len(allocGrp.allocs) > 0 && !allRequirementsMet {
closestAllocIndex := -1
bestDistance := math.MaxFloat64
// find the alloc with the closest distance
// Find the alloc with the closest distance
for index, alloc := range allocGrp.allocs {
currentPreemptionCount := p.getNumPreemptions(alloc)
maxParallel := 0
Expand Down Expand Up @@ -143,7 +219,7 @@ func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources)
allocGrp.allocs[closestAllocIndex] = allocGrp.allocs[len(allocGrp.allocs)-1]
allocGrp.allocs = allocGrp.allocs[:len(allocGrp.allocs)-1]

// this is the remaining total of resources needed
// This is the remaining total of resources needed
resourcesNeeded.Subtract(closestAlloc.ComparableResources())
}
if allRequirementsMet {
Expand All @@ -159,23 +235,17 @@ func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources)
resourcesNeeded = resourceAsk.Comparable()
// We do another pass to eliminate unnecessary preemptions
// This filters out allocs whose resources are already covered by another alloc
basePreemptionResource := GetBasePreemptionResourceFactory()

// Sort bestAllocs by distance descending (without penalty)
sort.Slice(bestAllocs, func(i, j int) bool {
distance1 := basicResourceDistance(resourcesNeeded, bestAllocs[i].ComparableResources())
distance2 := basicResourceDistance(resourcesNeeded, bestAllocs[j].ComparableResources())
return distance1 > distance2
})

filteredBestAllocs := filterSupersetTaskGroup(bestAllocs, p.nodeRemainingResources, resourcesNeeded)
filteredBestAllocs := filterSuperset(bestAllocs, p.nodeRemainingResources, resourcesNeeded, basePreemptionResource)
return filteredBestAllocs

}

// preemptForNetwork tries to find allocations to preempt to meet network resources.
// PreemptForNetwork tries to find allocations to preempt to meet network resources.
// This is called once per task when assigning a network to the task. While finding allocations
// to preempt, this only considers allocations that share the same network device
func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResource, netIdx *structs.NetworkIndex) []*structs.Allocation {
func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResource, netIdx *structs.NetworkIndex) []*structs.Allocation {

// Early return if there are no current allocs
if len(p.currentAllocs) == 0 {
Expand All @@ -189,7 +259,7 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc
// Create a map from each device to allocs
// We do this because to place a task we have to be able to
// preempt allocations that are using the same device.
//

// This step also filters out high priority allocations and allocations
// that are not using any network resources
for _, alloc := range p.currentAllocs {
Expand All @@ -200,7 +270,9 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc
if p.jobPriority-alloc.Job.Priority < 10 {
continue
}
networks := alloc.CompatibleNetworkResources()
networks := alloc.ComparableResources().Flattened.Networks

// Only include if the alloc has a network device
if len(networks) > 0 {
device := networks[0].Device
allocsForDevice := deviceToAllocs[device]
Expand All @@ -221,6 +293,7 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc

for device, currentAllocs := range deviceToAllocs {
totalBandwidth := netIdx.AvailBandwidth[device]

// If the device doesn't have enough total available bandwidth, skip
if totalBandwidth < MbitsNeeded {
continue
Expand Down Expand Up @@ -272,13 +345,12 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc
for _, allocsGrp := range allocsByPriority {
allocs := allocsGrp.allocs

// Sort by distance function that takes into account needed MBits
// as well as penalty for preempting an allocation
// whose task group already has existing preemptions
// Sort by distance function
sort.Slice(allocs, func(i, j int) bool {
return p.distanceComparatorForNetwork(allocs, networkResourceAsk, i, j)
})

// Iterate over allocs until the end, or until requirements have been met
for _, alloc := range allocs {
preemptedBandwidth += alloc.Resources.Networks[0].MBits
allocsToPreempt = append(allocsToPreempt, alloc)
Expand All @@ -287,12 +359,13 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc
break
}
}
// If we met bandwidth needs we can break out of loop that's iterating by priority within a device

// If we met bandwidth needs we can break out of iterating by priority within a device
if met {
break
}
}
// If we met bandwidth needs we can break out of loop that's iterating by allocs sharing the same network device
// If we met bandwidth needs we don't need to examine the next network device
if met {
break
}
Expand All @@ -315,32 +388,14 @@ func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResourc
},
}

// Sort by distance reversed to surface any superset allocs first
// This sort only looks at mbits because we should still not prefer
// allocs that have a maxParallel penalty
sort.Slice(allocsToPreempt, func(i, j int) bool {
firstAlloc := allocsToPreempt[i]
secondAlloc := allocsToPreempt[j]

firstAllocNetworks := firstAlloc.ComparableResources().Flattened.Networks
var firstAllocNetResourceUsed *structs.NetworkResource
if len(firstAllocNetworks) > 0 {
firstAllocNetResourceUsed = firstAllocNetworks[0]
}

secondAllocNetworks := secondAlloc.ComparableResources().Flattened.Networks
var secondAllocNetResourceUsed *structs.NetworkResource
if len(secondAllocNetworks) > 0 {
secondAllocNetResourceUsed = secondAllocNetworks[0]
}

distance1 := networkResourceDistance(firstAllocNetResourceUsed, networkResourceAsk)
distance2 := networkResourceDistance(secondAllocNetResourceUsed, networkResourceAsk)
return distance1 > distance2
})

// Do a final pass to eliminate any superset allocations
filteredBestAllocs := filterSupersetNetwork(allocsToPreempt, networkResourceAsk, nodeRemainingResources)
preemptionResourceFactory := GetNetworkPreemptionResourceFactory()
resourcesNeeded := &structs.ComparableResources{
Flattened: structs.AllocatedTaskResources{
Networks: []*structs.NetworkResource{networkResourceAsk},
},
}
filteredBestAllocs := filterSuperset(allocsToPreempt, nodeRemainingResources, resourcesNeeded, preemptionResourceFactory)
return filteredBestAllocs
}

Expand Down Expand Up @@ -400,14 +455,6 @@ func scoreForNetwork(resourceUsed *structs.NetworkResource, resourceNeeded *stru
return networkResourceDistance(resourceUsed, resourceNeeded) + maxParallelScorePenalty
}

// meetsNetworkRequirements checks if the first resource meets or exceeds the second resource's network MBits requirements
func meetsNetworkRequirements(firstMbits int, secondMbits int) bool {
if firstMbits == 0 || secondMbits == 0 {
return false
}
return firstMbits >= secondMbits
}

// filterAndGroupPreemptibleAllocs groups allocations by priority after removing any from jobs of
// a higher priority than jobPriority
func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs {
Expand Down Expand Up @@ -447,40 +494,20 @@ func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocat
return groupedSortedAllocs
}

// filterSupersetTaskGroup is used as a final step to remove
// filterSuperset is used as a final step to remove
// any allocations that meet a superset of requirements from
// the set of allocations to preempt
func filterSupersetTaskGroup(bestAllocs []*structs.Allocation,
func filterSuperset(bestAllocs []*structs.Allocation,
nodeRemainingResources *structs.ComparableResources,
resourceAsk *structs.ComparableResources) []*structs.Allocation {

var preemptedResources *structs.ComparableResources
var filteredBestAllocs []*structs.Allocation
resourceAsk *structs.ComparableResources,
preemptionResourceFactory PreemptionResourceFactory) []*structs.Allocation {

// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
preemptedResources = alloc.ComparableResources().Copy()
} else {
preemptedResources.Add(alloc.ComparableResources().Copy())
}
filteredBestAllocs = append(filteredBestAllocs, alloc)
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)

requirementsMet, _ := availableResources.Superset(resourceAsk)
if requirementsMet {
break
}
}
return filteredBestAllocs
}

// filterSupersetNetwork is similar to filterSupersetTaskGroup but only
// considers network Mbits
func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk *structs.NetworkResource,
nodeRemainingResources *structs.ComparableResources) []*structs.Allocation {
// Sort bestAllocs by distance descending (without penalty)
sort.Slice(bestAllocs, func(i, j int) bool {
distance1 := preemptionResourceFactory(bestAllocs[i].ComparableResources(), resourceAsk).Distance()
distance2 := preemptionResourceFactory(bestAllocs[j].ComparableResources(), resourceAsk).Distance()
return distance1 > distance2
})

var preemptedResources *structs.ComparableResources
var filteredBestAllocs []*structs.Allocation
Expand All @@ -497,7 +524,8 @@ func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)

requirementsMet := meetsNetworkRequirements(availableResources.Flattened.Networks[0].MBits, networkResourcesAsk.MBits)
premptionResource := preemptionResourceFactory(availableResources, resourceAsk)
requirementsMet := premptionResource.MeetsRequirements()
if requirementsMet {
break
}
Expand All @@ -506,7 +534,7 @@ func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk
}

// distanceComparatorForNetwork is used as the sorting function when finding allocations to preempt. It uses
// both a coordinayte distance function based on Mbits needed, and a penalty if the allocation under consideration
// both a coordinate distance function based on Mbits needed, and a penalty if the allocation under consideration
// belongs to a job that already has more preempted allocations
func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, networkResourceAsk *structs.NetworkResource, i int, j int) bool {
firstAlloc := allocs[i]
Expand All @@ -518,7 +546,7 @@ func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, n
maxParallel1 = tg1.Migrate.MaxParallel
}
// Dereference network usage on first alloc if it's there
firstAllocNetworks := firstAlloc.CompatibleNetworkResources()
firstAllocNetworks := firstAlloc.ComparableResources().Flattened.Networks
var firstAllocNetResourceUsed *structs.NetworkResource
if len(firstAllocNetworks) > 0 {
firstAllocNetResourceUsed = firstAllocNetworks[0]
Expand All @@ -532,8 +560,8 @@ func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, n
if tg2 != nil && tg2.Migrate != nil {
maxParallel2 = tg2.Migrate.MaxParallel
}
// Dereference network usage on first alloc if its there
secondAllocNetworks := secondAlloc.CompatibleNetworkResources()
// Dereference network usage on second alloc if it's there
secondAllocNetworks := secondAlloc.ComparableResources().Flattened.Networks
var secondAllocNetResourceUsed *structs.NetworkResource
if len(secondAllocNetworks) > 0 {
secondAllocNetResourceUsed = secondAllocNetworks[0]
Expand Down
4 changes: 2 additions & 2 deletions scheduler/rank.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ OUTER:
// Look for preemptible allocations to satisfy the network resource for this task
preemptor.SetCandidates(proposed)

netPreemptions := preemptor.preemptForNetwork(ask, netIdx)
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
if netPreemptions == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unable to meet network resource %v after preemption", ask))
Expand Down Expand Up @@ -303,7 +303,7 @@ OUTER:
// Initialize preemptor with candidate set
preemptor.SetCandidates(current)

preemptedAllocs := preemptor.preemptForTaskGroup(total)
preemptedAllocs := preemptor.PreemptForTaskGroup(total)
allocsToPreempt = append(allocsToPreempt, preemptedAllocs...)

// If we were unable to find preempted allocs to meet these requirements
Expand Down

0 comments on commit 21a9b8b

Please sign in to comment.