Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Change the CPUPolicyUnit for dedicated cpu pinning #299

Merged
merged 1 commit into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuLevelFilterPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuOverloadPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuPinningPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.CpuTopologyPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.EmulatedMachineFilterPolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.EvenDistributionBalancePolicyUnit;
import org.ovirt.engine.core.bll.scheduling.policyunits.EvenDistributionCPUWeightPolicyUnit;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class InternalPolicyUnits {

mandatoryUnits.add(CompatibilityVersionFilterPolicyUnit.class);
mandatoryUnits.add(CpuLevelFilterPolicyUnit.class);
mandatoryUnits.add(CpuTopologyPolicyUnit.class);
mandatoryUnits.add(CpuPinningPolicyUnit.class);
mandatoryUnits.add(HostDeviceFilterPolicyUnit.class);
mandatoryUnits.add(PinToHostPolicyUnit.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ovirt.engine.core.bll.scheduling.pending;

import org.ovirt.engine.core.bll.scheduling.utils.VmSpecificPendingResourceEqualizer;
import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.compat.Guid;
Expand All @@ -10,15 +11,20 @@
* by not yet started VM on a specified host
*/
public class PendingCpuCores extends PendingResource {

private CpuPinningPolicy cpuPinningPolicy;

private int coreCount;

/**
 * Creates the pending CPU cores record of a VM for the given host.
 *
 * @param host host the pending cores are accounted to
 * @param vm VM the pending cores belong to; its CPU pinning policy is read once, at construction time
 * @param coreCount number of pending cores
 */
public PendingCpuCores(VDS host, VM vm, int coreCount) {
    super(host, vm);
    this.coreCount = coreCount;
    cpuPinningPolicy = vm.getCpuPinningPolicy();
}

/**
 * Creates the pending CPU cores record of a VM for the host with the given id.
 *
 * @param host id of the host the pending cores are accounted to
 * @param vm VM the pending cores belong to; its CPU pinning policy is read once, at construction time
 * @param coreCount number of pending cores
 */
public PendingCpuCores(Guid host, VM vm, int coreCount) {
    super(host, vm);
    this.coreCount = coreCount;
    cpuPinningPolicy = vm.getCpuPinningPolicy();
}

Expand All @@ -30,6 +36,10 @@ public void setCoreCount(int coreCount) {
this.coreCount = coreCount;
}

/**
 * @return the CPU pinning policy that the VM had when this pending resource was created
 */
public CpuPinningPolicy getCpuPinningPolicy() {
    return this.cpuPinningPolicy;
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,171 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;

import org.ovirt.engine.core.bll.scheduling.PolicyUnitImpl;
import org.ovirt.engine.core.bll.scheduling.SchedulingContext;
import org.ovirt.engine.core.bll.scheduling.SchedulingUnit;
import org.ovirt.engine.core.bll.scheduling.SlaValidator;
import org.ovirt.engine.core.bll.scheduling.pending.PendingCpuCores;
import org.ovirt.engine.core.bll.scheduling.pending.PendingCpuPinning;
import org.ovirt.engine.core.bll.scheduling.pending.PendingResourceManager;
import org.ovirt.engine.core.bll.scheduling.utils.VdsCpuUnitPinningHelper;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.common.businessentities.VdsCpuUnit;
import org.ovirt.engine.core.common.errors.EngineMessage;
import org.ovirt.engine.core.common.scheduling.PerHostMessages;
import org.ovirt.engine.core.common.scheduling.PolicyUnit;
import org.ovirt.engine.core.common.scheduling.PolicyUnitType;
import org.ovirt.engine.core.common.utils.VmCpuCountHelper;
import org.ovirt.engine.core.compat.Guid;
import org.ovirt.engine.core.vdsbroker.ResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SchedulingUnit(
guid = "6d636bf6-a35c-4f9d-b68d-0731f720cddc",
name = "CPU",
type = PolicyUnitType.FILTER,
description = "Filters out hosts with less CPUs than VM's CPUs"
)
description = "Filters out hosts with less CPUs than VM's CPUs")
public class CPUPolicyUnit extends PolicyUnitImpl {
private static final Logger log = LoggerFactory.getLogger(CPUPolicyUnit.class);

@Inject
private VdsCpuUnitPinningHelper vdsCpuUnitPinningHelper;

@Inject
private ResourceManager resourceManager;

/**
 * Creates the CPU filter unit. Both arguments are passed unchanged to the {@link PolicyUnitImpl} constructor.
 *
 * @param policyUnit policy unit definition
 * @param pendingResourceManager manager of the pending resources
 */
public CPUPolicyUnit(PolicyUnit policyUnit, PendingResourceManager pendingResourceManager) {
    super(policyUnit, pendingResourceManager);
}

/**
* Filters out the hosts that do not have enough free CPUs to accommodate the shared and exclusively pinned CPUs
* required by the vmGroup.
*
* NOTE(review): this listing contains two filter() signatures (one taking a single VM, one taking a vmGroup)
* with their bodies interleaved; it looks like a rendered diff. Confirm which lines are current before
* relying on the text below. The tags that follow describe the vmGroup variant.
*
* @param context scheduling context; its cluster supplies the countThreadsAsCores setting
* @param hosts host candidates
* @param vmGroup VMs to be scheduled
* @param messages receives the per-host message for every host rejected for not having enough cores
* @return the hosts that passed the check
*/
@Override
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, VM vm, PerHostMessages messages) {
List<VDS> list = new ArrayList<>();

for (VDS vds : hosts) {
// single-VM variant: resize-and-pin without a dynamic topology only needs more than one core per socket
if (VmCpuCountHelper.isResizeAndPinPolicy(vm) && !VmCpuCountHelper.isDynamicCpuTopologySet(vm)) {
if (vds.getCpuCores() / vds.getCpuSockets() > 1) {
list.add(vds);
} else {
messages.addMessage(vds.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug("Host '{}' has only one core per socket. Resize and pin requires more than one core per socket", vds.getName());
}
// NOTE(review): second filter() signature starts here (vmGroup variant)
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, List<VM> vmGroup, PerHostMessages messages) {
List<VDS> candidates = new ArrayList<>();

// depends only on the vmGroup, so it is computed once for all hosts
int maxVmGroupSharedCpuCount = countMaxVmGroupSharedCpuCount(vmGroup);

for (VDS host : hosts) {

// a host that does not report sockets, cores or threads is skipped; only a warning is logged,
// no per-host message is added
if (host.getCpuSockets() == null || host.getCpuCores() == null || host.getCpuThreads() == null) {
log.warn("Unknown number of cores for host {}.", host.getName());
continue;
}
// NOTE(review): the lines from here up to the "total number of host CPUs" comment use 'vds' and 'vm',
// i.e. the names of the single-VM variant
Integer cores = SlaValidator.getEffectiveCpuCores(vds, context.getCluster().getCountThreadsAsCores());
if (cores != null) {
int numOfCpus = VmCpuCountHelper.isDynamicCpuTopologySet(vm) ?
vm.getCurrentNumOfCpus(false) : vm.getNumOfCpus(false);
if (vm.getCpuPinningPolicy().isExclusive()) {
int futureCpus = context.getCluster().getCountThreadsAsCores() ? cores - vds.getVmsCoresCount() :
cores - (int) Math.ceil(vds.getVmsCoresCount() / (vds.getCpuThreads() / vds.getCpuCores()));
if (numOfCpus > futureCpus) {
messageNotEnoughCores(vds, cores, vm, messages);
continue;
}
} else {
int takenCpus = 0;
// takenCpus are CPUs (threads), we should consider switching it to cores when necessary.
takenCpus = context.getCluster().getCountThreadsAsCores() ?
vdsCpuUnitPinningHelper.getDedicatedCount(vds.getId()) :
vdsCpuUnitPinningHelper.countTakenCores(vds);
cores = cores - takenCpus;
if (numOfCpus > cores) {
messageNotEnoughCores(vds, cores, vm, messages);
continue;
}
}

// total number of host CPUs (either core or threads based on countThreadsAsCores)
Integer hostCpuCount =
SlaValidator.getEffectiveCpuCores(host, context.getCluster().getCountThreadsAsCores());

ahadas marked this conversation as resolved.
Show resolved Hide resolved
// all CPUs (either core or threads based on countThreadsAsCores)
// unavailable due to the exclusive pinning (counted for running VMs, pending, vmGroup)
int exclusiveCpus = countExclusiveCPUs(host, vmGroup, context.getCluster().getCountThreadsAsCores());

// the maximal number of CPU count for all VMs (running, pending, vmGroup)
// it determines how many shared CPUs needs to be available on the host
long maxSharedCpuCount = Math.max(countMaxRunningVmsSharedCpuCount(host), maxVmGroupSharedCpuCount);

// the host is rejected when the exclusive CPUs plus the largest shared CPU demand exceed its CPU count;
// countExclusiveCPUs returns Integer.MAX_VALUE when the exclusive pinning is not possible, which
// also ends up in this branch
if (hostCpuCount - exclusiveCpus - maxSharedCpuCount < 0) {
messageNotEnoughCores(host, messages);
continue;
}

list.add(vds);
candidates.add(host);
}
return candidates;
}

/**
* Counts how many CPUs will be unavailable due to the exclusive pinning. The method
* takes a copy of the current CPU topology, applies pending resources and pins all of the exclusively
* pinned VMs. Then counts either the unavailable threads or the whole cores based on the countThreadsAsCores
* parameter.
* @param host Host candidate
* @param vmGroup VMs to be scheduled
* @param countThreadsAsCores If the threads should be counted as cores
* @return Number of CPUs that are exclusively pinned or blocked by an exclusive pinning, 0 when the effective
* CPU topology of the host is empty, or Integer.MAX_VALUE if the pinning of the exclusive CPUs is not possible
* (shared pinnings should be checked in {@link CpuPinningPolicyUnit})
*/
private int countExclusiveCPUs(VDS host, List<VM> vmGroup, boolean countThreadsAsCores) {

List<VdsCpuUnit> cpuTopology = getEffectiveCpuTopology(host);

// nothing can be counted without a topology
if (cpuTopology.isEmpty()) {
return 0;
}
// NOTE(review): the following 'return list;' references a name that is not declared in this method;
// it looks like a removed line left over from a rendered diff - confirm and drop
return list;

for (VM vm : vmGroup) {
// VMs that already run on this host are not allocated again
if (!host.getId().equals(vm.getRunOnVds())) {
List<VdsCpuUnit> allocatedCpus =
vdsCpuUnitPinningHelper.updatePhysicalCpuAllocations(vm, cpuTopology, host.getId());
// an exclusively pinned VM that received no CPUs cannot be placed on this host
if (vm.getCpuPinningPolicy().isExclusive() && (allocatedCpus == null || allocatedCpus.isEmpty())) {
return Integer.MAX_VALUE;
}
}
}

return vdsCpuUnitPinningHelper.countUnavailableCpus(
cpuTopology,
countThreadsAsCores);
}

/**
 * Returns the CPU topology of the host with the pending exclusive CPU pinnings previewed on it.
 *
 * @param host Host candidate
 * @return the topology obtained from the host's VdsManager, after the pending exclusive pinnings
 *         collected for the host were applied to it by the pinning helper
 */
private List<VdsCpuUnit> getEffectiveCpuTopology(VDS host) {
    Guid hostId = host.getId();
    List<VdsCpuUnit> topology = resourceManager.getVdsManager(hostId).getCpuTopology();

    vdsCpuUnitPinningHelper.previewPinOfPendingExclusiveCpus(
            topology,
            PendingCpuPinning.collectForHost(getPendingResourceManager(), hostId));

    return topology;
}

/**
 * Looks up the largest CPU count among the VMs of the group that are not exclusively pinned.
 * The host has to offer at least that many shared CPUs, because every such VM must fit on them.
 *
 * @param vmGroup VMs to be scheduled
 * @return Max count of shared CPUs in the vm group, 0 when no VM of the group uses shared CPUs
 */
private int countMaxVmGroupSharedCpuCount(List<VM> vmGroup) {
    boolean sharedVmSeen = false;
    int largest = 0;

    for (VM candidate : vmGroup) {
        if (candidate.getCpuPinningPolicy().isExclusive()) {
            continue;
        }
        int cpuCount = VmCpuCountHelper.getDynamicNumOfCpu(candidate);
        // the first shared VM always sets the value, later ones only when they are bigger
        if (!sharedVmSeen || cpuCount > largest) {
            largest = cpuCount;
            sharedVmSeen = true;
        }
    }

    return largest;
}

/**
 * Determines the largest shared CPU demand already known for the host: the bigger of the largest
 * non-exclusive pending core count and the minimal required shared CPU count reported by the
 * host's VdsManager.
 *
 * @param host Host candidate
 * @return Max count of shared CPUs in running VMs and pending resources
 */
private long countMaxRunningVmsSharedCpuCount(VDS host) {
    boolean sharedPendingSeen = false;
    long largestPending = 0;

    for (PendingCpuCores pendingCores
            : pendingResourceManager.pendingHostResources(host.getId(), PendingCpuCores.class)) {
        if (pendingCores.getCpuPinningPolicy().isExclusive()) {
            continue;
        }
        long requested = pendingCores.getCoreCount();
        // the first shared pending entry always sets the value, later ones only when they are bigger
        if (!sharedPendingSeen || requested > largestPending) {
            largestPending = requested;
            sharedPendingSeen = true;
        }
    }

    long runningMinimum = resourceManager.getVdsManager(host.getId()).getMinRequiredSharedCpusCount();

    return Math.max(largestPending, runningMinimum);
}

// Records the NOT_ENOUGH_CORES detail message for the host and writes a debug log entry.
// NOTE(review): two signatures and two log.debug calls are interleaved below; this looks like a rendered
// diff (the four-argument form with the "less cores" message vs. the two-argument form) - confirm which
// lines are current.
private void messageNotEnoughCores(VDS vds, int cores, VM vm, PerHostMessages messages) {
private void messageNotEnoughCores(VDS vds, PerHostMessages messages) {
messages.addMessage(vds.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug("Host '{}' has less cores ({}) than vm cores ({})",
vds.getName(),
cores,
VmCpuCountHelper.getDynamicNumOfCpu(vm));
// NOTE(review): the message literal below ends with an unmatched ')' - likely a typo in the log text
log.debug("Host '{}' has not enough available cores to schedule vms)",
vds.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.ovirt.engine.core.bll.scheduling.policyunits;

import java.util.ArrayList;
import java.util.List;

import org.ovirt.engine.core.bll.scheduling.PolicyUnitImpl;
import org.ovirt.engine.core.bll.scheduling.SchedulingContext;
import org.ovirt.engine.core.bll.scheduling.SchedulingUnit;
import org.ovirt.engine.core.bll.scheduling.pending.PendingResourceManager;
import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.common.errors.EngineMessage;
import org.ovirt.engine.core.common.scheduling.PerHostMessages;
import org.ovirt.engine.core.common.scheduling.PolicyUnit;
import org.ovirt.engine.core.common.scheduling.PolicyUnitType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SchedulingUnit(
guid = "35c2f1a5-8928-48e9-81ac-4c49eb49d60e",
name = "CPUTopology",
type = PolicyUnitType.FILTER,
description = "Runs VMs only on hosts with a proper CPU topology")
public class CpuTopologyPolicyUnit extends PolicyUnitImpl {

private static final Logger log = LoggerFactory.getLogger(CpuTopologyPolicyUnit.class);

public CpuTopologyPolicyUnit(PolicyUnit policyUnit, PendingResourceManager pendingResourceManager) {
super(policyUnit, pendingResourceManager);
}

@Override
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, VM vm, PerHostMessages messages) {
List<VDS> candidates = new ArrayList<>();

for (VDS host : hosts) {

if (host.getCpuSockets() == null || host.getCpuCores() == null || host.getCpuThreads() == null) {
log.warn("Unknown number of cores for host {}.", host.getName());
continue;
}

// when the VM uses Resize and PIN CPU pinning policy the host needs to have
// more than one core per socket
if (vm.getCpuPinningPolicy() == CpuPinningPolicy.RESIZE_AND_PIN_NUMA) {
int coresPerSocket = host.getCpuCores() / host.getCpuSockets();

if (coresPerSocket <= 1) {
messages.addMessage(host.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES_PER_SOCKET_FOR_RESIZE_AND_PIN.toString());
log.debug(
"Host '{}' has only one core per socket. Resize and pin requires more than one core per socket",
host.getName());
continue;
}
} else if (vm.getCpuPinningPolicy() == CpuPinningPolicy.DEDICATED) {
// the dedicated pinning requires that all vThreads of the same vCore are pinned
// to the same physical core. That is why the number of threads per core for the VM
// cannot exceed the number of threads per core on the host
ahadas marked this conversation as resolved.
Show resolved Hide resolved
int threadsPerCore = host.getCpuThreads() / host.getCpuCores();

if (threadsPerCore < vm.getThreadsPerCpu()) {
messages.addMessage(host.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_THREADS_PER_CORE_FOR_DEDICATED.toString());
log.debug(
"Host '{}' has not enough CPU threads per core to run the VM with Dedicated CPU pinning policy",
host.getName());
continue;
}
}
candidates.add(host);
}
return candidates;
}
}
Loading