From b7897eed4246d4493f2f837c780dac92aa365ad1 Mon Sep 17 00:00:00 2001
From: Shingo Omura
Date: Tue, 11 Jan 2022 18:29:00 +0900
Subject: [PATCH 1/2] flag should be parsed

---
 Makefile                                   | 2 +-
 test/integration/integration_suite_test.go | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 9579f92..0e3c4f0 100644
--- a/Makefile
+++ b/Makefile
@@ -181,4 +181,4 @@ e2e: fmt lint e2e-setup
 e2e-debug: fmt lint e2e-setup
 	GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=$(E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT) \
 	GOMEGA_DEFAULT_CONSISTENTLY_DURATION=$(E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION) \
-	dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG)
+	dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG) --pause-image=$(E2E_PAUSE_IMAGE)
diff --git a/test/integration/integration_suite_test.go b/test/integration/integration_suite_test.go
index 998a488..beb1ab1 100644
--- a/test/integration/integration_suite_test.go
+++ b/test/integration/integration_suite_test.go
@@ -67,6 +67,7 @@ func TestIntegration(t *testing.T) {
 }
 
 var _ = BeforeSuite(func() {
+	flag.Parse()
 	Expect(kubeConfigPath).NotTo(BeEmpty())
 	Expect(pauseImage).NotTo(BeEmpty())
 	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)

From 0949469e4bdb2bc816c410b680740c043448608c Mon Sep 17 00:00:00 2001
From: Shingo Omura
Date: Tue, 11 Jan 2022 18:29:13 +0900
Subject: [PATCH 2/2] Publish a pod event when the pod's resource requests
 exceed a throttle's threshold

---
 .../v1alpha1/clusterthrottle_types.go         | 12 ++++---
 pkg/apis/schedule/v1alpha1/throttle_types.go  | 19 ++++----
 pkg/controllers/clusterthrottle_controller.go | 18 ++++++--
 pkg/controllers/throttle_controller.go        | 19 ++++++--
 pkg/scheduler_plugin/plugin.go                | 36 ++++++++++++++++---
 test/integration/clusterthrottle_test.go      | 19 ++++++++++
 test/integration/throttle_test.go             | 19 ++++++++++
 test/integration/util_pod_test.go             | 13 +++++++
 8 files changed, 133 insertions(+), 22 deletions(-)

diff --git a/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go b/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
index 7a8d567..cdc67f1 100644
--- a/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
+++ b/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
@@ -28,15 +28,19 @@ type ClusterThrottleSpec struct {
 }
 
 func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
-	if thr.Status.Throttled.IsThrottledFor(pod) {
-		return CheckThrottleStatusActive
-	}
-
 	threshold := thr.Spec.Threshold
 	if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
 		threshold = thr.Status.CalculatedThreshold.Threshold
 	}
 
