WaitForPodsReady: Reset the requeueState while reconciling #1838

Merged
2 changes: 0 additions & 2 deletions charts/kueue/templates/webhook/webhook.yaml
@@ -289,10 +289,8 @@ webhooks:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- workloads
- workloads/status
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
2 changes: 0 additions & 2 deletions config/components/webhook/manifests.yaml
@@ -267,10 +267,8 @@ webhooks:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- workloads
- workloads/status
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
6 changes: 6 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -149,6 +149,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

// If a deactivated workload is re-activated, we need to reset the RequeueState.
if wl.Status.RequeueState != nil && ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(&wl) {
wl.Status.RequeueState = nil
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
}

if len(wl.ObjectMeta.OwnerReferences) == 0 && !wl.DeletionTimestamp.IsZero() {
return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, &wl)
}
6 changes: 1 addition & 5 deletions pkg/webhooks/workload_webhook.go
@@ -50,7 +50,7 @@ func setupWebhookForWorkload(mgr ctrl.Manager) error {
Complete()
}

// +kubebuilder:webhook:path=/mutate-kueue-x-k8s-io-v1beta1-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads;workloads/status,verbs=create;update,versions=v1beta1,name=mworkload.kb.io,admissionReviewVersions=v1
// +kubebuilder:webhook:path=/mutate-kueue-x-k8s-io-v1beta1-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads,verbs=create,versions=v1beta1,name=mworkload.kb.io,admissionReviewVersions=v1

var _ webhook.CustomDefaulter = &WorkloadWebhook{}

@@ -76,10 +76,6 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error
}
}

// If a deactivated workload is re-activated, we need to reset the RequeueState.
if ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(wl) && workload.HasRequeueState(wl) {
wl.Status.RequeueState = nil
}
return nil
}

16 changes: 0 additions & 16 deletions pkg/webhooks/workload_webhook_test.go
@@ -78,22 +78,6 @@ func TestWorkloadWebhookDefault(t *testing.T) {
},
},
},
"re-activated workload with re-queue state is reset the re-queue state": {
wl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
}).RequeueState(ptr.To[int32](5), ptr.To(metav1.Now())).
Obj(),
wantWl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
}).
Obj(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
25 changes: 24 additions & 1 deletion test/integration/scheduler/podsready/scheduler_test.go
@@ -248,7 +248,7 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() {
// To avoid flakiness, we don't verify if the workload has a QuotaReserved=false with pending reason here.
})

ginkgo.It("Should re-admit a timed out workload and deactivate a workload exceeded the re-queue count limit", func() {
ginkgo.It("Should re-admit a timed out workload and deactivate a workload exceeded the re-queue count limit. After that re-activating a workload", func() {
Contributor:

nit: s/activating/activate

may we consider a separate test for this, assuming that setting up the state isn't too difficult?

@tenzen-y (Member, Author) commented on Mar 14, 2024:

It is possible to divide this test into two tests, but we need to start another manager to configure the backoffLimitCount=1 or 0 like this:

requeuingBackoffLimitCount = ptr.To[int32](2)

It could increase the integration test time, so to avoid that, I implemented this as a single test as it is now.
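
(For illustration only: a minimal sketch of what such a split might look like, assuming the suite's existing pattern of setting requeuingBackoffLimitCount in a per-Context BeforeEach before the manager is started. The Context description and the It body below are placeholders, not part of this PR.)

// Hypothetical sketch: a dedicated Context would need its own BeforeEach
// (and therefore another manager start) just to change the backoff limit.
ginkgo.Context("with requeuing backoff limit count 1", func() {
	ginkgo.BeforeEach(func() {
		requeuingBackoffLimitCount = ptr.To[int32](1)
	})

	ginkgo.It("Should reset the requeue state once a deactivated workload is re-activated", func() {
		// create a workload, let it time out and get deactivated,
		// flip .spec.active back to true, then expect
		// .status.requeueState to become nil
	})
})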

ginkgo.By("create the 'prod' workload")
prodWl := testing.MakeWorkload("prod", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
gomega.Expect(k8sClient.Create(ctx, prodWl)).Should(gomega.Succeed())
@@ -276,6 +276,29 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
g.Expect(ptr.Deref(prodWl.Spec.Active, true)).Should(gomega.BeFalse())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("verify the re-activated inactive 'prod' workload re-queue state is reset")
// TODO: Once we move a logic to issue the Eviction with InactiveWorkload reason, we need to remove the below updates.
// REF: https://github.com/kubernetes-sigs/kueue/issues/1841
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
apimeta.SetStatusCondition(&prodWl.Status.Conditions, metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "evicted by Test",
})
g.Expect(k8sClient.Status().Update(ctx, prodWl)).Should(gomega.Succeed())
Comment on lines +285 to +291
Contributor:

wouldn't the controller add this already?

@tenzen-y (Member, Author):

Although the jobs controller adds this condition here:

workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByDeactivation, "The workload is deactivated")
, we don't register the jobs controller to the manager.

cfg := &config.Configuration{
WaitForPodsReady: &config.WaitForPodsReady{
Enable: true,
BlockAdmission: &blockAdmission,
Timeout: &metav1.Duration{Duration: value},
RequeuingStrategy: &config.RequeuingStrategy{
Timestamp: ptr.To(requeuingTimestamp),
BackoffLimitCount: requeuingBackoffLimitCount,
},
},
}
mgr.GetScheme().Default(cfg)
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
cCache := cache.New(mgr.GetClient(), cache.WithPodsReadyTracking(cfg.WaitForPodsReady.Enable && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission))
queues := queue.NewManager(
mgr.GetClient(), cCache,
queue.WithPodsReadyRequeuingTimestamp(requeuingTimestamp),
)
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
failedWebhook, err := webhooks.Setup(mgr)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
sched := scheduler.New(
queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName),
scheduler.WithPodsReadyRequeuingTimestamp(requeuingTimestamp),
)
err = sched.Start(ctx)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

@tenzen-y (Member, Author):

We just register the indexers:

err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

Contributor:

Interesting...
We should probably move that piece of code to the Workload controller in a follow up. It doesn't belong in the job reconciler, as it is completely independent from the job. Otherwise, if anyone wants to write a custom integration, they would need to implement this.

@tenzen-y (Member, Author) commented on Mar 14, 2024:

It makes sense, but we need to remember why we put this here.
Let me open an issue for this.

Contributor:

Can you add a TODO to remove this status update and link to #1841?

@tenzen-y (Member, Author) commented on Mar 14, 2024:

Sure. Thank you for raising it!
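
(Illustrative only: a rough sketch of what the follow-up tracked in #1841 could look like if the deactivation eviction moved into the Workload reconciler. The helper names are taken from this diff and the quoted snippet above; the exact placement and message are assumptions, not part of this PR.)

// Hypothetical: let the Workload reconciler issue the Evicted condition itself
// when a workload is deactivated, instead of relying on the job reconciler.
if !ptr.Deref(wl.Spec.Active, true) && !workload.IsEvictedByDeactivation(&wl) {
	workload.SetEvictedCondition(&wl, kueue.WorkloadEvictedByDeactivation, "The workload is deactivated")
	return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
}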

}, util.Timeout, util.Interval).Should(gomega.Succeed(), "Job reconciler should add an Evicted condition with InactiveWorkload to the Workload")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
prodWl.Spec.Active = ptr.To(true)
g.Expect(k8sClient.Update(ctx, prodWl)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), "Reactivate inactive Workload")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
g.Expect(prodWl.Status.RequeueState).Should(gomega.BeNil())
Contributor:

should we verify at the start of this sequence that the Requeue state is in a certain configuration (non-nil, or something more specific), in case the previous part of test changes?

Not relevant if this test is split out and this state is set explicitly.

@tenzen-y (Member, Author):

We already verify that here:

util.ExpectWorkloadToHaveRequeueCount(ctx, k8sClient, client.ObjectKeyFromObject(prodWl), ptr.To[int32](2))

Does it make sense?

}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should unblock admission of new workloads in other ClusterQueues once the admitted workload exceeds timeout", func() {
31 changes: 0 additions & 31 deletions test/integration/webhook/workload_test.go
@@ -82,37 +82,6 @@ var _ = ginkgo.Describe("Workload defaulting webhook", func() {

gomega.Expect(created.Spec.PodSets[0].Name).Should(gomega.Equal(kueue.DefaultPodSetName))
})
ginkgo.It("Should reset re-queue state", func() {
ginkgo.By("Creating a new inactive Workload")
workload := testing.MakeWorkload(workloadName, ns.Name).
Active(false).
Obj()
gomega.Expect(k8sClient.Create(ctx, workload)).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload)).Should(gomega.Succeed())
workload.Status = kueue.WorkloadStatus{
Conditions: []metav1.Condition{{
Type: kueue.WorkloadEvicted,
Reason: kueue.WorkloadEvictedByDeactivation,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}},
RequeueState: &kueue.RequeueState{
Count: ptr.To[int32](10),
RequeueAt: ptr.To(metav1.Now()),
},
}
g.Expect(k8sClient.Status().Update(ctx, workload)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
ginkgo.By("Activate a Workload")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload)).Should(gomega.Succeed())
workload.Spec.Active = ptr.To(true)
g.Expect(k8sClient.Update(ctx, workload)).Should(gomega.Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload)).Should(gomega.Succeed())
g.Expect(workload.Status.RequeueState).Should(gomega.BeNil(), "re-queue state should be reset")
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})
})
