From 6597eb13530467338460c0891a19f7458687a40c Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Mon, 29 Mar 2021 12:02:13 -0500 Subject: [PATCH] UPSTREAM: : add management support to kubelet UPSTREAM: : management workloads enhancement 741 UPSTREAM: : lower verbosity of managed workloads logging Support for managed workloads was introduced by PR#627. However, the CPU manager reconcile loop now seems to flood the kubelet log with "reconcileState: skipping pod; pod is managed" warnings. Lower the verbosity of these log messages. UPSTREAM: : correctly set static pod CPUs when workload partitioning is disabled UPSTREAM: : Remove reserved CPUs from default set Remove reserved CPUs from default set when workload partitioning is enabled. Co-Authored-By: Brent Rowsell Signed-off-by: Artyom Lukianov Signed-off-by: Don Penney OpenShift-Rebase-Source: b762ced05a6 OpenShift-Rebase-Source: 63cf7937fbc OpenShift-Rebase-Source: 32af64c1458 UPSTREAM: : add management support to kubelet UPSTREAM: : OCPBUGS-29520: fix cpu manager default cpuset check in workload partitioned env (this can be squashed to 04070bb40de8dc3570b0ed68ec1debb133e03606 UPSTREAM: : add management support to kubelet) Workload partitioning makes the separation between reserved and workload cpus more strict. It is therefore expected that the reserved cpus are NOT part of the default cpuset, and the existing check was overzealous. The first execution of kubelet after a reboot never gets here, as the cpuset is computed on line 209. However, a kubelet restart without a reboot skips this code, recovers from the state file, and runs the check on line 220. This was uncovered by decoupling the cpu manager state file cleanup from kubelet restart, doing it only once at reboot as part of OCPBUGS-24366. UPSTREAM: : add management workload check for guaranteed qos When static pods have workload partitioning enabled, we should not alter their resources if they are Guaranteed QoS; this change adds a check for Guaranteed QoS. Signed-off-by: ehila test: add unit tests for error states Signed-off-by: ehila --- pkg/kubelet/cm/cpumanager/cpu_manager.go | 6 + pkg/kubelet/cm/cpumanager/policy_static.go | 21 +- .../cm/cpumanager/policy_static_test.go | 48 +- pkg/kubelet/cm/qos_container_manager_linux.go | 4 + pkg/kubelet/config/file.go | 11 + pkg/kubelet/kubelet.go | 5 + pkg/kubelet/kubelet_node_status.go | 27 + pkg/kubelet/managed/cpu_shares.go | 30 + pkg/kubelet/managed/managed.go | 204 ++++++ pkg/kubelet/managed/managed_test.go | 675 ++++++++++++++++++ 10 files changed, 1017 insertions(+), 14 deletions(-) create mode 100644 pkg/kubelet/managed/cpu_shares.go create mode 100644 pkg/kubelet/managed/managed.go create mode 100644 pkg/kubelet/managed/managed_test.go diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 8b5049d7d7476..affaf97da8c4d 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/utils/cpuset" ) @@ -407,6 +408,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec failure = []reconciledContainer{} m.removeStaleState() + workloadEnabled := managed.IsEnabled() for _, pod := range m.activePods() { pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID) if !ok { @@ -414,6 +416,10 @@
func (m *manager) reconcileState() (success []reconciledContainer, failure []rec failure = append(failure, reconciledContainer{pod.Name, "", ""}) continue } + if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled { + klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed", "pod", pod.Name) + continue + } allContainers := pod.Spec.InitContainers allContainers = append(allContainers, pod.Spec.Containers...) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index d22a6a64d5ea9..d4579b9aa7fc6 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/utils/cpuset" ) @@ -203,6 +204,10 @@ func (p *staticPolicy) validateState(s state.State) error { // state is empty initialize allCPUs := p.topology.CPUDetails.CPUs() s.SetDefaultCPUSet(allCPUs) + if managed.IsEnabled() { + defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs) + s.SetDefaultCPUSet(defaultCpus) + } return nil } @@ -210,7 +215,9 @@ func (p *staticPolicy) validateState(s state.State) error { // 1. Check if the reserved cpuset is not part of default cpuset because: // - kube/system reserved have changed (increased) - may lead to some containers not being able to start // - user tampered with file - if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) { + // 2. This only applies when managed mode is disabled. The active workload partitioning feature + // removes the reserved cpus from the default cpu mask on purpose. + if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) { return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"", p.reservedCPUs.String(), tmpDefaultCPUset.String()) } @@ -241,9 +248,17 @@ func (p *staticPolicy) validateState(s state.State) error { } } totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
- if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) { + availableCPUs := p.topology.CPUDetails.CPUs() + + // CPU (workload) partitioning removes reserved cpus + // from the default mask intentionally + if managed.IsEnabled() { + availableCPUs = availableCPUs.Difference(p.reservedCPUs) + } + + if !totalKnownCPUs.Equals(availableCPUs) { return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"", - p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String()) + availableCPUs.String(), totalKnownCPUs.String()) } return nil diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index bcd1e19ba297c..f86b01d9ea7f8 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -21,6 +21,8 @@ import ( "reflect" "testing" + "k8s.io/kubernetes/pkg/kubelet/managed" + v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -939,17 +941,18 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { // above test cases are without kubelet --reserved-cpus cmd option // the following tests are with --reserved-cpus configured type staticPolicyTestWithResvList struct { - description string - topo *topology.CPUTopology - numReservedCPUs int - reserved cpuset.CPUSet - stAssignments state.ContainerCPUAssignments - stDefaultCPUSet cpuset.CPUSet - pod *v1.Pod - expErr error - expNewErr error - expCPUAlloc bool - expCSet cpuset.CPUSet + description string + topo *topology.CPUTopology + numReservedCPUs int + reserved cpuset.CPUSet + stAssignments state.ContainerCPUAssignments + stDefaultCPUSet cpuset.CPUSet + pod *v1.Pod + expErr error + expNewErr error + expCPUAlloc bool + expCSet cpuset.CPUSet + managementPartition bool } func TestStaticPolicyStartWithResvList(t *testing.T) { @@ -981,9 +984,32 @@ func TestStaticPolicyStartWithResvList(t *testing.T) { stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"), }, + { + description: "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled", + topo: topoDualSocketHT, + numReservedCPUs: 2, + stAssignments: state.ContainerCPUAssignments{}, + managementPartition: true, + expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + }, + { + description: "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery", + topo: topoDualSocketHT, + numReservedCPUs: 2, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + managementPartition: true, + expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + }, } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { + wasManaged := managed.IsEnabled() + managed.TestOnlySetEnabled(testCase.managementPartition) + defer func() { + managed.TestOnlySetEnabled(wasManaged) + }() + p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) if !reflect.DeepEqual(err, testCase.expNewErr) { t.Errorf("StaticPolicy Start() error (%v). 
expected error: %v but got: %v", diff --git a/pkg/kubelet/cm/qos_container_manager_linux.go b/pkg/kubelet/cm/qos_container_manager_linux.go index a740acbbb499b..0fa8cb55b3e3e 100644 --- a/pkg/kubelet/cm/qos_container_manager_linux.go +++ b/pkg/kubelet/cm/qos_container_manager_linux.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1/resource" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" kubefeatures "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/managed" ) const ( @@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass] reuseReqs := make(v1.ResourceList, 4) for i := range pods { pod := pods[i] + if enabled, _, _ := managed.IsPodManaged(pod); enabled { + continue + } qosClass := v1qos.GetPodQOS(pod) if qosClass != v1.PodQOSBurstable { // we only care about the burstable qos tier diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 79e2af6ed6216..d2526ec989d97 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/kubelet/managed" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" utilio "k8s.io/utils/io" ) @@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { if podErr != nil { return pod, podErr } + if managed.IsEnabled() { + if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil { + klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace) + } else if newPod != nil { + klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations) + pod = newPod + } else { + klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace) + } + } return pod, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9471c3a8b8261..39d6bbce0cebb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -88,6 +88,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/kuberuntime" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics/collectors" "k8s.io/kubernetes/pkg/kubelet/network/dns" @@ -629,6 +630,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.runtimeService = kubeDeps.RemoteRuntimeService + if managed.IsEnabled() { + klog.InfoS("Pinned Workload Management Enabled") + } + if kubeDeps.KubeClient != nil { klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient) } diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index ca5e732ea388e..347ffc3418934 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -38,7 +38,9 @@ import ( "k8s.io/klog/v2" kubeletapis "k8s.io/kubelet/pkg/apis" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/nodestatus" taintutil "k8s.io/kubernetes/pkg/util/taints" volutil "k8s.io/kubernetes/pkg/volume/util" @@ -117,6 +119,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { 
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate + if managed.IsEnabled() { + requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate + } if requiresUpdate { if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil { klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node)) @@ -127,6 +132,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { return true } +// addManagementNodeCapacity adds the managednode capacity to the node +func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool { + updateDefaultResources(initialNode, existingNode) + machineInfo, err := kl.cadvisor.MachineInfo() + if err != nil { + klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err) + return false + } + cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU] + cpuRequestInMilli := cpuRequest.MilliValue() + newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format) + managedResourceName := managed.GenerateResourceName("management") + if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) { + return false + } + existingNode.Status.Capacity[managedResourceName] = *newCPURequest + return true +} + // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool { requiresUpdate := updateDefaultResources(initialNode, existingNode) @@ -418,6 +442,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) { } } } + if managed.IsEnabled() { + kl.addManagementNodeCapacity(node, node) + } kl.setNodeStatus(ctx, node) diff --git a/pkg/kubelet/managed/cpu_shares.go b/pkg/kubelet/managed/cpu_shares.go new file mode 100644 index 0000000000000..de60b6d6e755e --- /dev/null +++ b/pkg/kubelet/managed/cpu_shares.go @@ -0,0 +1,30 @@ +package managed + +const ( + // These limits are defined in the kernel: + // https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428 + MinShares = 2 + MaxShares = 262144 + + SharesPerCPU = 1024 + MilliCPUToCPU = 1000 +) + +// MilliCPUToShares converts the milliCPU to CFS shares. +func MilliCPUToShares(milliCPU int64) uint64 { + if milliCPU == 0 { + // Docker converts zero milliCPU to unset, which maps to kernel default + // for unset: 1024. Return 2 here to really match kernel default for + // zero milliCPU. + return MinShares + } + // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. + shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU + if shares < MinShares { + return MinShares + } + if shares > MaxShares { + return MaxShares + } + return uint64(shares) +} diff --git a/pkg/kubelet/managed/managed.go b/pkg/kubelet/managed/managed.go new file mode 100644 index 0000000000000..3d9ff87aa625b --- /dev/null +++ b/pkg/kubelet/managed/managed.go @@ -0,0 +1,204 @@ +/* +Copyright 2021 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package managed + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +var ( + pinnedManagementEnabled bool + pinnedManagementFilename = "/etc/kubernetes/openshift-workload-pinning" +) + +const ( + qosWarning = "skipping pod CPUs requests modifications because it has guaranteed QoS class" + WorkloadsAnnotationPrefix = "target.workload.openshift.io/" + WorkloadsCapacitySuffix = "workload.openshift.io/cores" + ContainerAnnotationPrefix = "resources.workload.openshift.io/" + WorkloadAnnotationWarning = "workload.openshift.io/warning" +) + +type WorkloadContainerAnnotation struct { + CpuShares uint64 `json:"cpushares"` +} + +func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation { + return WorkloadContainerAnnotation{ + CpuShares: cpushares, + } +} + +func (w WorkloadContainerAnnotation) Serialize() ([]byte, error) { + return json.Marshal(w) +} + +func init() { + readEnablementFile() +} + +func readEnablementFile() { + if _, err := os.Stat(pinnedManagementFilename); err == nil { + pinnedManagementEnabled = true + } +} + +// TestOnlySetEnabled allows changing the state of management partition enablement +// This method MUST NOT be used outside of test code +func TestOnlySetEnabled(enabled bool) { + pinnedManagementEnabled = enabled +} + +func IsEnabled() bool { + return pinnedManagementEnabled +} + +// IsPodManaged returns true and the name of the workload if enabled. +// returns true, workload name, and the annotation payload. 
+func IsPodManaged(pod *v1.Pod) (bool, string, string) { + if pod.ObjectMeta.Annotations == nil { + return false, "", "" + } + for annotation, value := range pod.ObjectMeta.Annotations { + if strings.HasPrefix(annotation, WorkloadsAnnotationPrefix) { + return true, strings.TrimPrefix(annotation, WorkloadsAnnotationPrefix), value + } + } + return false, "", "" +} + +// ModifyStaticPodForPinnedManagement will modify a pod for pod management +func ModifyStaticPodForPinnedManagement(pod *v1.Pod) (*v1.Pod, string, error) { + pod = pod.DeepCopy() + enabled, workloadName, value := IsPodManaged(pod) + if !enabled { + return nil, "", nil + } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if isPodGuaranteed(pod) { + stripWorkloadAnnotations(pod.Annotations) + pod.Annotations[WorkloadAnnotationWarning] = qosWarning + return pod, "", nil + } + pod.Annotations[fmt.Sprintf("%v%v", WorkloadsAnnotationPrefix, workloadName)] = value + if err := updateContainers(workloadName, pod); err != nil { + return nil, "", err + } + return pod, workloadName, nil +} + +func GenerateResourceName(workloadName string) v1.ResourceName { + return v1.ResourceName(fmt.Sprintf("%v.%v", workloadName, WorkloadsCapacitySuffix)) +} + +func updateContainers(workloadName string, pod *v1.Pod) error { + updateContainer := func(container *v1.Container) error { + if container.Resources.Requests == nil { + return fmt.Errorf("managed container %v does not have Resource.Requests", container.Name) + } + if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok { + return fmt.Errorf("managed container %v does not have cpu requests", container.Name) + } + if _, ok := container.Resources.Requests[v1.ResourceMemory]; !ok { + return fmt.Errorf("managed container %v does not have memory requests", container.Name) + } + if container.Resources.Limits == nil { + container.Resources.Limits = v1.ResourceList{} + } + cpuRequest := container.Resources.Requests[v1.ResourceCPU] + cpuRequestInMilli := cpuRequest.MilliValue() + + containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli)) + jsonAnnotation, _ := containerAnnotation.Serialize() + containerNameKey := fmt.Sprintf("%v%v", ContainerAnnotationPrefix, container.Name) + + newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format) + + pod.Annotations[containerNameKey] = string(jsonAnnotation) + container.Resources.Requests[GenerateResourceName(workloadName)] = *newCPURequest + container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest + + delete(container.Resources.Requests, v1.ResourceCPU) + return nil + } + for idx := range pod.Spec.Containers { + if err := updateContainer(&pod.Spec.Containers[idx]); err != nil { + return err + } + } + for idx := range pod.Spec.InitContainers { + if err := updateContainer(&pod.Spec.InitContainers[idx]); err != nil { + return err + } + } + return nil +} + +// isPodGuaranteed checks if the pod has a guaranteed QoS. +// This QoS check is different from the library versions that check QoS, +// this is because of the order at which changes get observed. +// (i.e. the library assumes the defaulter has ran on the pod resource before calculating QoS). +// +// The files will get interpreted before they reach the API server and before the defaulter applies changes, +// this function takes into account the case where only `limits.cpu` are provided but no `requests.cpu` are since that +// counts as a Guaranteed QoS after the defaulter runs. 
+func isPodGuaranteed(pod *v1.Pod) bool { + isGuaranteed := func(containers []v1.Container) bool { + for _, c := range containers { + // only memory and CPU resources are relevant to decide pod QoS class + for _, r := range []v1.ResourceName{v1.ResourceMemory, v1.ResourceCPU} { + limit := c.Resources.Limits[r] + request, requestExist := c.Resources.Requests[r] + if limit.IsZero() { + return false + } + if !requestExist { + continue + } + // In some corner case, when you set CPU requests to 0 the k8s defaulter will change it to the value + // specified under the limit. + if r == v1.ResourceCPU && request.IsZero() { + continue + } + if !limit.Equal(request) { + return false + } + } + } + return true + } + return isGuaranteed(pod.Spec.InitContainers) && isGuaranteed(pod.Spec.Containers) +} + +func stripWorkloadAnnotations(annotations map[string]string) { + for k := range annotations { + if strings.HasPrefix(k, WorkloadsAnnotationPrefix) { + delete(annotations, k) + } + if strings.HasPrefix(k, ContainerAnnotationPrefix) { + delete(annotations, k) + } + } +} diff --git a/pkg/kubelet/managed/managed_test.go b/pkg/kubelet/managed/managed_test.go new file mode 100644 index 0000000000000..16acda0868cdb --- /dev/null +++ b/pkg/kubelet/managed/managed_test.go @@ -0,0 +1,675 @@ +package managed + +import ( + "fmt" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestModifyStaticPodForPinnedManagementErrorStates(t *testing.T) { + + workloadAnnotations := map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + } + + testCases := []struct { + pod *v1.Pod + expectedError error + }{ + { + pod: createPod(workloadAnnotations, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: nil, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have Resource.Requests"), + }, + { + pod: createPod(workloadAnnotations, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have cpu requests"), + }, + { + pod: createPod(workloadAnnotations, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have memory requests"), + }, + { + pod: createPod(workloadAnnotations, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, nil), + expectedError: fmt.Errorf("managed container nginx does not have cpu requests"), + }, + { + pod: createPod(workloadAnnotations, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + }, + }, + }, nil), + expectedError: fmt.Errorf("managed container nginx does not have memory requests"), + }, + { + pod: createPod(nil, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + 
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have cpu requests"), + }, + { + pod: createPod(map[string]string{"something": "else"}, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have memory requests"), + }, + } + + for _, tc := range testCases { + pod, workloadName, err := ModifyStaticPodForPinnedManagement(tc.pod) + if err != nil && err.Error() != tc.expectedError.Error() { + t.Errorf("ModifyStaticPodForPinned got error of (%v) but expected (%v)", err, tc.expectedError) + } + if pod != nil { + t.Errorf("ModifyStaticPodForPinned should return pod with nil value") + } + if workloadName != "" { + t.Errorf("ModifyStaticPodForPinned should return empty workloadName but got %v", workloadName) + } + } +} + +func TestStaticPodManaged(t *testing.T) { + testCases := []struct { + pod *v1.Pod + expectedAnnotations map[string]string + isGuaranteed bool + }{ + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/nginx": `{"cpushares":102}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c2", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c_3", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":102}`, + 
"resources.workload.openshift.io/c2": `{"cpushares":1024}`, + "resources.workload.openshift.io/c_3": `{"cpushares":1024}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("20m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":20}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":20}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: 
v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("0m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + } + + for _, tc := range testCases { + pod, workloadName, err := ModifyStaticPodForPinnedManagement(tc.pod) + if err != nil { + t.Errorf("ModifyStaticPodForPinned should not error") + } + for expectedKey, expectedValue := range tc.expectedAnnotations { + value, exists := pod.Annotations[expectedKey] + if !exists { + t.Errorf("%v key not found", expectedKey) + } + if expectedValue != value { + t.Errorf("'%v' key's value does not equal '%v' and got '%v'", expectedKey, expectedValue, value) + } + } + for _, container := range pod.Spec.Containers { + if container.Resources.Requests.Cpu().String() != "0" && !tc.isGuaranteed { + t.Errorf("cpu requests should be 0 got %v", container.Resources.Requests.Cpu().String()) + } + if container.Resources.Requests.Memory().String() == "0" && !tc.isGuaranteed { + t.Errorf("memory requests were %v but should be %v", container.Resources.Requests.Memory().String(), container.Resources.Requests.Memory().String()) + } + if _, exists := container.Resources.Requests[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed capacity label missing from pod %v and container %v", tc.pod.Name, container.Name) + } + if _, exists := container.Resources.Limits[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed capacity label missing from pod %v and container %v limits", tc.pod.Name, container.Name) + } + } + } +} + +func TestStaticPodThrottle(t *testing.T) { + testCases := []struct { + pod *v1.Pod + expectedAnnotations map[string]string + isGuaranteed bool + }{ + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/nginx": `{"cpushares":102}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ 
+ v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c2", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c_3", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":102}`, + "resources.workload.openshift.io/c2": `{"cpushares":1024}`, + "resources.workload.openshift.io/c_3": `{"cpushares":1024}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + } + + for _, tc := range testCases { + pod, workloadName, err := ModifyStaticPodForPinnedManagement(tc.pod) + if err != nil { + t.Errorf("ModifyStaticPodForPinned should not error") + } + for expectedKey, expectedValue := range tc.expectedAnnotations { + value, exists := pod.Annotations[expectedKey] + if !exists { + t.Errorf("%v key not found", expectedKey) + } + if expectedValue != value { + t.Errorf("'%v' key's value does not equal '%v' and got '%v'", expectedKey, expectedValue, value) + } + } + for _, container := range pod.Spec.Containers { + if container.Resources.Requests.Cpu().String() != "0" && !tc.isGuaranteed { + 
t.Errorf("cpu requests should be 0 got %v", container.Resources.Requests.Cpu().String()) + } + if container.Resources.Requests.Memory().String() == "0" && !tc.isGuaranteed { + t.Errorf("memory requests were %v but should be %v", container.Resources.Requests.Memory().String(), container.Resources.Requests.Memory().String()) + } + if _, exists := container.Resources.Requests[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed capacity label missing from pod %v and container %v", tc.pod.Name, container.Name) + } + if _, exists := container.Resources.Limits[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed limits capacity label missing from pod %v and container %v", tc.pod.Name, container.Name) + } + } + } +} + +func createPod(annotations map[string]string, initContainer, container *v1.Container) *v1.Pod { + pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: annotations, + }, + Spec: v1.PodSpec{ + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + + if initContainer != nil { + pod.Spec.InitContainers = append(pod.Spec.InitContainers, *initContainer) + } + + if container != nil { + pod.Spec.Containers = append(pod.Spec.Containers, *container) + } + + return pod +}