+	if threshold.IsThrottled(ResourceAmountOfPod(pod), false).IsThrottledFor(pod) {
+		return CheckThrottleStatusPodRequestsExceedsThreshold
+	}
+
+	if thr.Status.Throttled.IsThrottledFor(pod) {
+		return CheckThrottleStatusActive
+	}
+
 	alreadyUsed := ResourceAmount{}.Add(thr.Status.Used).Add(reservedResourceAmount)
 	if threshold.IsThrottled(alreadyUsed, isThrottledOnEqual).IsThrottledFor(pod) {
 		return CheckThrottleStatusActive
diff --git a/pkg/apis/schedule/v1alpha1/throttle_types.go b/pkg/apis/schedule/v1alpha1/throttle_types.go
index 660ca70..9a7ce47 100644
--- a/pkg/apis/schedule/v1alpha1/throttle_types.go
+++ b/pkg/apis/schedule/v1alpha1/throttle_types.go
@@ -119,21 +119,26 @@ type ThrottleStatus struct {
 type CheckThrottleStatus string
 
 var (
-	CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
-	CheckThrottleStatusActive       CheckThrottleStatus = "active"
-	CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
+	CheckThrottleStatusNotThrottled                CheckThrottleStatus = "not-throttled"
+	CheckThrottleStatusActive                      CheckThrottleStatus = "active"
+	CheckThrottleStatusInsufficient                CheckThrottleStatus = "insufficient"
+	CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
 )
 
 func (thr Throttle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
-	if thr.Status.Throttled.IsThrottledFor(pod) {
-		return CheckThrottleStatusActive
-	}
-
 	threshold := thr.Spec.Threshold
 	if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
 		threshold = thr.Status.CalculatedThreshold.Threshold
 	}
 
+	if threshold.IsThrottled(ResourceAmountOfPod(pod), false).IsThrottledFor(pod) {
+		return CheckThrottleStatusPodRequestsExceedsThreshold
+	}
+
+	if thr.Status.Throttled.IsThrottledFor(pod) {
+		return CheckThrottleStatusActive
+	}
+
 	alreadyUsed := ResourceAmount{}.Add(thr.Status.Used).Add(reservedResourceAmount)
 	if threshold.IsThrottled(alreadyUsed, true).IsThrottledFor(pod) {
 		return CheckThrottleStatusActive
diff --git a/pkg/controllers/clusterthrottle_controller.go b/pkg/controllers/clusterthrottle_controller.go
index 52036c9..829c27d 100644
--- a/pkg/controllers/clusterthrottle_controller.go
+++ b/pkg/controllers/clusterthrottle_controller.go
@@ -377,14 +377,24 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr
 	return removed
 }
 
-func (c *ClusterThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool) ([]schedulev1alpha1.ClusterThrottle, []schedulev1alpha1.ClusterThrottle, []schedulev1alpha1.ClusterThrottle, error) {
+func (c *ClusterThrottleController) CheckThrottled(
+	pod *v1.Pod,
+	isThrottledOnEqual bool,
+) (
+	[]schedulev1alpha1.ClusterThrottle,
+	[]schedulev1alpha1.ClusterThrottle,
+	[]schedulev1alpha1.ClusterThrottle,
+	[]schedulev1alpha1.ClusterThrottle,
+	error,
+) {
 	throttles, err := c.affectedClusterThrottles(pod)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}
 	affected := []schedulev1alpha1.ClusterThrottle{}
 	alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
 	insufficient := []schedulev1alpha1.ClusterThrottle{}
+	podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}
 	for _, thr := range throttles {
 		affected = append(affected, *thr)
 		reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
@@ -409,9 +419,11 @@ func (c *ClusterThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqu
 			alreadyThrottled = append(alreadyThrottled, *thr)
 		case schedulev1alpha1.CheckThrottleStatusInsufficient:
 			insufficient = append(insufficient, *thr)
+		case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
+			podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
 		}
 	}
-	return alreadyThrottled, insufficient, affected, nil
+	return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
 }
 
 func (c *ClusterThrottleController) setupEventHandler() {
diff --git a/pkg/controllers/throttle_controller.go b/pkg/controllers/throttle_controller.go
index e8850e6..39fe362 100644
--- a/pkg/controllers/throttle_controller.go
+++ b/pkg/controllers/throttle_controller.go
@@ -348,14 +348,25 @@ func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alp
 	return removed
 }
 
-func (c *ThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool) ([]schedulev1alpha1.Throttle, []schedulev1alpha1.Throttle, []schedulev1alpha1.Throttle, error) {
+func (c *ThrottleController) CheckThrottled(
+	pod *v1.Pod,
+	isThrottledOnEqual bool,
+) (
+	[]schedulev1alpha1.Throttle,
+	[]schedulev1alpha1.Throttle,
+	[]schedulev1alpha1.Throttle,
+	[]schedulev1alpha1.Throttle,
+	error,
+) {
 	throttles, err := c.affectedThrottles(pod)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}
 	affected := []schedulev1alpha1.Throttle{}
 	alreadyThrottled := []schedulev1alpha1.Throttle{}
 	insufficient := []schedulev1alpha1.Throttle{}
+	podRequestsExceedsThreshold := []schedulev1alpha1.Throttle{}
+
 	for _, thr := range throttles {
 		affected = append(affected, *thr)
 		reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
@@ -380,9 +391,11 @@ func (c *ThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool
 			alreadyThrottled = append(alreadyThrottled, *thr)
 		case schedulev1alpha1.CheckThrottleStatusInsufficient:
 			insufficient = append(insufficient, *thr)
+		case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
+			podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
 		}
 	}
-	return alreadyThrottled, insufficient, affected, nil
+	return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
 }
 
 func (c *ThrottleController) setupEventHandler() {
diff --git a/pkg/scheduler_plugin/plugin.go b/pkg/scheduler_plugin/plugin.go
index 8250d85..c181951 100644
--- a/pkg/scheduler_plugin/plugin.go
+++ b/pkg/scheduler_plugin/plugin.go
@@ -150,29 +150,55 @@ func (pl *KubeThrottler) PreFilter(
 	state *framework.CycleState,
 	pod *v1.Pod,
 ) *framework.Status {
-	thrActive, thrInsufficient, thrAffected, err := pl.throttleCtr.CheckThrottled(pod, false)
+	thrActive, thrInsufficient, thrPodRequestsExceeds, thrAffected, err := pl.throttleCtr.CheckThrottled(pod, false)
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
 	}
 	klog.V(2).InfoS("PreFilter: throttle check result",
 		"Pod", pod.Namespace+"/"+pod.Name,
-		"#ActiveThrottles", len(thrActive), "#InsufficientThrottles", len(thrInsufficient), "#AffectedThrottles", len(thrAffected),
+		"#ActiveThrottles", len(thrActive),
+		"#InsufficientThrottles", len(thrInsufficient),
+		"#PodRequestsExceedsThresholdThrottles", len(thrPodRequestsExceeds),
+		"#AffectedThrottles", len(thrAffected),
 	)
-	clthrActive, clthrInsufficient, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
+	clthrActive, clthrInsufficient, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
 	}
 	klog.V(2).InfoS("PreFilter: clusterthrottle check result",
 		"Pod", pod.Namespace+"/"+pod.Name,
-		"#ActiveClusterThrottles", len(clthrActive), "#InsufficientClusterThrottles", len(clthrInsufficient), "#AffectedClusterThrottles", len(clThrAffected),
+		"#ActiveClusterThrottles", len(clthrActive),
+		"#InsufficientClusterThrottles", len(clthrInsufficient),
+		"#PodRequestsExceedsThresholdClusterThrottles", len(clthrPodRequestsExceeds),
+		"#AffectedClusterThrottles", len(clThrAffected),
 	)
-	if len(thrActive)+len(thrInsufficient)+len(clthrActive)+len(clthrInsufficient) == 0 {
+	if len(thrActive)+len(thrInsufficient)+len(thrPodRequestsExceeds)+
+		len(clthrActive)+len(clthrInsufficient)+len(clthrPodRequestsExceeds) == 0 {
 		return framework.NewStatus(framework.Success)
 	}
 
 	reasons := []string{}
+	if len(clthrPodRequestsExceeds) > 0 {
+		reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold, strings.Join(clusterThrottleNames(clthrPodRequestsExceeds), ",")))
+	}
+	if len(thrPodRequestsExceeds) > 0 {
+		reasons = append(reasons, fmt.Sprintf("throttle[%s]=%s", schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold, strings.Join(throttleNames(thrPodRequestsExceeds), ",")))
+	}
+	if len(clthrPodRequestsExceeds)+len(thrPodRequestsExceeds) > 0 {
+		pl.fh.EventRecorder().Eventf(
+			pod, nil,
+			v1.EventTypeWarning,
+			"ResourceRequestsExceedsThrottleThreshold",
+			pl.Name(),
+			"The pod won't be scheduled unless its resource requests are decreased or the ClusterThrottle/Throttle thresholds are increased, because its resource requests exceed the thresholds of: %s",
+			strings.Join(
+				append(clusterThrottleNames(clthrPodRequestsExceeds), throttleNames(thrPodRequestsExceeds)...),
+				",",
+			),
+		)
+	}
 	if len(clthrActive) > 0 {
 		reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusActive, strings.Join(clusterThrottleNames(clthrActive), ",")))
 	}
