Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Publish a pod event when the pod resource requests exceed throttle's threshold for warning users #43

Merged
merged 2 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,4 @@ e2e: fmt lint e2e-setup
e2e-debug: fmt lint e2e-setup
GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=$(E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT) \
GOMEGA_DEFAULT_CONSISTENTLY_DURATION=$(E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION) \
dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG)
dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG) --pause-image=$(E2E_PAUSE_IMAGE)
12 changes: 8 additions & 4 deletions pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ type ClusterThrottleSpec struct {
}

func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

threshold := thr.Spec.Threshold
if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
threshold = thr.Status.CalculatedThreshold.Threshold
}

if threshold.IsThrottled(ResourceAmountOfPod(pod), false).IsThrottledFor(pod) {
return CheckThrottleStatusPodRequestsExceedsThreshold
}

if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

alreadyUsed := ResourceAmount{}.Add(thr.Status.Used).Add(reservedResourceAmount)
if threshold.IsThrottled(alreadyUsed, isThrottledOnEqual).IsThrottledFor(pod) {
return CheckThrottleStatusActive
Expand Down
19 changes: 12 additions & 7 deletions pkg/apis/schedule/v1alpha1/throttle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,26 @@ type ThrottleStatus struct {
type CheckThrottleStatus string

var (
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
)

func (thr Throttle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

threshold := thr.Spec.Threshold
if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
threshold = thr.Status.CalculatedThreshold.Threshold
}

if threshold.IsThrottled(ResourceAmountOfPod(pod), false).IsThrottledFor(pod) {
return CheckThrottleStatusPodRequestsExceedsThreshold
}

if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

alreadyUsed := ResourceAmount{}.Add(thr.Status.Used).Add(reservedResourceAmount)
if threshold.IsThrottled(alreadyUsed, true).IsThrottledFor(pod) {
return CheckThrottleStatusActive
Expand Down
18 changes: 15 additions & 3 deletions pkg/controllers/clusterthrottle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,24 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr
return removed
}

func (c *ClusterThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool) ([]schedulev1alpha1.ClusterThrottle, []schedulev1alpha1.ClusterThrottle, []schedulev1alpha1.ClusterThrottle, error) {
func (c *ClusterThrottleController) CheckThrottled(
pod *v1.Pod,
isThrottledOnEqual bool,
) (
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
error,
) {
throttles, err := c.affectedClusterThrottles(pod)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
affected := []schedulev1alpha1.ClusterThrottle{}
alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
insufficient := []schedulev1alpha1.ClusterThrottle{}
podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}
for _, thr := range throttles {
affected = append(affected, *thr)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
Expand All @@ -409,9 +419,11 @@ func (c *ClusterThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqu
alreadyThrottled = append(alreadyThrottled, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficient:
insufficient = append(insufficient, *thr)
case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
}
}
return alreadyThrottled, insufficient, affected, nil
return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
}

func (c *ClusterThrottleController) setupEventHandler() {
Expand Down
19 changes: 16 additions & 3 deletions pkg/controllers/throttle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,25 @@ func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alp
return removed
}

func (c *ThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool) ([]schedulev1alpha1.Throttle, []schedulev1alpha1.Throttle, []schedulev1alpha1.Throttle, error) {
func (c *ThrottleController) CheckThrottled(
pod *v1.Pod,
isThrottledOnEqual bool,
) (
[]schedulev1alpha1.Throttle,
[]schedulev1alpha1.Throttle,
[]schedulev1alpha1.Throttle,
[]schedulev1alpha1.Throttle,
error,
) {
throttles, err := c.affectedThrottles(pod)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
affected := []schedulev1alpha1.Throttle{}
alreadyThrottled := []schedulev1alpha1.Throttle{}
insufficient := []schedulev1alpha1.Throttle{}
podRequestsExceedsThreshold := []schedulev1alpha1.Throttle{}

for _, thr := range throttles {
affected = append(affected, *thr)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
Expand All @@ -380,9 +391,11 @@ func (c *ThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool
alreadyThrottled = append(alreadyThrottled, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficient:
insufficient = append(insufficient, *thr)
case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
}
}
return alreadyThrottled, insufficient, affected, nil
return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
}

func (c *ThrottleController) setupEventHandler() {
Expand Down
36 changes: 31 additions & 5 deletions pkg/scheduler_plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,55 @@ func (pl *KubeThrottler) PreFilter(
state *framework.CycleState,
pod *v1.Pod,
) *framework.Status {
thrActive, thrInsufficient, thrAffected, err := pl.throttleCtr.CheckThrottled(pod, false)
thrActive, thrInsufficient, thrPodRequestsExceeds, thrAffected, err := pl.throttleCtr.CheckThrottled(pod, false)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
klog.V(2).InfoS("PreFilter: throttle check result",
"Pod", pod.Namespace+"/"+pod.Name,
"#ActiveThrottles", len(thrActive), "#InsufficientThrottles", len(thrInsufficient), "#AffectedThrottles", len(thrAffected),
"#ActiveThrottles", len(thrActive),
"#InsufficientThrottles", len(thrInsufficient),
"#PodRequestsExceedsThresholdThrottles", len(thrPodRequestsExceeds),
"#AffectedThrottles", len(thrAffected),
)

clthrActive, clthrInsufficient, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
clthrActive, clthrInsufficient, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
klog.V(2).InfoS("PreFilter: clusterthrottle check result",
"Pod", pod.Namespace+"/"+pod.Name,
"#ActiveClusterThrottles", len(clthrActive), "#InsufficientClusterThrottles", len(clthrInsufficient), "#AffectedClusterThrottles", len(clThrAffected),
"#ActiveClusterThrottles", len(clthrActive),
"#InsufficientClusterThrottles", len(clthrInsufficient),
"#PodRequestsExceedsThresholdClusterThrottles", len(clthrPodRequestsExceeds),
"#AffectedClusterThrottles", len(clThrAffected),
)

if len(thrActive)+len(thrInsufficient)+len(clthrActive)+len(clthrInsufficient) == 0 {
if len(thrActive)+len(thrInsufficient)+len(thrPodRequestsExceeds)+
len(clthrActive)+len(clthrInsufficient)+len(clthrPodRequestsExceeds) == 0 {
return framework.NewStatus(framework.Success)
}

reasons := []string{}
if len(clthrPodRequestsExceeds) > 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold, strings.Join(clusterThrottleNames(clthrPodRequestsExceeds), ",")))
}
if len(thrPodRequestsExceeds) > 0 {
reasons = append(reasons, fmt.Sprintf("throttle[%s]=%s", schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold, strings.Join(throttleNames(thrPodRequestsExceeds), ",")))
}
if len(clthrPodRequestsExceeds)+len(thrPodRequestsExceeds) > 0 {
pl.fh.EventRecorder().Eventf(
pod, nil,
v1.EventTypeWarning,
"ResourceRequestsExceedsThrottleThreshold",
pl.Name(),
"It won't be scheduled unless decreasing resource requests or increasing ClusterThrottle/Throttle threshold because its resource requests exceeds their thresholds: %s",
strings.Join(
append(clusterThrottleNames(clthrPodRequestsExceeds), throttleNames(thrPodRequestsExceeds)...),
",",
),
)
}
if len(clthrActive) > 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusActive, strings.Join(clusterThrottleNames(clthrActive), ",")))
}
Expand Down
19 changes: 19 additions & 0 deletions test/integration/clusterthrottle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,25 @@ var _ = Describe("Clusterthrottle Test", func() {
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod2.Name)).Should(Succeed())
})
})
// Scenario: a single pod is created and is expected to be rejected with the
// pod-requests-exceeds-threshold status, i.e. its own requests alone are over
// the cluster throttle's threshold.
Context("ResourceRequest (pod-requests-exceeds-threshold)", func() {
var pod *corev1.Pod
BeforeEach(func() {
// NOTE(review): "1.1" is presumably a CPU request larger than thr's
// threshold — confirm against MakePod and the throttle created by the
// enclosing Describe.
pod = MustCreatePod(ctx, MakePod(DefaultNs, "pod", "1.1").Label(throttleKey, throttleName).Obj())
})
It("should not schedule pod", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
// Status check on the cluster throttle: calculated threshold taken from
// thr.Spec.Threshold, with the pod and CPU throttled options both false.
ClusterThottleHasStatus(
ctx, thr.Name,
ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ClthrOpts.WithPodThrottled(false), ClthrOpts.WithCpuThrottled(false),
),
// The scheduling failure must carry the pod-requests-exceeds-threshold status.
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold),
// A ResourceRequestsExceedsThrottleThreshold event for the pod must mention
// the throttle as "<namespace>/<name>".
MustPodResourceRequestsExceedsThrottleThreshold(ctx, DefaultNs, pod.Name, thr.Namespace+"/"+thr.Name),
)).Should(Succeed())
// After the conditions above hold, the pod must remain unscheduled.
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
})
})
})

