UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the
CPU manager reconcile loop now floods the kubelet log with
"reconcileState: skipping pod; pod is managed" warnings. Lower the
verbosity of these log messages.

UPSTREAM: <carry>: correctly set static pod CPUs when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from default set when workload partitioning is
enabled.

Co-Authored-By: Brent Rowsell <browsell@redhat.com>
Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
Signed-off-by: Don Penney <dpenney@redhat.com>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

UPSTREAM: <carry>: add management support to kubelet

UPSTREAM: <carry>: OCPBUGS-29520: fix cpu manager default cpuset check in workload partitioned env

(this can be squashed into 04070bb UPSTREAM: <carry>: add management support to kubelet)

Workload partitioning makes the separation between reserved and workload CPUs more strict. It is therefore expected that the reserved CPUs are NOT part of the default cpuset, so the existing check was overzealous.

The first execution of kubelet after a reboot never gets here, as the cpuset is computed on line 209. However, a kubelet restart without a reboot skips this code, recovers the state from the state file, and runs the check on line 220.

This was uncovered by decoupling the CPU manager state file cleanup from kubelet restart, doing it only once at reboot as part of OCPBUGS-24366.
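
A worked example of the old check on a partitioned node (the CPU ids mirror the new unit tests below, where cores 0 and 6 are reserved; this snippet is illustrative only):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// On a workload-partitioned node the state file's default cpuset was
	// written without the reserved CPUs.
	reserved := cpuset.New(0, 6)
	defaultSet := cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11)

	// Old check: requires every reserved CPU to be present in the default
	// set, which can never hold once partitioning removed them.
	fmt.Println(reserved.Intersection(defaultSet).Equals(reserved)) // false

	// New behaviour: when managed.IsEnabled() is true this check is skipped.
}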

UPSTREAM: <carry>: add management workload check for guaranteed qos

When static pods have workload partitioning enabled, we should not alter their resources if they are Guaranteed QoS. This change adds a check for Guaranteed QoS.
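
The guard itself lives in the kubelet managed package, which is not part of the hunks shown below. A minimal sketch of the intended behaviour, reusing the v1qos helper; the function name modifyStaticPodSketch and its wiring are assumptions, not the actual carry-patch code:

package managed

import (
	v1 "k8s.io/api/core/v1"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
)

// modifyStaticPodSketch sketches the Guaranteed QoS guard: a Guaranteed
// static pod keeps its original requests and limits, so the modifier
// returns nil to signal "leave the pod untouched".
func modifyStaticPodSketch(pod *v1.Pod) *v1.Pod {
	if v1qos.GetPodQOS(pod) == v1.PodQOSGuaranteed {
		return nil
	}
	out := pod.DeepCopy()
	// ...rewrite CPU/memory requests into the management resource here...
	return out
}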

Signed-off-by: ehila <ehila@redhat.com>

test: add unit tests for error states

Signed-off-by: ehila <ehila@redhat.com>
rphillips authored and soltysh committed Jul 1, 2024
1 parent 062903c commit 6597eb1
Showing 10 changed files with 1,017 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/utils/cpuset"
)
@@ -407,13 +408,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
failure = []reconciledContainer{}

m.removeStaleState()
workloadEnabled := managed.IsEnabled()
for _, pod := range m.activePods() {
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
klog.V(4).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
failure = append(failure, reconciledContainer{pod.Name, "", ""})
continue
}
if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
continue
}

allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
21 changes: 18 additions & 3 deletions pkg/kubelet/cm/cpumanager/policy_static.go
@@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/cpuset"
@@ -203,14 +204,20 @@ func (p *staticPolicy) validateState(s state.State) error {
// state is empty initialize
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
if managed.IsEnabled() {
defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
s.SetDefaultCPUSet(defaultCpus)
}
return nil
}

// State has already been initialized from file (is not empty)
// 1. Check if the reserved cpuset is not part of default cpuset because:
// - kube/system reserved have changed (increased) - may lead to some containers not being able to start
// - user tampered with file
if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
// 2. This only applies when managed mode is disabled. Active workload partitioning feature
// removes the reserved cpus from the default cpu mask on purpose.
if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
p.reservedCPUs.String(), tmpDefaultCPUset.String())
}
@@ -241,9 +248,17 @@ func (p *staticPolicy) validateState(s state.State) error {
}
}
totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
availableCPUs := p.topology.CPUDetails.CPUs()

// CPU (workload) partitioning removes reserved cpus
// from the default mask intentionally
if managed.IsEnabled() {
availableCPUs = availableCPUs.Difference(p.reservedCPUs)
}

if !totalKnownCPUs.Equals(availableCPUs) {
return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
availableCPUs.String(), totalKnownCPUs.String())
}

return nil
48 changes: 37 additions & 11 deletions pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -21,6 +21,8 @@ import (
"reflect"
"testing"

"k8s.io/kubernetes/pkg/kubelet/managed"

v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -939,17 +941,18 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
// above test cases are without kubelet --reserved-cpus cmd option
// the following tests are with --reserved-cpus configured
type staticPolicyTestWithResvList struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
reserved cpuset.CPUSet
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expErr error
expNewErr error
expCPUAlloc bool
expCSet cpuset.CPUSet
description string
topo *topology.CPUTopology
numReservedCPUs int
reserved cpuset.CPUSet
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expErr error
expNewErr error
expCPUAlloc bool
expCSet cpuset.CPUSet
managementPartition bool
}

func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -981,9 +984,32 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
},
{
description: "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled",
topo: topoDualSocketHT,
numReservedCPUs: 2,
stAssignments: state.ContainerCPUAssignments{},
managementPartition: true,
expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
},
{
description: "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery",
topo: topoDualSocketHT,
numReservedCPUs: 2,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
managementPartition: true,
expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
wasManaged := managed.IsEnabled()
managed.TestOnlySetEnabled(testCase.managementPartition)
defer func() {
managed.TestOnlySetEnabled(wasManaged)
}()

p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
if !reflect.DeepEqual(err, testCase.expNewErr) {
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
4 changes: 4 additions & 0 deletions pkg/kubelet/cm/qos_container_manager_linux.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1/resource"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/managed"
)

const (
@@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
reuseReqs := make(v1.ResourceList, 4)
for i := range pods {
pod := pods[i]
if enabled, _, _ := managed.IsPodManaged(pod); enabled {
continue
}
qosClass := v1qos.GetPodQOS(pod)
if qosClass != v1.PodQOSBurstable {
// we only care about the burstable qos tier
11 changes: 11 additions & 0 deletions pkg/kubelet/config/file.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/managed"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utilio "k8s.io/utils/io"
)
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
if podErr != nil {
return pod, podErr
}
if managed.IsEnabled() {
if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
} else if newPod != nil {
klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
pod = newPod
} else {
klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
}
}
return pod, nil
}

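For context, the managed package (not shown in this diff) decides whether a static pod is pinned by inspecting its annotations. A rough sketch of such a check; the annotation prefix below is an assumption for illustration and is not taken from this commit:

// Sketch only: the real managed.IsPodManaged is not part of the hunks shown
// here, and the annotation prefix below is assumed for illustration.
package managed

import (
	"strings"

	v1 "k8s.io/api/core/v1"
)

const workloadAnnotationPrefixSketch = "target.workload.openshift.io/"

// isPodManagedSketch reports whether a static pod carries a workload
// pinning annotation.
func isPodManagedSketch(pod *v1.Pod) bool {
	for key := range pod.Annotations {
		if strings.HasPrefix(key, workloadAnnotationPrefixSketch) {
			return true
		}
	}
	return false
}
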
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet.go
@@ -88,6 +88,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -629,6 +630,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

klet.runtimeService = kubeDeps.RemoteRuntimeService

if managed.IsEnabled() {
klog.InfoS("Pinned Workload Management Enabled")
}

if kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
27 changes: 27 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
@@ -38,7 +38,9 @@ import (
"k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
taintutil "k8s.io/kubernetes/pkg/util/taints"
volutil "k8s.io/kubernetes/pkg/volume/util"
@@ -117,6 +119,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
if managed.IsEnabled() {
requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
}
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -127,6 +132,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return true
}

// addManagementNodeCapacity adds the managednode capacity to the node
func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
updateDefaultResources(initialNode, existingNode)
machineInfo, err := kl.cadvisor.MachineInfo()
if err != nil {
klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
return false
}
cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()
newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
managedResourceName := managed.GenerateResourceName("management")
if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
return false
}
existingNode.Status.Capacity[managedResourceName] = *newCPURequest
return true
}

// reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -418,6 +442,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
}
}
}
if managed.IsEnabled() {
kl.addManagementNodeCapacity(node, node)
}

kl.setNodeStatus(ctx, node)

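For context, the capacity computed by addManagementNodeCapacity above is denominated in millicores: multiplying the machine's millicore value by 1000 and storing it as a milli quantity yields one resource unit per millicore. A small sketch of that arithmetic (the 8-CPU machine is illustrative):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// An 8-CPU machine has 8000 millicores of capacity.
	cpu := resource.MustParse("8")
	// Same math as addManagementNodeCapacity: 8000 * 1000 stored as a
	// milli quantity comes out to 8000 whole units of the management
	// resource, i.e. one unit per millicore.
	units := resource.NewMilliQuantity(cpu.MilliValue()*1000, cpu.Format)
	fmt.Println(units.Value()) // 8000
}
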
30 changes: 30 additions & 0 deletions pkg/kubelet/managed/cpu_shares.go
@@ -0,0 +1,30 @@
package managed

const (
// These limits are defined in the kernel:
// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
MinShares = 2
MaxShares = 262144

SharesPerCPU = 1024
MilliCPUToCPU = 1000
)

// MilliCPUToShares converts the milliCPU to CFS shares.
func MilliCPUToShares(milliCPU int64) uint64 {
if milliCPU == 0 {
// Docker converts zero milliCPU to unset, which maps to kernel default
// for unset: 1024. Return 2 here to really match kernel default for
// zero milliCPU.
return MinShares
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
if shares < MinShares {
return MinShares
}
if shares > MaxShares {
return MaxShares
}
return uint64(shares)
}
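
A small usage sketch for the helper above; the values follow directly from the constants in this file, shown here as a Go example function for illustration:

package managed

import "fmt"

// ExampleMilliCPUToShares illustrates the rounding and clamping: zero
// milliCPU maps to the kernel minimum of 2 shares, other values scale at
// 1024 shares per CPU, and very large values are clamped to MaxShares.
func ExampleMilliCPUToShares() {
	fmt.Println(MilliCPUToShares(0))       // MinShares
	fmt.Println(MilliCPUToShares(250))     // 250 * 1024 / 1000
	fmt.Println(MilliCPUToShares(1000000)) // clamped
	// Output:
	// 2
	// 256
	// 262144
}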