diff --git a/test/integration/clusterthrottle_test.go b/test/integration/clusterthrottle_test.go
index 440eb7a..a8aa4a8 100644
--- a/test/integration/clusterthrottle_test.go
+++ b/test/integration/clusterthrottle_test.go
@@ -142,6 +142,25 @@ var _ = Describe("Clusterthrottle Test", func() {
 				Consistently(PodIsNotScheduled(ctx, DefaultNs, pod2.Name)).Should(Succeed())
 			})
 		})
+		Context("ResourceRequest (pod-requests-exceeds-threshold)", func() {
+			var pod *corev1.Pod
+			BeforeEach(func() {
+				pod = MustCreatePod(ctx, MakePod(DefaultNs, "pod", "1.1").Label(throttleKey, throttleName).Obj())
+			})
+			It("should not schedule pod", func() {
+				Eventually(AsyncAll(
+					WakeupBackoffPod(ctx),
+					ClusterThottleHasStatus(
+						ctx, thr.Name,
+						ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
+						ClthrOpts.WithPodThrottled(false), ClthrOpts.WithCpuThrottled(false),
+					),
+					MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold),
+					MustPodResourceRequestsExceedsThrottleThreshold(ctx, DefaultNs, pod.Name, thr.Namespace+"/"+thr.Name),
+				)).Should(Succeed())
+				Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
+			})
+		})
 	})
 
 	When("Many pods are created at once", func() {
diff --git a/test/integration/throttle_test.go b/test/integration/throttle_test.go
index ea24b9e..ded9451 100644
--- a/test/integration/throttle_test.go
+++ b/test/integration/throttle_test.go
@@ -143,6 +143,25 @@ var _ = Describe("Throttle Test", func() {
 				Consistently(PodIsNotScheduled(ctx, DefaultNs, pod2.Name)).Should(Succeed())
 			})
 		})
