core: Change the CPUPolicyUnit for dedicated pinning
Before the introduction of exclusively pinned CPUs, the logic of
CPUPolicyUnit checked the filtering constraints for each VM in the
vm group individually. The constraint was that the host's CPU count
had to be >= the VM's CPU count.

With the introduction of exclusively pinned CPUs, this is no longer
possible - if the group contains VMs with both shared and exclusively
pinned CPUs, we need to calculate the CPU count constraints for the
whole group (similar to huge pages in HugePagesFilterPolicyUnit).

The algorithm for determining whether the vm group fits on the host
is now as follows:
1. Calculate the host CPU count
2. Calculate the currently exclusively pinned CPUs (including pending)
3. Calculate the sum of all dedicated CPUs of the vm group
4. Calculate how many shared CPUs must remain available on the host,
   as the maximum of the shared CPUs required by the vmGroup, the pending VMs and the running VMs.

The host can fit the VMs if:
hostCpuCount - takenCpus - addedExclusivelyPinnedCpus - maxSharedCpuCount >= 0

Note that the calculation of the above values may differ based on
the cluster setting "Count threads as cores".
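
To make the check concrete, here is a minimal, self-contained sketch of the feasibility test described above; the class, method and parameter names (CpuFitCheck, vmGroupFits, etc.) are illustrative only and do not appear in the actual CPUPolicyUnit code.

// Illustrative sketch only - not part of the commit.
public final class CpuFitCheck {

    /**
     * @param hostCpuCount        effective host CPUs (cores or threads, depending on "Count threads as cores")
     * @param takenExclusiveCpus  CPUs already blocked by exclusive pinning (running + pending VMs)
     * @param groupExclusiveCpus  dedicated CPUs that the scheduled vm group would add
     * @param maxSharedCpuCount   largest shared CPU count among the vm group, pending and running VMs
     */
    public static boolean vmGroupFits(int hostCpuCount,
            int takenExclusiveCpus,
            int groupExclusiveCpus,
            long maxSharedCpuCount) {
        // Same inequality as in the commit message:
        // hostCpuCount - takenCpus - addedExclusivelyPinnedCpus - maxSharedCpuCount >= 0
        return hostCpuCount - takenExclusiveCpus - groupExclusiveCpus - maxSharedCpuCount >= 0;
    }

    public static void main(String[] args) {
        // Example: 16 host CPUs, 4 already exclusively pinned, the group adds 6
        // dedicated CPUs, the largest shared VM needs 4 CPUs: 16 - 4 - 6 - 4 >= 0, so it fits.
        System.out.println(vmGroupFits(16, 4, 6, 4)); // true
    }
}
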
ljelinkova committed May 12, 2022
1 parent aa7bfe9 commit a42e0fa
Showing 12 changed files with 286 additions and 60 deletions.
InternalPolicyUnits.java:
@@ -15,6 +15,7 @@
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuLevelFilterPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuOverloadPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuPinningPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuTopologyPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.EmulatedMachineFilterPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.EvenDistributionBalancePolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.EvenDistributionCPUWeightPolicyUnit;
@@ -101,6 +102,7 @@ public class InternalPolicyUnits {

mandatoryUnits.add(CompatibilityVersionFilterPolicyUnit.class);
mandatoryUnits.add(CpuLevelFilterPolicyUnit.class);
mandatoryUnits.add(CpuTopologyPolicyUnit.class);
mandatoryUnits.add(CpuPinningPolicyUnit.class);
mandatoryUnits.add(HostDeviceFilterPolicyUnit.class);
mandatoryUnits.add(PinToHostPolicyUnit.class);
PendingCpuCores.java:
@@ -1,6 +1,7 @@
package org.ovirt.engine.core.bll.scheduling.pending;

import org.ovirt.engine.core.bll.scheduling.utils.VmSpecificPendingResourceEqualizer;
import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.compat.Guid;
@@ -10,15 +11,20 @@
* by not yet started VM on a specified host
*/
public class PendingCpuCores extends PendingResource {

private CpuPinningPolicy cpuPinningPolicy;

private int coreCount;

public PendingCpuCores(VDS host, VM vm, int coreCount) {
super(host, vm);
this.cpuPinningPolicy = vm.getCpuPinningPolicy();
this.coreCount = coreCount;
}

public PendingCpuCores(Guid host, VM vm, int coreCount) {
super(host, vm);
this.cpuPinningPolicy = vm.getCpuPinningPolicy();
this.coreCount = coreCount;
}

@@ -30,6 +36,10 @@ public void setCoreCount(int coreCount) {
this.coreCount = coreCount;
}

public CpuPinningPolicy getCpuPinningPolicy() {
return cpuPinningPolicy;
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object other) {
CPUPolicyUnit.java:
@@ -2,92 +2,149 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;

import org.ovirt.engine.core.bll.scheduling.PolicyUnitImpl;
import org.ovirt.engine.core.bll.scheduling.SchedulingContext;
import org.ovirt.engine.core.bll.scheduling.SchedulingUnit;
import org.ovirt.engine.core.bll.scheduling.SlaValidator;
import org.ovirt.engine.core.bll.scheduling.pending.PendingCpuCores;
import org.ovirt.engine.core.bll.scheduling.pending.PendingCpuPinning;
import org.ovirt.engine.core.bll.scheduling.pending.PendingResourceManager;
import org.ovirt.engine.core.bll.scheduling.utils.VdsCpuUnitPinningHelper;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.common.businessentities.VdsCpuUnit;
import org.ovirt.engine.core.common.errors.EngineMessage;
import org.ovirt.engine.core.common.scheduling.PerHostMessages;
import org.ovirt.engine.core.common.scheduling.PolicyUnit;
import org.ovirt.engine.core.common.scheduling.PolicyUnitType;
import org.ovirt.engine.core.common.utils.VmCpuCountHelper;
import org.ovirt.engine.core.compat.Guid;
import org.ovirt.engine.core.vdsbroker.ResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SchedulingUnit(
guid = "6d636bf6-a35c-4f9d-b68d-0731f720cddc",
name = "CPU",
type = PolicyUnitType.FILTER,
description = "Filters out hosts with less CPUs than VM's CPUs"
)
description = "Filters out hosts with less CPUs than VM's CPUs")
public class CPUPolicyUnit extends PolicyUnitImpl {
private static final Logger log = LoggerFactory.getLogger(CPUPolicyUnit.class);

@Inject
private VdsCpuUnitPinningHelper vdsCpuUnitPinningHelper;

@Inject
private ResourceManager resourceManager;

public CPUPolicyUnit(PolicyUnit policyUnit,
PendingResourceManager pendingResourceManager) {
super(policyUnit, pendingResourceManager);
}

/**
* Filters out the hosts that do not have enough free CPUs to accommodate the shared and exclusively pinned CPUs
* required by the vmGroup.
*/
@Override
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, VM vm, PerHostMessages messages) {
List<VDS> list = new ArrayList<>();

for (VDS vds : hosts) {
if (VmCpuCountHelper.isResizeAndPinPolicy(vm) && !VmCpuCountHelper.isDynamicCpuTopologySet(vm)) {
if (vds.getCpuCores() / vds.getCpuSockets() > 1) {
list.add(vds);
} else {
messages.addMessage(vds.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug("Host '{}' has only one core per socket. Resize and pin requires more than one core per socket", vds.getName());
}
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, List<VM> vmGroup, PerHostMessages messages) {
List<VDS> candidates = new ArrayList<>();

int maxVmGroupSharedCpuCount = countMaxVmGroupSharedCpuCount(vmGroup);

for (VDS host : hosts) {

if (host.getCpuSockets() == null || host.getCpuCores() == null || host.getCpuThreads() == null) {
log.warn("Unknown number of cores for host {}.", host.getName());
continue;
}
Integer cores = SlaValidator.getEffectiveCpuCores(vds, context.getCluster().getCountThreadsAsCores());
if (cores != null) {
int numOfCpus = VmCpuCountHelper.isDynamicCpuTopologySet(vm) ?
vm.getCurrentNumOfCpus(false) : vm.getNumOfCpus(false);
if (vm.getCpuPinningPolicy().isExclusive()) {
int futureCpus = context.getCluster().getCountThreadsAsCores() ? cores - vds.getVmsCoresCount() :
cores - (int) Math.ceil(vds.getVmsCoresCount() / (vds.getCpuThreads() / vds.getCpuCores()));
if (numOfCpus > futureCpus) {
messageNotEnoughCores(vds, cores, vm, messages);
continue;
}
} else {
int takenCpus = 0;
// takenCpus are CPUs (threads), we should consider switching it to cores when necessary.
takenCpus = context.getCluster().getCountThreadsAsCores() ?
vdsCpuUnitPinningHelper.getDedicatedCount(vds.getId()) :
vdsCpuUnitPinningHelper.countTakenCores(vds);
cores = cores - takenCpus;
if (numOfCpus > cores) {
messageNotEnoughCores(vds, cores, vm, messages);
continue;
}
}

// total number of host CPUs (either cores or threads, based on countThreadsAsCores)
Integer hostCpuCount =
SlaValidator.getEffectiveCpuCores(host, context.getCluster().getCountThreadsAsCores());

// all CPUs (either cores or threads, based on countThreadsAsCores)
// unavailable due to the exclusive pinning (counted for running VMs, pending, vmGroup)
int exclusiveCpus = countExclusiveCPUs(host, vmGroup, context.getCluster().getCountThreadsAsCores());

// the maximum shared CPU count among all VMs (running, pending, vmGroup);
// it determines how many shared CPUs need to be available on the host
long maxSharedCpuCount = Math.max(countMaxRunningVmsSharedCpuCount(host), maxVmGroupSharedCpuCount);

if (hostCpuCount - exclusiveCpus - maxSharedCpuCount < 0) {
messageNotEnoughCores(host, messages);
continue;
}

list.add(vds);
candidates.add(host);
}
return list;
return candidates;
}

/**
* Counts how many CPUs are unavailable due to the exclusive pinning. The method
* takes a copy of the current CPU topology, applies pending resources and pins all of the exclusively
* pinned VMs. Then counts either the unavailable threads or the whole cores based on the countThreadsAsCores
* parameter.
* @param host Host candidate
* @param vmGroup VMs to be scheduled
* @param countThreadsAsCores If the threads should be counted as cores
* @return Number of CPUs that are exclusively pinned or blocked by an exclusive pinning
*/
private int countExclusiveCPUs(VDS host, List<VM> vmGroup, boolean countThreadsAsCores) {

Map<Guid, List<VdsCpuUnit>> vmToPendingExclusiveCpuPinnings =
PendingCpuPinning.collectForHost(getPendingResourceManager(), host.getId());

return vdsCpuUnitPinningHelper.countUnavailableCpus(
host,
vmGroup,
vmToPendingExclusiveCpuPinnings,
countThreadsAsCores);
}

/**
* Finds the VM with the maximum number of shared CPUs and returns the CPU count
* @param vmGroup VMs to be scheduled
* @return Max count of shared CPUs in the vm group
*/
private int countMaxVmGroupSharedCpuCount(List<VM> vmGroup) {
// the number of the host's shared CPUs has to be >= the number of CPUs of any VM
int maxVmGroupSharedCpuCount = vmGroup.stream()
.filter(vm -> !vm.getCpuPinningPolicy().isExclusive())
.mapToInt(vm -> VmCpuCountHelper.getDynamicNumOfCpu(vm))
.max()
.orElse(0);
return maxVmGroupSharedCpuCount;
}

/**
* Checks running VMs and pending resources and finds the VM with the maximum number
* of shared CPUs and returns the CPU count
* @param host Host candidate
* @return Max count of shared CPUs in running VMs and pending resources
*/
private long countMaxRunningVmsSharedCpuCount(VDS host) {

long maxPendingSharedCount = pendingResourceManager.pendingHostResources(host.getId(), PendingCpuCores.class)
.stream()
.filter(pending -> !pending.getCpuPinningPolicy().isExclusive())
.mapToLong(pending -> pending.getCoreCount())
.max()
.orElse(0);

long maxRunningSharedCpuCount = resourceManager.getVdsManager(host.getId()).getMinRequiredSharedCpusCount();

return Math.max(maxPendingSharedCount, maxRunningSharedCpuCount);
}

private void messageNotEnoughCores(VDS vds, int cores, VM vm, PerHostMessages messages) {
private void messageNotEnoughCores(VDS vds, PerHostMessages messages) {
messages.addMessage(vds.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug("Host '{}' has less cores ({}) than vm cores ({})",
vds.getName(),
cores,
VmCpuCountHelper.getDynamicNumOfCpu(vm));
log.debug("Host '{}' has not enough available cores to schedule vms)",
vds.getName());
}
}
CpuTopologyPolicyUnit.java (new file):
@@ -0,0 +1,76 @@
package org.ovirt.engine.core.bll.scheduling.policyunits;

import java.util.ArrayList;
import java.util.List;

import org.ovirt.engine.core.bll.scheduling.PolicyUnitImpl;
import org.ovirt.engine.core.bll.scheduling.SchedulingContext;
import org.ovirt.engine.core.bll.scheduling.SchedulingUnit;
import org.ovirt.engine.core.bll.scheduling.pending.PendingResourceManager;
import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.common.errors.EngineMessage;
import org.ovirt.engine.core.common.scheduling.PerHostMessages;
import org.ovirt.engine.core.common.scheduling.PolicyUnit;
import org.ovirt.engine.core.common.scheduling.PolicyUnitType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SchedulingUnit(
guid = "35c2f1a5-8928-48e9-81ac-4c49eb49d60e",
name = "CPUTopology",
type = PolicyUnitType.FILTER,
description = "Runs VMs only on hosts with a proper CPU topology")
public class CpuTopologyPolicyUnit extends PolicyUnitImpl {

private static final Logger log = LoggerFactory.getLogger(CpuTopologyPolicyUnit.class);

public CpuTopologyPolicyUnit(PolicyUnit policyUnit, PendingResourceManager pendingResourceManager) {
super(policyUnit, pendingResourceManager);
}

@Override
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, VM vm, PerHostMessages messages) {
List<VDS> candidates = new ArrayList<>();

for (VDS host : hosts) {

if (host.getCpuSockets() == null || host.getCpuCores() == null || host.getCpuThreads() == null) {
log.warn("Unknown number of cores for host {}.", host.getName());
continue;
}

// when the VM uses the Resize and Pin CPU pinning policy, the host needs to have
// more than one core per socket
if (vm.getCpuPinningPolicy() == CpuPinningPolicy.RESIZE_AND_PIN_NUMA) {
int coresPerSocket = host.getCpuCores() / host.getCpuSockets();

if (coresPerSocket <= 1) {
messages.addMessage(host.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES_PER_SOCKET_FOR_RESIZE_AND_PIN.toString());
log.debug(
"Host '{}' has only one core per socket. Resize and pin requires more than one core per socket",
host.getName());
continue;
}
} else if (vm.getCpuPinningPolicy() == CpuPinningPolicy.DEDICATED) {
// the dedicated pinning requires that all vThreads of the same vCore are pinned
// to the same physical core. That is why the number of threads per core for the VM
// cannot exceed the number of threads per core on the host
int threadsPerCore = host.getCpuThreads() / host.getCpuCores();

if (threadsPerCore < vm.getThreadsPerCpu()) {
messages.addMessage(host.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_THREADS_PER_CORE_FOR_DEDICATED.toString());
log.debug(
"Host '{}' has only one core per socket. Resize and pin requires more than one core per socket",
host.getName());
continue;
}
}
candidates.add(host);
}
return candidates;
}
}