Skip to content

Commit

Permalink
core: introduce isolated threads cpus
Browse files Browse the repository at this point in the history
This patch introduce the logic for isolated threads CpuPinningPolicy. In
this policy, each virtual CPU (thread) is taking a whole physical core
on the host. These cores are exclusive for the VM to use.

Change-Id: I1a3c00ea73f92208725f287c5b6de8d01365a3c0
Signed-off-by: Liran Rotenberg <lrotenbe@redhat.com>
  • Loading branch information
liranr23 committed Apr 12, 2022
1 parent 3c09efb commit 34a5f31
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ protected void addNumaPinningForDedicated(Guid vdsId) {
}

protected void setDedicatedCpus(VdsManager vdsManager) {
if (getVm().getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED) {
if (getVm().getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED &&
getVm().getCpuPinningPolicy() != CpuPinningPolicy.ISOLATE_THREADS) {
return;
}
getVm().setCurrentCpuPinning(getDedicatedCpuPinning(vdsManager));
Expand All @@ -418,7 +419,7 @@ protected void setDedicatedCpus(VdsManager vdsManager) {
protected String getDedicatedCpuPinning(VdsManager vdsManager) {
List<VdsCpuUnit> vdsCpuUnits = vdsManager.getCpuTopology().stream()
.filter(cpu -> cpu.getVmIds().contains(getVmId())).sorted().collect(Collectors.toList());
return CpuPinningHelper.createCpuPinningString(vdsCpuUnits);
return CpuPinningHelper.createCpuPinningString(vdsCpuUnits, getVm().getCpuPinningPolicy());
}

protected VdsManager getVdsManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,8 @@ public String createNumaPinningForDedicated(VM vm, Guid vdsId) {

return NumaPinningHelper.createNumaPinningAccordingToCpuPinning(
vm.getvNumaNodeList(),
cpuUnits);
cpuUnits,
vm.getCpuPinningPolicy());
}

public String createNumaPinningForDedicated(VM vm, List<VdsCpuUnit> cpuUnits) {
Expand All @@ -1368,7 +1369,8 @@ public String createNumaPinningForDedicated(VM vm, List<VdsCpuUnit> cpuUnits) {

return NumaPinningHelper.createNumaPinningAccordingToCpuPinning(
vm.getvNumaNodeList(),
cpuUnits);
cpuUnits,
vm.getCpuPinningPolicy());
}

public void autoSelectResumeBehavior(VmBase vmBase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public List<VDS> filter(final SchedulingContext context,
}
break;
case DEDICATED:
case ISOLATE_THREADS:
for (final VDS host : hosts) {
if (host.getCpuTopology() == null || host.getCpuTopology().isEmpty()) {
// means CPU topology not reported for this host, lets ignore it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public boolean isDedicatedCpuPinningPossibleAtHost(Map<Guid, List<VdsCpuUnit>> v
VM vm, Guid hostId) {
List<VdsCpuUnit> cpuTopology = resourceManager.getVdsManager(hostId).getCpuTopology();

if (vm.getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED) {
// TODO: Implementation for siblings and isolate threads
if (vm.getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED &&
vm.getCpuPinningPolicy() != CpuPinningPolicy.ISOLATE_THREADS) {
// TODO: Implementation for siblings
return false;
}

Expand All @@ -53,7 +54,17 @@ public boolean isDedicatedCpuPinningPossibleAtHost(Map<Guid, List<VdsCpuUnit>> v
int socketsLeft = vm.getNumOfSockets();

for (int socket : getOnlineSockets(cpuTopology)) {
int coresInSocket = getAvailableCores(cpuTopology, socket, vm.getThreadsPerCpu());
int coresInSocket;
switch (vm.getCpuPinningPolicy()) {
case DEDICATED:
coresInSocket = getAvailableCores(cpuTopology, socket, vm.getThreadsPerCpu());
break;
case ISOLATE_THREADS:
coresInSocket = getAvailableCoresIsolated(cpuTopology, socket);
break;
default:
throw new IllegalStateException("Unexpected value: " + vm.getCpuPinningPolicy());
}
int totalSocketsTaken = coresInSocket / vm.getCpuPerSocket();
if (!vm.getvNumaNodeList().isEmpty()) {
int highestAmountOfvNumaNodesInSocket = getVirtualNumaNodesInSocket(cpuTopology, vm, hostId, socket);
Expand Down Expand Up @@ -97,7 +108,8 @@ public List<VdsCpuUnit> updatePhysicalCpuAllocations(VM vm, Map<Guid, List<VdsCp

previewPinOfPendingExclusiveCpus(cpuTopology, vmToPendingPinnings, vm.getCpuPinningPolicy());

if (vm.getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED) {
if (vm.getCpuPinningPolicy() != CpuPinningPolicy.DEDICATED &&
vm.getCpuPinningPolicy() != CpuPinningPolicy.ISOLATE_THREADS) {
List<VdsCpuUnit> cpusToBeAllocated = new ArrayList<>();
String cpuPinning = vm.getVmPinning();
if (cpuPinning == null || cpuPinning.isEmpty()) {
Expand All @@ -118,6 +130,14 @@ public List<VdsCpuUnit> updatePhysicalCpuAllocations(VM vm, Map<Guid, List<VdsCp

filterSocketsWithInsufficientMemoryForNumaNode(cpuTopology, vm, hostId);
List<VdsCpuUnit> cpusToBeAllocated = allocateDedicatedCpus(cpuTopology, vm, hostId);
if (vm.getCpuPinningPolicy() == CpuPinningPolicy.ISOLATE_THREADS) {
List<Integer> socketsUsed = cpusToBeAllocated.stream().map(VdsCpuUnit::getSocket).distinct().collect(Collectors.toList());
int cpus = 0;
for (int socket: socketsUsed) {
cpus += getCoresInSocket(cpusToBeAllocated, socket).stream().map(VdsCpuUnit::getCore).distinct().count();
}
return cpus == vm.getNumOfCpus() ? cpusToBeAllocated : null;
}
return cpusToBeAllocated.size() == vm.getNumOfCpus() ? cpusToBeAllocated : null;
}

Expand All @@ -126,6 +146,7 @@ private List<VdsCpuUnit> allocateDedicatedCpus(List<VdsCpuUnit> cpuTopology, VM
List<VdsCpuUnit> cpusToBeAllocated = new ArrayList<>();
int socketsLeft = vm.getNumOfSockets();
int onlineSockets = getOnlineSockets(cpuTopology).size();
int totalCpuCount = 0;
while (onlineSockets > 0 && socketsLeft > 0) {
List<VdsCpuUnit> cpusInChosenSocket = getMaxFreedSocket(cpuTopology);
if (cpusInChosenSocket.isEmpty()) {
Expand All @@ -140,18 +161,37 @@ private List<VdsCpuUnit> allocateDedicatedCpus(List<VdsCpuUnit> cpuTopology, VM
// coreCount is based on the VM topology
int coreCount = 0;
for (int core : getOnlineCores(cpusInChosenSocket)) {
List<VdsCpuUnit> freeCpusInCore = getFreeCpusInCore(cpusInChosenSocket, core);
int coreThreads = freeCpusInCore.size();
while (coreThreads >= vm.getThreadsPerCpu() &&
cpusToBeAllocated.size() < vm.getNumOfCpus() &&
coreCount / vm.getCpuPerSocket() < amountOfVSocketsInPSockets) {
for (int thread = 0; thread < vm.getThreadsPerCpu() && cpusToBeAllocated.size() < vm.getNumOfCpus(); thread++) {
VdsCpuUnit cpuUnit = freeCpusInCore.remove(0);
cpuUnit.pinVm(vm.getId(), vm.getCpuPinningPolicy());
cpusToBeAllocated.add(cpuUnit);
}
coreCount++;
coreThreads -= vm.getThreadsPerCpu();
switch (vm.getCpuPinningPolicy()) {
case DEDICATED:
List<VdsCpuUnit> freeCpusInCore = getFreeCpusInCore(cpusInChosenSocket, core);
int coreThreads = freeCpusInCore.size();
while (coreThreads >= vm.getThreadsPerCpu() &&
cpusToBeAllocated.size() < vm.getNumOfCpus() &&
coreCount / vm.getCpuPerSocket() < amountOfVSocketsInPSockets) {
for (int thread = 0; thread < vm.getThreadsPerCpu() && cpusToBeAllocated.size() < vm.getNumOfCpus(); thread++) {
VdsCpuUnit cpuUnit = freeCpusInCore.remove(0);
cpuUnit.pinVm(vm.getId(), vm.getCpuPinningPolicy());
cpusToBeAllocated.add(cpuUnit);
}
coreCount++;
coreThreads -= vm.getThreadsPerCpu();
}
break;
case ISOLATE_THREADS:
List<VdsCpuUnit> cpusInCore = getCpusInCore(cpusInChosenSocket, core);
if (cpusInCore.stream().anyMatch(VdsCpuUnit::isPinned)) {
continue;
}
if (totalCpuCount < vm.getNumOfCpus() && coreCount / vm.getCpuPerSocket() < amountOfVSocketsInPSockets) {
cpusInCore.forEach(cpu -> {
cpu.pinVm(vm.getId(), vm.getCpuPinningPolicy());
cpusToBeAllocated.add(cpu);
});
totalCpuCount++;
if (totalCpuCount % vm.getThreadsPerCpu() == 0) {
coreCount++;
}
}
}
}
socketsLeft -= coreCount / vm.getCpuPerSocket();
Expand Down Expand Up @@ -253,6 +293,18 @@ private int getAvailableCores(List<VdsCpuUnit> cpuTopology, int socket, int vThr
return count;
}

private int getAvailableCoresIsolated(List<VdsCpuUnit> cpuTopology, int socket) {
int count = 0;
for (int core : getOnlineCores(cpuTopology, socket)) {
List<VdsCpuUnit> cpusInCore = getCpusInCore(getCoresInSocket(cpuTopology, socket), core);
if (cpusInCore.stream().anyMatch(VdsCpuUnit::isPinned)) {
continue;
}
count++;
}
return count;
}

/**
* Filters and removes physical sockets from being available based on the VM NUMA nodes memory requirements.
* The memory is accumulated per physical socket (can be multiple physical NUMA nodes), as we use INTERLEAVED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public enum CpuPinningPolicy {
/** Switch the VM topology to be maximized on the selected host. Sets the CPU pinning and NUMA accordingly */
RESIZE_AND_PIN_NUMA(2),
/** Dynamically dedicate pCPUs to be exclusively pinned to VM virtual CPU topology **/
DEDICATED(3);
DEDICATED(3),
ISOLATE_THREADS(4);

private int value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.ovirt.engine.core.common.businessentities.CpuPinningPolicy;
import org.ovirt.engine.core.common.businessentities.VdsCpuUnit;

public class CpuPinningHelper {
Expand Down Expand Up @@ -105,10 +97,38 @@ private static List<Integer> createRange(int start, int end) {
* @param vdsCpuUnits a list of VdsCpuUnit
* @return a map of CPU pinning
*/
public static Map<Integer, Integer> createCpuPinningMap(List<VdsCpuUnit> vdsCpuUnits) {
public static Map<Integer, Set<Integer>> createDedicatedCpuPinningMap(List<VdsCpuUnit> vdsCpuUnits) {
return IntStream.range(0, vdsCpuUnits.size())
.boxed()
.collect(Collectors.toMap(Function.identity(), i -> vdsCpuUnits.get(i).getCpu()));
.collect(Collectors.toMap(Function.identity(), i -> new LinkedHashSet<>(vdsCpuUnits.get(i).getCpu())));
}

public static Map<Integer, Set<Integer>> createCpuPinningMap(List<VdsCpuUnit> vdsCpuUnits, CpuPinningPolicy cpuPinningPolicy) {
switch(cpuPinningPolicy) {
case DEDICATED:
return createDedicatedCpuPinningMap(vdsCpuUnits);
case ISOLATE_THREADS:
return createIsolatedThreadsCpuPinningMap(vdsCpuUnits);
}
return null;
}

private static Map<Integer, Set<Integer>> createIsolatedThreadsCpuPinningMap(List<VdsCpuUnit> vdsCpuUnits) {
List<Integer> socketsUsed = vdsCpuUnits.stream().map(VdsCpuUnit::getSocket).distinct().collect(Collectors.toList());
Map<Integer, Set<Integer>> mapping = new HashMap<>();
int vcpu = 0;
for (int socket : socketsUsed) {
List<VdsCpuUnit> cpusInSocket = vdsCpuUnits.stream().filter(cpu -> cpu.getSocket() == socket).collect(Collectors.toList());
List<Integer> cores = cpusInSocket.stream().map(VdsCpuUnit::getCore).distinct().collect(Collectors.toList());
for (int core : cores) {
mapping.put(vcpu++, cpusInSocket
.stream()
.filter(cpu -> cpu.getCore() == core)
.map(VdsCpuUnit::getCpu)
.collect(Collectors.toSet()));
}
}
return mapping;
}

/**
Expand All @@ -118,11 +138,20 @@ public static Map<Integer, Integer> createCpuPinningMap(List<VdsCpuUnit> vdsCpuU
* @param vdsCpuUnits a list of VdsCpuUnit
* @return a string of CPU pinning
*/
public static String createCpuPinningString(List<VdsCpuUnit> vdsCpuUnits) {
return createCpuPinningMap(vdsCpuUnits)
public static String createCpuPinningString(List<VdsCpuUnit> vdsCpuUnits, CpuPinningPolicy cpuPinningPolicy) {
return createMappingString(createCpuPinningMap(vdsCpuUnits, cpuPinningPolicy));
}

public static String createMappingString(Map<Integer, Set<Integer>> mapping) {
return mapping
.entrySet()
.stream()
.map(entry -> String.format("%d#%d", entry.getKey(), entry.getValue()))
.map(entry -> {
String stringValues = new ArrayList<>(entry.getValue()).stream()
.map(String::valueOf)
.collect(Collectors.joining(","));
return String.format("%d#%s", entry.getKey(), stringValues);
})
.collect(Collectors.joining("_"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,39 +283,29 @@ public static void applyAutoPinningPolicy(VM vm, VdsDynamic vdsDynamic, List<Vds

public static String createNumaPinningAccordingToCpuPinning(
List<VmNumaNode> vmNodeList,
List<VdsCpuUnit> cpuUnits) {
List<VdsCpuUnit> cpuUnits,
CpuPinningPolicy cpuPinningPolicy) {

Map<Integer, Integer> cpuMappings =
CpuPinningHelper.createCpuPinningMap(cpuUnits);
Map<Integer, Set<Integer>> cpuMappings =
CpuPinningHelper.createCpuPinningMap(cpuUnits, cpuPinningPolicy);
Map<Integer, Set<Integer>> numaMappings = new HashMap<>();

for (VmNumaNode vmNode : vmNodeList) {
Set<Integer> pNumaNodesIndexes = new HashSet<>();
for (Integer vCpuId : vmNode.getCpuIds()) {
Integer pCpu = cpuMappings.get(vCpuId);
VdsCpuUnit found =
cpuUnits.stream().filter(cpuUnit -> cpuUnit.getCpu() == pCpu).findFirst().orElse(null);
if (found != null) {
pNumaNodesIndexes.add(found.getNuma());
}
Set<Integer> pCpus = cpuMappings.get(vCpuId);
pCpus.forEach(pCpu -> {
VdsCpuUnit found =
cpuUnits.stream().filter(cpuUnit -> cpuUnit.getCpu() == pCpu).findFirst().orElse(null);
if (found != null) {
pNumaNodesIndexes.add(found.getNuma());
}
});
}
numaMappings.put(vmNode.getIndex(), pNumaNodesIndexes);
}

return createNumaMappingString(numaMappings);
}

private static String createNumaMappingString(Map<Integer, Set<Integer>> numaMapping) {
return numaMapping
.entrySet()
.stream()
.map(entry -> {
String stringValues = (String) new ArrayList<Integer>(entry.getValue()).stream()
.map(String::valueOf)
.collect(Collectors.joining(","));
return String.format("%d#%s", entry.getKey(), stringValues);
})
.collect(Collectors.joining("_"));
return CpuPinningHelper.createMappingString(numaMappings);
}

public static Map<Integer, Collection<Integer>> parseNumaMapping(String numaPinningString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,11 @@ private void writeCpuPinningPolicyMetadata() {
if (vm.getVmPinning() != null && cpuPinningPolicy == CpuPinningPolicy.NONE) {
cpuPinningPolicy = CpuPinningPolicy.MANUAL;
}
writer.writeElement(OVIRT_VM_URI, "cpuPolicy", cpuPinningPolicy.name().toLowerCase());
String cpuPinningPolicyName = cpuPinningPolicy.name().toLowerCase();
if (cpuPinningPolicy == CpuPinningPolicy.ISOLATE_THREADS) {
cpuPinningPolicyName = "isolate-threads";
}
writer.writeElement(OVIRT_VM_URI, "cpuPolicy", cpuPinningPolicyName);
}

private void writePowerEvents() {
Expand Down

0 comments on commit 34a5f31

Please sign in to comment.