
WaitForPodsReady: Reset .status.requeueState.count once the workload is deactivated #2219

46 changes: 31 additions & 15 deletions pkg/controller/core/workload_controller.go
@@ -163,13 +163,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

if ptr.Deref(wl.Spec.Active, true) {
var updated bool

// If a deactivated workload is re-activated we need to reset the RequeueState.
if workload.IsEvictedByDeactivation(&wl) && wl.Status.RequeueState != nil {
wl.Status.RequeueState = nil
updated = true
}

if cond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadRequeued); cond != nil && cond.Status == metav1.ConditionFalse {
switch cond.Reason {
case kueue.WorkloadEvictedByDeactivation:
@@ -194,16 +187,39 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if updated {
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
}
} else if !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
// if job is not active and does not have condition reason of WorkloadEvictedByDeactivation, update its condition
workload.SetEvictedCondition(&wl, kueue.WorkloadEvictedByDeactivation, "The workload is deactivated")
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
} else {
var updated, evicted bool
Contributor: Maybe it is better to declare evicted as *string, as it is used to bump the metric.

I worry that if we have a bool, then someone can forget to update line 219 when adding a new scenario for deactivation. WDYT?
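
For illustration, a minimal sketch of the suggested alternative (the variable name and exact shape are hypothetical, not code from this PR). The pointer doubles as the "was evicted" flag, so the metric call cannot silently fall out of sync when a new deactivation scenario is added:

    var updated bool
    // nil means "not evicted in this pass"; non-nil carries the reason to report.
    var evictionReason *string
    if !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
        workload.SetEvictedCondition(&wl, kueue.WorkloadEvictedByDeactivation, message)
        updated = true
        evictionReason = ptr.To(kueue.WorkloadEvictedByDeactivation)
    }
    if updated {
        if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
            return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
        }
        // The reason recorded above drives the metric, so there is no separate bool to keep in sync.
        if evictionReason != nil && wl.Status.Admission != nil {
            metrics.ReportEvictedWorkloads(string(wl.Status.Admission.ClusterQueue), *evictionReason)
        }
    }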

Member Author: @mimowo Hmm, do you assume that evicted holds the reason for eviction? (Here it would be InactiveWorkload.)

Contributor: Oh, never mind. I was thinking of another scenario where the Pods are ready, but in that case it would still be WorkloadInactive.

Member Author (@tenzen-y, May 27, 2024): Yeah, after we introduce a dedicated reason, we may refactor this somehow.
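
A rough sketch of what such a follow-up could look like (the constant name and value are hypothetical; no dedicated reason exists at this point in the PR):

    // Hypothetical dedicated reason, to be introduced in a follow-up enhancement PR.
    const WorkloadEvictedByRequeuingLimitExceeded = "RequeuingLimitExceeded"

    // With a dedicated reason, the condition's Reason field itself distinguishes a plain
    // deactivation from one caused by exhausting the re-queuing budget, and the message
    // no longer needs to encode that distinction.
    workload.SetEvictedCondition(&wl, WorkloadEvictedByRequeuingLimitExceeded,
        "The workload is deactivated due to exceeding the maximum number of re-queuing retries")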

if !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
message := "The workload is deactivated"

// The deactivation reason can be deduced to be exceeding the maximum number of re-queuing retries if the workload meets all of the criteria below:
// 1. The waitForPodsReady feature is enabled, which means the workload has a "PodsReady" condition.
// 2. The workload has already exceeded the PodsReadyTimeout.
// 3. The workload has already been re-queued previously, which means the requeueAt field is unset.
// 4. The re-queue count has already reached waitForPodsReady.requeuingBackoffLimitCount.
if apimeta.IsStatusConditionFalse(wl.Status.Conditions, kueue.WorkloadPodsReady) &&
((!workload.HasRequeueState(&wl) && ptr.Equal(r.waitForPodsReady.requeuingBackoffLimitCount, ptr.To[int32](0))) ||
(workload.HasRequeueState(&wl) && wl.Status.RequeueState.RequeueAt == nil &&
ptr.Equal(wl.Status.RequeueState.Count, r.waitForPodsReady.requeuingBackoffLimitCount))) {
message = fmt.Sprintf("%s by exceeded the maximum number of re-queuing retries", message)
Member Author: @mimowo @alculquicondor I added the dedicated message like this based on your recommendation. WDYT?
Also, let me try to add a dedicated reason in another enhancement PR.

Contributor: I think the dedicated message is useful.

Contributor: Suggested change:
-  message = fmt.Sprintf("%s by exceeded the maximum number of re-queuing retries", message)
+  message = fmt.Sprintf("%s due to exceeding the maximum number of re-queuing retries", message)

nit, I think it will read better.

Member Author: SGTM, thank you for this recommendation!

Member Author: Done.

}
workload.SetEvictedCondition(&wl, kueue.WorkloadEvictedByDeactivation, message)
updated = true
evicted = true
}
if wl.Status.Admission != nil {
metrics.ReportEvictedWorkloads(string(wl.Status.Admission.ClusterQueue), kueue.WorkloadEvictedByDeactivation)
if wl.Status.RequeueState != nil {
wl.Status.RequeueState = nil
updated = true
}
if updated {
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
}
if evicted && wl.Status.Admission != nil {
metrics.ReportEvictedWorkloads(string(wl.Status.Admission.ClusterQueue), kueue.WorkloadEvictedByDeactivation)
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}

cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
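As an aside for readers of this hunk: the compound boolean guarding the dedicated message packs the four numbered criteria into a single expression. An equivalent standalone predicate (a sketch with a hypothetical helper name, not part of this PR) may be easier to follow:

    // exceededRequeuingLimit reports whether a deactivated workload exhausted
    // waitForPodsReady.requeuingBackoffLimitCount; it mirrors the condition in the
    // hunk above.
    func exceededRequeuingLimit(wl *kueue.Workload, limit *int32) bool {
        // Criteria 1-2: waitForPodsReady is enabled and the timeout was exceeded,
        // so the PodsReady condition exists and is False.
        if !apimeta.IsStatusConditionFalse(wl.Status.Conditions, kueue.WorkloadPodsReady) {
            return false
        }
        // No requeue state at all: only a limit of 0 makes the first timeout final.
        if !workload.HasRequeueState(wl) {
            return ptr.Equal(limit, ptr.To[int32](0))
        }
        // Criterion 3: a pending requeueAt means the workload is still waiting to be
        // re-queued rather than exhausted.
        if wl.Status.RequeueState.RequeueAt != nil {
            return false
        }
        // Criterion 4: exhausted once the recorded count reaches the configured limit.
        return ptr.Equal(wl.Status.RequeueState.Count, limit)
    }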
106 changes: 85 additions & 21 deletions pkg/controller/core/workload_controller_test.go
@@ -307,7 +307,7 @@ var (
workloadCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(
kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion", "Status.RequeueState.RequeueAt",
kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion",
),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"),
@@ -646,8 +646,6 @@ func TestReconcile(t *testing.T) {
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated",
}).
// The fake test not allow to save state with nil values when updating by Patch/Apply. So we are skipping this case.
// RequeueState(ptr.To[int32](4), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
@@ -700,7 +698,7 @@ func TestReconcile(t *testing.T) {
Reason: kueue.WorkloadBackoffFinished,
Message: "The workload backoff was finished",
}).
RequeueState(ptr.To[int32](1), nil).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
},
"shouldn't set the WorkloadRequeued condition when backoff expires and workload finished": {
@@ -791,7 +789,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
"should set the Evicted condition with InactiveWorkload reason when the .spec.active=False and Admitted when the Workload has Evicted=False condition": {
"should set the Evicted condition with InactiveWorkload reason when the .spec.active is False, Admitted, and the Workload has Evicted=False condition": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
@@ -815,7 +813,88 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
"should keep the previous eviction reason when the Workload is already evicted by other reason even thou the Workload is deactivated.": {
"[backoffLimitCount: 0] should set the Evicted condition with InactiveWorkload reason, exceeded the maximum number of requeue retries" +
"when the .spec.active is False, Admitted, the Workload has Evicted=False and PodsReady=False condition": {
reconcilerOpts: []Option{
WithWaitForPodsReady(&waitForPodsReadyConfig{
timeout: 3 * time.Second,
requeuingBackoffLimitCount: ptr.To[int32](0),
requeuingBackoffBaseSeconds: 10,
requeuingBackoffJitter: 0,
}),
},
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}).
Obj(),
},
"[backoffLimitCount: 100] should set the Evicted condition with InactiveWorkload reason, exceeded the maximum number of requeue retries" +
"when the .spec.active is False, Admitted, the Workload has Evicted=False and PodsReady=False condition, and the requeueState.count equals to backoffLimitCount": {
reconcilerOpts: []Option{
WithWaitForPodsReady(&waitForPodsReadyConfig{
timeout: 3 * time.Second,
requeuingBackoffLimitCount: ptr.To[int32](100),
requeuingBackoffBaseSeconds: 10,
requeuingBackoffJitter: 0,
}),
},
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
RequeueState(ptr.To[int32](100), nil).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}).
// The requeueState should be reset in the real cluster, but the fake client doesn't allow us to do it.
RequeueState(ptr.To[int32](100), nil).
Obj(),
},
"should keep the previous eviction reason when the Workload is already evicted by other reason even though the Workload is deactivated.": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
@@ -882,24 +961,9 @@ func TestReconcile(t *testing.T) {
}
gotWorkload = nil
}

if diff := cmp.Diff(tc.wantWorkload, gotWorkload, workloadCmpOpts...); diff != "" {
t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff)
}

if tc.wantWorkload != nil {
if requeueState := tc.wantWorkload.Status.RequeueState; requeueState != nil && requeueState.RequeueAt != nil {
gotRequeueState := gotWorkload.Status.RequeueState
if gotRequeueState != nil && gotRequeueState.RequeueAt != nil {
if !gotRequeueState.RequeueAt.Equal(requeueState.RequeueAt) {
t.Errorf("Unexpected requeueState.requeueAt; gotRequeueAt %v needs to be after requeueAt %v", requeueState.RequeueAt, gotRequeueState.RequeueAt)
}
} else {
t.Errorf("Unexpected nil requeueState.requeuAt; requeueState.requeueAt shouldn't be nil")
}
}
}
Member Author (on lines -889 to -901): This verification has been invalid since we introduced the fake clock.


if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
51 changes: 50 additions & 1 deletion test/integration/controller/jobs/job/job_controller_test.go
@@ -1892,6 +1892,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
var _ = ginkgo.Describe("Job controller interacting with Workload controller when waitForPodsReady with requeuing strategy is enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
backoffBaseSeconds int32
backLimitCount *int32
Contributor: Suggested change:
-  backLimitCount *int32
+  backoffLimitCount *int32

nit

Member Author: Done.

ns *corev1.Namespace
fl *kueue.ResourceFlavor
cq *kueue.ClusterQueue
@@ -1910,6 +1911,7 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To(configapi.EvictionTimestamp),
BackoffBaseSeconds: ptr.To[int32](backoffBaseSeconds),
BackoffLimitCount: backLimitCount,
},
}
ctx, k8sClient = fwk.RunManager(cfg, managerAndControllersSetup(
@@ -2000,6 +2002,7 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
ginkgo.When("short backoffBaseSeconds", func() {
ginkgo.BeforeEach(func() {
backoffBaseSeconds = 1
backLimitCount = ptr.To[int32](1)
})

ginkgo.It("should re-queue a workload evicted due to PodsReady timeout after the backoff elapses", func() {
@@ -2011,9 +2014,10 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: job.Namespace}

ginkgo.By("admit the workload, it gets evicted due to PodsReadyTimeout and re-queued")
var admission *kueue.Admission
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
admission := testing.MakeAdmission(cq.Name).
admission = testing.MakeAdmission(cq.Name).
Assignment(corev1.ResourceCPU, "on-demand", "1m").
AssignmentPodCount(wl.Spec.PodSets[0].Count).
Obj()
@@ -2060,6 +2064,51 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("re-admit the workload to exceed the backoffLimitCount")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
g.Expect(util.SetQuotaReservation(ctx, k8sClient, wl, admission)).Should(gomega.Succeed())
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, wl)
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("checking that the workload is evicted by deactivation due to PodsReadyTimeout")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
g.Expect(wl.Status.RequeueState).Should(gomega.BeNil())
Contributor: nit: let's also assert spec.active: false.

Member Author: Oh, good point! Thanks!

Member Author: Done.
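
The requested assertion is presumably the one-liner already used in the pod controller test later in this PR (a sketch, not the final diff):

    g.Expect(ptr.Deref(wl.Spec.Active, true)).Should(gomega.BeFalse())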

g.Expect(wl.Status.Conditions).To(gomega.ContainElements(
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadPodsReady,
Message: "Not all pods are ready or succeeded",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
Reason: "NoReservation",
Message: "The workload has no reservation",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadRequeued,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})
})
10 changes: 4 additions & 6 deletions test/integration/controller/jobs/pod/pod_controller_test.go
@@ -1795,9 +1795,7 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
g.Expect(ptr.Deref(wl.Spec.Active, true)).Should(gomega.BeFalse())
g.Expect(wl.Status.RequeueState).ShouldNot(gomega.BeNil())
g.Expect(wl.Status.RequeueState.Count).Should(gomega.Equal(ptr.To[int32](1)))
g.Expect(wl.Status.RequeueState.RequeueAt).Should(gomega.BeNil())
g.Expect(wl.Status.RequeueState).Should(gomega.BeNil())
g.Expect(wl.Status.Conditions).To(gomega.ContainElements(
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadPodsReady,
@@ -1809,13 +1807,13 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadAdmitted,
@@ -1827,7 +1825,7 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe
Type: podcontroller.WorkloadWaitingForReplacementPods,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())