When("Many pods are created at once", func() {
Expand Down
1 change: 1 addition & 0 deletions test/integration/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestIntegration(t *testing.T) {
}

var _ = BeforeSuite(func() {
flag.Parse()
Expect(kubeConfigPath).NotTo(BeEmpty())
Expect(pauseImage).NotTo(BeEmpty())
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
Expand Down
19 changes: 19 additions & 0 deletions test/integration/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,25 @@ var _ = Describe("Throttle Test", func() {
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod2.Name)).Should(Succeed())
})
})
// Scenario: a single pod is created and is expected to be rejected with the
// pod-requests-exceeds-threshold status, i.e. its own requests alone are over
// the throttle's threshold.
Context("ResourceRequest (pod-requests-exceeds-threshold)", func() {
var pod *corev1.Pod
BeforeEach(func() {
// NOTE(review): "1.1" is presumably a CPU request larger than thr's
// threshold — confirm against MakePod and the throttle created by the
// enclosing Describe.
pod = MustCreatePod(ctx, MakePod(DefaultNs, "pod", "1.1").Label(throttleKey, throttleName).Obj())
})
It("should not schedule pod", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
// Status check on the throttle: calculated threshold taken from
// thr.Spec.Threshold, with the pod and CPU throttled options both false.
ThrottleHasStatus(
ctx, DefaultNs, thr.Name,
ThOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ThOpts.WithPodThrottled(false), ThOpts.WithCpuThrottled(false),
),
// The scheduling failure must carry the pod-requests-exceeds-threshold status.
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold),
// A ResourceRequestsExceedsThrottleThreshold event for the pod must mention
// the throttle as "<namespace>/<name>".
MustPodResourceRequestsExceedsThrottleThreshold(ctx, DefaultNs, pod.Name, thr.Namespace+"/"+thr.Name),
)).Should(Succeed())
// After the conditions above hold, the pod must remain unscheduled.
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
})
})
})