+		Context("ResourceRequest (pod-requests-exceeds-threshold)", func() {
+			var pod *corev1.Pod
+			BeforeEach(func() {
+				pod = MustCreatePod(ctx, MakePod(DefaultNs, "pod", "1.1").Label(throttleKey, throttleName).Obj())
+			})
+			It("should not schedule pod", func() {
+				Eventually(AsyncAll(
+					WakeupBackoffPod(ctx),
+					ThrottleHasStatus(
+						ctx, DefaultNs, thr.Name,
+						ThOpts.WithCalculatedThreshold(thr.Spec.Threshold),
+						ThOpts.WithPodThrottled(false), ThOpts.WithCpuThrottled(false),
+					),
+					MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold),
+					MustPodResourceRequestsExceedsThrottleThreshold(ctx, DefaultNs, pod.Name, thr.Namespace+"/"+thr.Name),
+				)).Should(Succeed())
+				Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
+			})
+		})
 	})
 
 	When("Many pods are created at once", func() {
diff --git a/test/integration/util_pod_test.go b/test/integration/util_pod_test.go
index 31d8a7a..9b9d613 100644
--- a/test/integration/util_pod_test.go
+++ b/test/integration/util_pod_test.go
@@ -78,6 +78,19 @@ func MustPodFailedScheduling(ctx context.Context, ns, n string, throttleStatus v
 	}
 }
 
+func MustPodResourceRequestsExceedsThrottleThreshold(ctx context.Context, ns, n, throttleNamespacedName string) func(g Gomega) {
+	return func(g Gomega) {
+		events, err := k8sCli.CoreV1().Events(ns).List(ctx, metav1.ListOptions{
+			FieldSelector: fmt.Sprintf("involvedObject.name=%s,reason=ResourceRequestsExceedsThrottleThreshold", n),
+		})
+		g.Expect(err).NotTo(HaveOccurred())
+		g.Expect(events.Items).Should(ContainElement(WithTransform(
+			func(e corev1.Event) string { return e.Message },
+			ContainSubstring(throttleNamespacedName),
+		)))
+	}
+}
+
 func PodIsScheduled(ctx context.Context, namespace, name string) func(g Gomega) {
 	return func(g Gomega) {
 		got, err := k8sCli.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})