Adding toleration to the job doesn't trigger workload change #1304

Merged

94 changes: 58 additions & 36 deletions pkg/controller/jobframework/reconciler.go
@@ -41,7 +41,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/maps"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

@@ -511,7 +511,13 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
}

if _, finished := job.Finished(); !finished {
-		if err := r.stopJob(ctx, job, w, StopReasonNoMatchingWorkload, "No matching Workload"); err != nil {
+		var msg string
+		if w == nil {
+			msg = "Missing Workload; unable to restore pod templates"
+		} else {
+			msg = "No matching Workload; restoring pod templates according to existent Workload"
+		}
+		if err := r.stopJob(ctx, job, w, StopReasonNoMatchingWorkload, msg); err != nil {
return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
}
}
@@ -562,7 +568,7 @@ func FindMatchingWorkloads(ctx context.Context, c client.Client, job GenericJob)

for i := range workloads.Items {
w := &workloads.Items[i]
-		if match == nil && equivalentToWorkload(job, w) {
+		if match == nil && equivalentToWorkload(ctx, c, job, w) {
match = w
} else {
toDelete = append(toDelete, w)
@@ -594,7 +600,7 @@ func (r *JobReconciler) ensurePrebuiltWorkloadOwnership(ctx context.Context, wl
}

func (r *JobReconciler) ensurePrebuiltWorkloadInSync(ctx context.Context, wl *kueue.Workload, job GenericJob) (bool, error) {
-	if !equivalentToWorkload(job, wl) {
+	if !equivalentToWorkload(ctx, r.client, job, wl) {
// mark the workload as finished
err := workload.UpdateStatus(ctx, r.client, wl,
kueue.WorkloadFinished,
@@ -607,8 +613,39 @@ func (r *JobReconciler) ensurePrebuiltWorkloadInSync(ctx context.Context, wl *ku
return true, nil
}

+// expectedRunningPodSets gets the expected podsets during the job execution, returns nil if the workload has no reservation or
+// the admission does not match.
+func expectedRunningPodSets(ctx context.Context, c client.Client, wl *kueue.Workload) []kueue.PodSet {
+	if !workload.HasQuotaReservation(wl) {
+		return nil
+	}
+	info, err := getPodSetsInfoFromStatus(ctx, c, wl)
+	if err != nil {
+		return nil
+	}
+	infoMap := slices.ToRefMap(info, func(psi *podset.PodSetInfo) string { return psi.Name })
+	runningPodSets := wl.Spec.DeepCopy().PodSets
+	canBePartiallyAdmitted := workload.CanBePartiallyAdmitted(wl)
+	for i := range runningPodSets {
+		ps := &runningPodSets[i]
+		psi, found := infoMap[ps.Name]
+		if !found {
+			return nil
+		}
+		err := podset.Merge(&ps.Template.ObjectMeta, &ps.Template.Spec, *psi)
+		if err != nil {
+			return nil
+		}
+		if canBePartiallyAdmitted && ps.MinCount != nil {
+			// update the expected running count
+			ps.Count = psi.Count
+		}

Comment on lines +639 to +642

Contributor: can this just be:

    ps.Count = psi.Count

? If the value is set, admission already determined if partial admission was possible.

Contributor: no, if we are not using partial admission we need to be strict about the count

+	}
+	return runningPodSets
+}

// equivalentToWorkload checks if the job corresponds to the workload
-func equivalentToWorkload(job GenericJob, wl *kueue.Workload) bool {
+func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob, wl *kueue.Workload) bool {
owner := metav1.GetControllerOf(wl)
// Indexes don't work in unit tests, so we explicitly check for the
// owner here.
@@ -618,33 +655,18 @@ func equivalentToWorkload(job GenericJob, wl *kueue.Workload) bool {

jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

-	if !workload.CanBePartiallyAdmitted(wl) || !workload.HasQuotaReservation(wl) {
-		// the two sets should fully match.
-		return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, true)
-	}
-
-	// Check everything but the pod counts.
-	if !equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, false) {
-		return false
-	}
-
-	// If the workload is admitted but the job is suspended, ignore counts.
-	// This might allow some violating jobs to pass equivalency checks, but their
-	// workloads would be invalidated in the next sync after unsuspending.

Comment on lines -631 to -633

Contributor: a version of this comment should be left before return job.IsSuspended() && equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)

I guess we are saying that, if the job is suspended, it can match either the running or the base specs.

Contributor: done


-	if job.IsSuspended() {
-		return true
-	}
-
-	for i, psAssigment := range wl.Status.Admission.PodSetAssignments {
-		assignedCount := wl.Spec.PodSets[i].Count
-		if jobPodSets[i].MinCount != nil {
-			assignedCount = ptr.Deref(psAssigment.Count, assignedCount)
-		}
-		if jobPodSets[i].Count != assignedCount {
-			return false
-		}
-	}
-	return true
+	if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil {
+		if equality.ComparePodSetSlices(jobPodSets, runningPodSets) {
+			return true
+		}
+		// If the workload is admitted but the job is suspended, do the check
+		// against the non-running info.
+		// This might allow some violating jobs to pass equivalency checks, but their
+		// workloads would be invalidated in the next sync after unsuspending.
+		return job.IsSuspended() && equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)
+	}
+
+	return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)
 }
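
As a reading aid, here is a behavior-level sketch of the rewritten equivalentToWorkload, reusing the podSet stand-in from the sketch above and taking the comparison as a function value in place of equality.ComparePodSetSlices (all names here are illustrative, not kueue's API):

```go
// equivalent sketches the new decision order:
//  1. With a usable reservation, compare the job against the expected
//     running pod sets (spec + merged admission info) first.
//  2. A suspended job may still match the base spec; its workload gets
//     re-validated on the next sync after unsuspending.
//  3. Without a reservation, only the base spec is compared.
func equivalent(jobPodSets, specPodSets, runningPodSets []podSet, suspended bool, compare func(a, b []podSet) bool) bool {
	if runningPodSets != nil {
		if compare(jobPodSets, runningPodSets) {
			return true
		}
		return suspended && compare(jobPodSets, specPodSets)
	}
	return compare(jobPodSets, specPodSets)
}
```

This ordering is what fixes the issue in the title: a toleration added by hand to a running job's pod template matches neither the running pod sets nor (because the job is not suspended) the base spec, so the check fails and the stale workload is replaced.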

func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) {
@@ -668,7 +690,7 @@

// startJob will unsuspend the job, and also inject the node affinity.
func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) error {
-	info, err := r.getPodSetsInfoFromStatus(ctx, wl)
+	info, err := getPodSetsInfoFromStatus(ctx, r.client, wl)
if err != nil {
return err
}
@@ -703,7 +725,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
if jws, implements := job.(JobWithCustomStop); implements {
stoppedNow, err := jws.Stop(ctx, r.client, info, stopReason, eventMsg)
if stoppedNow {
-			r.record.Eventf(object, corev1.EventTypeNormal, ReasonStopped, eventMsg)
+			r.record.Event(object, corev1.EventTypeNormal, ReasonStopped, eventMsg)
}

return err
@@ -721,7 +743,7 @@
return err
}

-	r.record.Eventf(object, corev1.EventTypeNormal, ReasonStopped, eventMsg)
+	r.record.Event(object, corev1.EventTypeNormal, ReasonStopped, eventMsg)
return nil
}

@@ -819,15 +841,15 @@ func extractPriorityFromPodSets(podSets []kueue.PodSet) string {

// getPodSetsInfoFromStatus extracts podSetsInfo from workload status, based on
// admission, and admission checks.
-func (r *JobReconciler) getPodSetsInfoFromStatus(ctx context.Context, w *kueue.Workload) ([]podset.PodSetInfo, error) {
+func getPodSetsInfoFromStatus(ctx context.Context, c client.Client, w *kueue.Workload) ([]podset.PodSetInfo, error) {
if len(w.Status.Admission.PodSetAssignments) == 0 {
return nil, nil
}

podSetsInfo := make([]podset.PodSetInfo, len(w.Status.Admission.PodSetAssignments))

for i, podSetFlavor := range w.Status.Admission.PodSetAssignments {
-		info, err := podset.FromAssignment(ctx, r.client, &podSetFlavor, w.Spec.PodSets[i].Count)
+		info, err := podset.FromAssignment(ctx, c, &podSetFlavor, w.Spec.PodSets[i].Count)
if err != nil {
return nil, err
}
@@ -921,7 +943,7 @@ func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo {
return nil
}

-	return utilslices.Map(wl.Spec.PodSets, podset.FromPodSet)
+	return slices.Map(wl.Spec.PodSets, podset.FromPodSet)

}
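
The subject of the PR — a toleration added to the job must trigger a workload change — hinges on the merge step above: podset.Merge folds admission-derived metadata and scheduling constraints into the expected running pod templates before the equivalence comparison. A rough, hypothetical reduction of that idea (mergePodSpec is not kueue's actual implementation; the label and toleration values are made up):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// mergePodSpec appends admission-derived scheduling constraints onto the
// pod spec the job submitted, roughly what the merged "running" template
// carries in addition to the base spec.
func mergePodSpec(spec *corev1.PodSpec, nodeSelector map[string]string, tolerations []corev1.Toleration) {
	if spec.NodeSelector == nil {
		spec.NodeSelector = map[string]string{}
	}
	for k, v := range nodeSelector {
		spec.NodeSelector[k] = v
	}
	spec.Tolerations = append(spec.Tolerations, tolerations...)
}

func main() {
	spec := corev1.PodSpec{}
	mergePodSpec(&spec,
		map[string]string{"example.com/pool": "spot"}, // assumed flavor node label
		[]corev1.Toleration{{
			Key:      "spot",
			Operator: corev1.TolerationOpExists,
			Effect:   corev1.TaintEffectNoSchedule,
		}},
	)
	fmt.Printf("merged tolerations: %+v\n", spec.Tolerations)
}
```

A toleration injected this way still compares equal, because it appears in the running template too; one added by hand on the job does not, which is exactly the difference the new check detects.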