When("Many pods are created at once", func() {
Expand Down
13 changes: 13 additions & 0 deletions test/integration/util_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ func MustPodFailedScheduling(ctx context.Context, ns, n string, throttleStatus v
}
}

// MustPodResourceRequestsExceedsThrottleThreshold builds a Gomega assertion
// that lists the events in namespace ns whose involved object is named n and
// whose reason is ResourceRequestsExceedsThrottleThreshold, then requires that
// at least one of those events has a message containing
// throttleNamespacedName.
func MustPodResourceRequestsExceedsThrottleThreshold(ctx context.Context, ns, n, throttleNamespacedName string) func(g Gomega) {
	// The selector depends only on the arguments, so it is built once up front.
	selector := fmt.Sprintf("involvedObject.name=%s,reason=ResourceRequestsExceedsThrottleThreshold", n)
	messageOf := func(e corev1.Event) string { return e.Message }

	return func(g Gomega) {
		listOpts := metav1.ListOptions{FieldSelector: selector}
		found, err := k8sCli.CoreV1().Events(ns).List(ctx, listOpts)
		g.Expect(err).NotTo(HaveOccurred())

		mentionsThrottle := WithTransform(messageOf, ContainSubstring(throttleNamespacedName))
		g.Expect(found.Items).Should(ContainElement(mentionsThrottle))
	}
}

func PodIsScheduled(ctx context.Context, namespace, name string) func(g Gomega) {
return func(g Gomega) {
got, err := k8sCli.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
Expand Down