Skip to content

Commit

Permalink
core: Change the CPUPolicyUnit for dedicated pinning
Browse files Browse the repository at this point in the history
Before the introduction of exclusively pinned CPUs,
the logic of CPUPolicyUnit could check the filtering
constraints for each VM in the VM group individually. The
constraint was that the number of the VM's CPUs had to be <=
the number of host CPUs.

With the introduction of exclusively pinned CPUs, that is
no longer possible - if the group contains VMs with both shared
and exclusively pinned CPUs, we need to calculate the CPU count
constraints for the whole group (similar to huge pages in
HugePagesFilterPolicyUnit).

Now the algorithm for calculating if the vm group fits into
the host is as follows:
1. Calculate the host CPU count
2. Calculate the currently exclusively pinned CPUs (including pending)
3. Calculate the sum of all dedicated CPUs of the vm group
4. Calculate how many shared CPUs are required to be left on the host
   as the maximum of required shared CPUs for vmGroup, pending VMs and running VMs.

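The host can fit the VMs if (takenCpus is the value from step 2,
addedExclusivelyPinnedCpus the sum from step 3, maxSharedCpuCount the maximum from step 4):
hostCpuCount - takenCpus - addedExclusivelyPinnedCpus - maxSharedCpuCount >= 0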

Note that the calculation of the previous values may differ based on
the cluster setting of "Count threads as cores".
  • Loading branch information
ljelinkova committed Apr 27, 2022
1 parent 20f4a88 commit d0705be
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ovirt.engine.core.bll.scheduling.pending;

import org.ovirt.engine.core.bll.scheduling.utils.VmSpecificPendingResourceEqualizer;
import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.compat.Guid;
Expand All @@ -10,15 +11,20 @@
* by not yet started VM on a specified host
*/
public class PendingCpuCores extends PendingResource {

private CpuPinningPolicy cpuPinningPolicy;

private int coreCount;

/**
 * Creates the pending CPU record of a VM on the given host.
 *
 * @param host the host, handed over to the superclass constructor
 * @param vm the VM, handed over to the superclass constructor; its CPU pinning policy is read here and stored
 * @param coreCount the core count kept by this record
 */
public PendingCpuCores(VDS host, VM vm, int coreCount) {
    super(host, vm);
    this.coreCount = coreCount;
    // remember the pinning policy the VM had when this record was created
    cpuPinningPolicy = vm.getCpuPinningPolicy();
}

/**
 * Creates the pending CPU record of a VM on the host identified by the given id.
 *
 * @param host the host id, handed over to the superclass constructor
 * @param vm the VM, handed over to the superclass constructor; its CPU pinning policy is read here and stored
 * @param coreCount the core count kept by this record
 */
public PendingCpuCores(Guid host, VM vm, int coreCount) {
    super(host, vm);
    this.coreCount = coreCount;
    // remember the pinning policy the VM had when this record was created
    cpuPinningPolicy = vm.getCpuPinningPolicy();
}

Expand All @@ -30,6 +36,10 @@ public void setCoreCount(int coreCount) {
this.coreCount = coreCount;
}

/**
 * @return the CPU pinning policy stored in this record (taken from the VM by the constructors)
 */
public CpuPinningPolicy getCpuPinningPolicy() {
    return this.cpuPinningPolicy;
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;

import javax.inject.Inject;

import org.ovirt.engine.core.bll.scheduling.PolicyUnitImpl;
import org.ovirt.engine.core.bll.scheduling.SchedulingContext;
import org.ovirt.engine.core.bll.scheduling.SchedulingUnit;
import org.ovirt.engine.core.bll.scheduling.SlaValidator;
import org.ovirt.engine.core.bll.scheduling.pending.PendingCpuCores;
import org.ovirt.engine.core.bll.scheduling.pending.PendingCpuPinning;
import org.ovirt.engine.core.bll.scheduling.pending.PendingResourceManager;
import org.ovirt.engine.core.bll.scheduling.utils.VdsCpuUnitPinningHelper;
import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.common.businessentities.VdsCpuUnit;
import org.ovirt.engine.core.common.errors.EngineMessage;
import org.ovirt.engine.core.common.scheduling.PerHostMessages;
import org.ovirt.engine.core.common.scheduling.PolicyUnit;
import org.ovirt.engine.core.common.scheduling.PolicyUnitType;
import org.ovirt.engine.core.common.utils.VmCpuCountHelper;
import org.ovirt.engine.core.compat.Guid;
import org.ovirt.engine.core.vdsbroker.ResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,61 +40,103 @@ public class CPUPolicyUnit extends PolicyUnitImpl {
@Inject
private VdsCpuUnitPinningHelper vdsCpuUnitPinningHelper;

@Inject
private ResourceManager resourceManager;

public CPUPolicyUnit(PolicyUnit policyUnit,
PendingResourceManager pendingResourceManager) {
super(policyUnit, pendingResourceManager);
}

/**
* Filters out the hosts that do not have enough free CPUs to accommodate the shared and exclusively
* pinned CPUs required by the vmGroup.
*/
@Override
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, VM vm, PerHostMessages messages) {
List<VDS> list = new ArrayList<>();

for (VDS vds : hosts) {
if (VmCpuCountHelper.isResizeAndPinPolicy(vm) && !VmCpuCountHelper.isDynamicCpuTopologySet(vm)) {
if (vds.getCpuCores() / vds.getCpuSockets() > 1) {
list.add(vds);
} else {
messages.addMessage(vds.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug("Host '{}' has only one core per socket. Resize and pin requires more than one core per socket", vds.getName());
}
public List<VDS> filter(SchedulingContext context, List<VDS> hosts, List<VM> vmGroup, PerHostMessages messages) {
List<VDS> candidates = new ArrayList<>();

boolean anyVmWithUnsetDynamicTopology = vmGroup.stream()
.anyMatch(vm -> VmCpuCountHelper.isResizeAndPinPolicy(vm)
&& !VmCpuCountHelper.isDynamicCpuTopologySet(vm));

// the number of the host's shared CPUs has to be >= the number of CPUs of any VM that is not exclusively pinned
int maxVmGroupSharedCpuCount = vmGroup.stream()
.filter(vm -> !vm.getCpuPinningPolicy().isExclusivelyPinned())
.mapToInt(vm -> VmCpuCountHelper.isDynamicCpuTopologySet(vm)
? vm.getCurrentNumOfCpus(false)
: vm.getNumOfCpus(false))
.max()
.orElse(0);

for (VDS host : hosts) {

Integer hostCpuCount = SlaValidator.getEffectiveCpuCores(host, context.getCluster().getCountThreadsAsCores());

if (hostCpuCount == null) {
candidates.add(host);
continue;
}
Integer cores = SlaValidator.getEffectiveCpuCores(vds, context.getCluster().getCountThreadsAsCores());
if (cores != null) {
int numOfCpus = VmCpuCountHelper.isDynamicCpuTopologySet(vm) ?
vm.getCurrentNumOfCpus(false) : vm.getNumOfCpus(false);
if (vm.getCpuPinningPolicy() == CpuPinningPolicy.DEDICATED) {
int futureCpus = context.getCluster().getCountThreadsAsCores() ? cores - vds.getVmsCoresCount() :
cores - (int) Math.ceil(vds.getVmsCoresCount() / (vds.getCpuThreads() / vds.getCpuCores()));
if (numOfCpus > futureCpus) {
messageNotEnoughCores(vds, cores, vm, messages);
continue;
}
} else {
int takenCpus = 0;
// takenCpus are CPUs (threads), we should consider switching it to cores when necessary.
takenCpus = context.getCluster().getCountThreadsAsCores() ?
vdsCpuUnitPinningHelper.getDedicatedCount(vds.getId()) :
vdsCpuUnitPinningHelper.countTakenCores(vds);
cores = cores - takenCpus;
if (numOfCpus > cores) {
messageNotEnoughCores(vds, cores, vm, messages);
continue;
}

// when the dynamic topology is not set, the host requires more than one core per socket
if (anyVmWithUnsetDynamicTopology) {
int coresPerSocket = 0;

if (host.getCpuCores() != null && host.getCpuSockets() != null && host.getCpuSockets() != 0) {
coresPerSocket = host.getCpuCores() / host.getCpuSockets();
}

if (coresPerSocket <= 1) {
messages.addMessage(host.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug(
"Host '{}' has only one core per socket. Resize and pin requires more than one core per socket",
host.getName());
continue;
}
}

Map<Guid, List<VdsCpuUnit>> vmToPendingExclusiveCpuPinnings =
PendingCpuPinning.collectForHost(getPendingResourceManager(), host.getId());
int unavailableCpus = vdsCpuUnitPinningHelper.countUnavailableCpus(
host,
vmToPendingExclusiveCpuPinnings,
context.getCluster().getCountThreadsAsCores());

int addedExclusivelyPinnedCpus = vmGroup.stream()
.filter(vm -> vm.getCpuPinningPolicy().isExclusivelyPinned())
.mapToInt(vm -> vm.getNumOfCpus())
.sum();

long maxPendingSharedCount = pendingResourceManager.pendingHostResources(host.getId(), PendingCpuCores.class)
.stream()
.filter(pending -> !pending.getCpuPinningPolicy().isExclusivelyPinned())
.mapToLong(pending -> pending.getCoreCount())
.max().orElse(0);

long maxRunningSharedCpuCount = resourceManager.getVdsManager(host.getId()).getMinRequiredSharedCpusCount();

long maxSharedCpuCount = LongStream.of(
maxVmGroupSharedCpuCount,
maxPendingSharedCount,
maxRunningSharedCpuCount).max().orElse(0);

if (hostCpuCount - unavailableCpus - addedExclusivelyPinnedCpus - maxSharedCpuCount < 0) {
messageNotEnoughCores(host, hostCpuCount, unavailableCpus, addedExclusivelyPinnedCpus, maxVmGroupSharedCpuCount, messages);
continue;
}

list.add(vds);
candidates.add(host);
}
return list;
return candidates;
}

private void messageNotEnoughCores(VDS vds, int cores, VM vm, PerHostMessages messages) {
private void messageNotEnoughCores(VDS vds, int hostCpu, int unavailable, int exclusive, int shared, PerHostMessages messages) {
messages.addMessage(vds.getId(), EngineMessage.VAR__DETAIL__NOT_ENOUGH_CORES.toString());
log.debug("Host '{}' has less cores ({}) than vm cores ({})",
log.debug("Host '{}' has not enough cores (total - {}, unavailable for schedulling - {}) to schedule vms with requested number of CPUs (exclusive - {}, shared - {})",
vds.getName(),
cores,
VmCpuCountHelper.getDynamicNumOfCpu(vm));
hostCpu,
unavailable,
exclusive,
shared);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public boolean isDedicatedCpuPinningPossibleAtHost(Map<Guid, List<VdsCpuUnit>> v
return false;
}

previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingPinnings, vm.getCpuPinningPolicy());
previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingPinnings);

int socketsLeft = vm.getNumOfSockets();

Expand All @@ -67,11 +67,11 @@ public boolean isDedicatedCpuPinningPossibleAtHost(Map<Guid, List<VdsCpuUnit>> v
return false;
}

private void previewPinOfPendingExclusiveCpus(List<VdsCpuUnit> cpuTopology, Map<Guid, List<VdsCpuUnit>> vmToPendingPinning, CpuPinningPolicy cpuPinningPolicy) {
private void previewPinOfPendingExclusiveCpus(List<VdsCpuUnit> cpuTopology, Map<Guid, List<VdsCpuUnit>> vmToPendingPinning) {
for (var vmToPendingPinningEntry : vmToPendingPinning.entrySet()) {
vmToPendingPinningEntry.getValue().forEach(vdsCpuUnit -> {
VdsCpuUnit cpuUnit = getCpu(cpuTopology, vdsCpuUnit.getCpu());
cpuUnit.pinVm(vmToPendingPinningEntry.getKey(), cpuPinningPolicy);
cpuUnit.pinVm(vmToPendingPinningEntry.getKey(), vdsCpuUnit.getCpuPinningPolicy());
});
}
}
Expand All @@ -95,7 +95,7 @@ public List<VdsCpuUnit> updatePhysicalCpuAllocations(VM vm, Map<Guid, List<VdsCp
return new ArrayList<>();
}

previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingPinnings, vm.getCpuPinningPolicy());
previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingPinnings);

if (vm.getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED) {
List<VdsCpuUnit> cpusToBeAllocated = new ArrayList<>();
Expand Down Expand Up @@ -171,11 +171,14 @@ private List<VdsCpuUnit> allocateDedicatedCpus(List<VdsCpuUnit> cpuTopology, VM
return cpusToBeAllocated;
}

public int countTakenCores(VDS host) {
public int countTakenCores(VDS host, Map<Guid, List<VdsCpuUnit>> vmToPendingDedicatedCpuPinnings) {
List<VdsCpuUnit> cpuTopology = resourceManager.getVdsManager(host.getId()).getCpuTopology();
previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingDedicatedCpuPinnings);

if (cpuTopology.isEmpty()) {
return 0;
}

int numOfTakenCores = 0;
for (int socket : getOnlineSockets(cpuTopology)) {
for (int core : getOnlineCores(cpuTopology, socket)) {
Expand All @@ -187,6 +190,14 @@ public int countTakenCores(VDS host) {
return numOfTakenCores;
}

/**
 * Counts the CPUs of the host that are unavailable, taking the given pending pinnings into account.
 * Which counter is used depends on the "count threads as cores" flag.
 *
 * @param host the host to inspect
 * @param vmToPendingDedicatedCpuPinnings pending CPU pinnings keyed by VM id, forwarded to the selected counter
 * @param countThreadsAsCores when {@code true} the result of {@code getDedicatedCount} is returned,
 *                            otherwise the result of {@code countTakenCores}
 * @return the value produced by the selected counter
 */
public int countUnavailableCpus(VDS host, Map<Guid, List<VdsCpuUnit>> vmToPendingDedicatedCpuPinnings, boolean countThreadsAsCores) {
    return countThreadsAsCores
            ? getDedicatedCount(host.getId(), vmToPendingDedicatedCpuPinnings)
            : countTakenCores(host, vmToPendingDedicatedCpuPinnings);
}

private List<VdsCpuUnit> getCoresInSocket(List<VdsCpuUnit> cpuTopology, int socketId) {
return cpuTopology.stream().filter(vdsCpuUnit -> vdsCpuUnit.getSocket() == socketId).collect(Collectors.toList());
}
Expand Down Expand Up @@ -215,9 +226,13 @@ private List<VdsCpuUnit> getNonDedicatedCpusInCore(List<VdsCpuUnit> cpuTopology,
return getCpusInCore(getCoresInSocket(cpuTopology, socketId), coreId).stream().filter(cpu -> !cpu.isDedicated()).collect(Collectors.toList());
}

public int getDedicatedCount(Guid vdsId) {
return (int) resourceManager.getVdsManager(vdsId).getCpuTopology().stream()
.filter(VdsCpuUnit::isDedicated).count();
public int getDedicatedCount(Guid vdsId, Map<Guid, List<VdsCpuUnit>> vmToPendingPinnings) {
List<VdsCpuUnit> cpuTopology = resourceManager.getVdsManager(vdsId).getCpuTopology();
previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingPinnings);

return (int) cpuTopology.stream()
.filter(VdsCpuUnit::isDedicated)
.count();
}

private List<Integer> getOnlineSockets(List<VdsCpuUnit> cpuTopology) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.ovirt.engine.core.bll.scheduling.policyunits;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -16,21 +18,33 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.ovirt.engine.core.bll.scheduling.SchedulingContext;
import org.ovirt.engine.core.bll.scheduling.pending.PendingResourceManager;
import org.ovirt.engine.core.bll.scheduling.utils.VdsCpuUnitPinningHelper;
import org.ovirt.engine.core.common.businessentities.Cluster;
import org.ovirt.engine.core.common.businessentities.VDS;
import org.ovirt.engine.core.common.businessentities.VM;
import org.ovirt.engine.core.common.scheduling.PerHostMessages;
import org.ovirt.engine.core.compat.Guid;
import org.ovirt.engine.core.vdsbroker.ResourceManager;
import org.ovirt.engine.core.vdsbroker.VdsManager;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class CPUPolicyUnitTest {
@Mock
private VdsCpuUnitPinningHelper vdsCpuUnitPinningHelper;

@Mock
private PendingResourceManager pendingHostResources;

@Mock
private VdsManager vdsManager;

@Mock
private ResourceManager resourceManager;

@InjectMocks
private final CPUPolicyUnit cpuPolicyUnit = new CPUPolicyUnit(null, null);
private final CPUPolicyUnit cpuPolicyUnit = new CPUPolicyUnit(null, pendingHostResources);

private VDS vdsWithInvalidCpuInfo;

Expand Down Expand Up @@ -58,6 +72,9 @@ public void setUp() {

cluster = new Cluster();
cluster.setId(Guid.newGuid());

when(pendingHostResources.pendingHostResources(any(), any())).thenReturn(Collections.emptyList());
when(resourceManager.getVdsManager(any())).thenReturn(vdsManager);
}

/**
Expand Down Expand Up @@ -113,7 +130,8 @@ public void shouldKeepHostsWithEnoughThreads() {
private List<VDS> filter() {
return cpuPolicyUnit.filter(new SchedulingContext(cluster, Collections.emptyMap()),
Arrays.asList(vdsWithInvalidCpuInfo, vdsWithCores),
vm, mock(PerHostMessages.class));
Collections.singletonList(vm),
mock(PerHostMessages.class));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ public static CpuPinningPolicy forValue(Integer value) {
public int getValue() {
return value;
}

/**
 * Tells whether this policy pins CPUs exclusively.
 *
 * @return {@code true} only for {@code DEDICATED}, {@code false} for every other policy
 */
public boolean isExclusivelyPinned() {
    return DEDICATED == this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public class VdsManager {
private HostConnectionRefresherInterface hostRefresher;
private volatile boolean inServerRebootTimeout;
private List<VdsCpuUnit> cpuTopology;
private int minRequiredSharedCpusCount;

VdsManager(VDS vds, ResourceManager resourceManager) {
this.resourceManager = resourceManager;
Expand Down Expand Up @@ -1329,4 +1330,12 @@ public void unpinVmCpus(Guid vmId) {
cpuTopology.stream().filter(cpu -> cpu.getVmIds().contains(vmId))
.forEach(cpu -> cpu.unPinVm(vmId));
}

/**
 * Stores the minimum number of shared CPUs required on this host.
 *
 * @param minSharedCpusCount the new value; it is stored as given, without validation
 */
public void setMinRequiredSharedCpusCount(int minSharedCpusCount) {
    minRequiredSharedCpusCount = minSharedCpusCount;
}

/**
 * @return the value last stored by {@code setMinRequiredSharedCpusCount}
 *         (the field's initial value if it was never set)
 */
public int getMinRequiredSharedCpusCount() {
    return this.minRequiredSharedCpusCount;
}
}
Loading

0 comments on commit d0705be

Please sign in to comment.