[pods] Finalize the replaced pods #1766

Merged

Changes from 9 commits

14 changes: 7 additions & 7 deletions keps/976-plain-pods/README.md
@@ -565,25 +565,25 @@ in-memory Workload, then it is considered unmatching and the Workload is evicted
In the Pod-group reconciler:
1. If the Pod is not terminated and doesn't have a deletionTimestamp,
create a Workload for the pod group if one does not exist.
2. Remove Pod finalizers if the Pod is terminated and the Workload is finished, has a deletion
timestamp or is finished.
2. Remove Pod finalizers if:
- The Pod is terminated and the Workload is finished or has a deletion timestamp.
- The Pod Failed and a valid replacement pod was created for it.
3. Build the in-memory Workload. If its podset counters are greater than the stored Workload,
then evict the Workload.
4. For gated pods:
- remove the gate, set nodeSelector
5. If the number of succeeded pods is equal to the admission count, mark the Workload as Finished
and remove the finalizers from the Pods.
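
For illustration, step 2's finalization rules can be condensed into a single predicate. The sketch below is not part of this KEP or of Kueue's code; `podTerminated`, `shouldRemoveFinalizer` and the boolean inputs (`workloadFinished`, `workloadDeleted`, `hasValidReplacement`) are hypothetical and assumed to be computed by the caller, for example from the Workload's conditions and owner references.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// podTerminated reports whether the pod reached a terminal phase.
func podTerminated(p *corev1.Pod) bool {
	return p.Status.Phase == corev1.PodSucceeded || p.Status.Phase == corev1.PodFailed
}

// shouldRemoveFinalizer condenses step 2 above. The three booleans are
// assumptions computed elsewhere by the reconciler.
func shouldRemoveFinalizer(p *corev1.Pod, workloadFinished, workloadDeleted, hasValidReplacement bool) bool {
	// 2a: the Pod is terminated and the Workload is finished or has a deletion timestamp.
	if podTerminated(p) && (workloadFinished || workloadDeleted) {
		return true
	}
	// 2b: the Pod failed and a valid replacement Pod was created for it.
	return p.Status.Phase == corev1.PodFailed && hasValidReplacement
}

func main() {
	failed := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodFailed}}
	fmt.Println(shouldRemoveFinalizer(failed, false, false, true)) // true: the Pod was replaced
}
```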

Note that we are only removing Pod finalizers once the Workload is finished. This is a simple way of
managing finalizers, but it might lead to too many Pods lingering in etcd for a long time after
terminated.

### Retrying Failed Pods

The Pod group will generally only be considered finished if all the Pods finish with a Succeeded
phase.
This allows the user to send replacement Pods when a Pod in the group fails or if the group is
preempted. The replacement Pods can have any name, but they must point to the same pod group.
Once a replacement Pod is created, and Kueue has added it as an owner of the Workload, the
Failed pod will be finalized. If multiple Pods have Failed, a new Pod is assumed to replace
the Pod that failed first.
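
A minimal sketch of the "replaces the Pod that failed first" rule follows. It is illustrative only and not the controller's implementation; the real code derives the failure time from pod conditions and deletion timestamps (see `lastActiveTime` in `pod_controller.go` below), and `failedAt`/`oldestFailed` here are simplified stand-ins.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// failedAt is a simplified stand-in for the controller's failure-time logic:
// use the ContainersReady=False transition, falling back to the creation time.
func failedAt(p *corev1.Pod) time.Time {
	for _, c := range p.Status.Conditions {
		if c.Type == corev1.ContainersReady && c.Status == corev1.ConditionFalse {
			return c.LastTransitionTime.Time
		}
	}
	return p.CreationTimestamp.Time
}

// oldestFailed picks the Failed pod that failed first: per the text above, this
// is the pod a newly created replacement is assumed to replace.
func oldestFailed(pods []corev1.Pod) *corev1.Pod {
	var oldest *corev1.Pod
	for i := range pods {
		p := &pods[i]
		if p.Status.Phase != corev1.PodFailed {
			continue
		}
		if oldest == nil || failedAt(p).Before(failedAt(oldest)) {
			oldest = p
		}
	}
	return oldest
}

func main() {
	now := metav1.Now()
	earlier := metav1.NewTime(now.Add(-time.Hour))
	failedCond := func(t metav1.Time) []corev1.PodCondition {
		return []corev1.PodCondition{{Type: corev1.ContainersReady, Status: corev1.ConditionFalse, LastTransitionTime: t}}
	}
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Status: corev1.PodStatus{Phase: corev1.PodFailed, Conditions: failedCond(now)}},
		{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Status: corev1.PodStatus{Phase: corev1.PodFailed, Conditions: failedCond(earlier)}},
	}
	fmt.Println(oldestFailed(pods).Name) // "b": it failed first, so it is the one being replaced
}
```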

To declare that a group is failed, a user can execute one of the following actions:
1. Issue a Delete for the Workload object. The controller would terminate all running Pods and
@@ -708,4 +708,4 @@ While this would be a clean approach, this proposal is targeting users that don'
wrapping their Pods, and adding one would be a bigger effort than adding annotations. Such amount
of effort could be similar to migrating from plain Pods to the Job API, which is already supported.

We could reconsider this based on user feedback.
2 changes: 0 additions & 2 deletions pkg/controller/jobframework/interface.go
@@ -111,8 +111,6 @@ type ComposableJob interface {
FindMatchingWorkloads(ctx context.Context, c client.Client, r record.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
// Stop implements the custom stop procedure for ComposableJob
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error)
// Ensure all members of the ComposableJob are owning the workload
EnsureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r record.EventRecorder, workload *kueue.Workload) error
}

func QueueName(job GenericJob) string {
9 changes: 0 additions & 9 deletions pkg/controller/jobframework/reconciler.go
@@ -272,15 +272,6 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

// Ensure all members of the composable job own the workload
if wl != nil {
if cj, implements := job.(ComposableJob); implements {
if err := cj.EnsureWorkloadOwnedByAllMembers(ctx, r.client, r.record, wl); err != nil {
return ctrl.Result{}, err
}
}
}

if wl != nil && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
// Finalize the job if it's finished
if _, finished := job.Finished(); finished {
231 changes: 155 additions & 76 deletions pkg/controller/jobs/pod/pod_controller.go
@@ -623,9 +623,13 @@ func (p *Pod) Load(ctx context.Context, c client.Client, key *types.NamespacedNa
}

func (p *Pod) constructGroupPodSets() ([]kueue.PodSet, error) {
return constructGroupPodSets(p.list.Items)
}

func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) {
var resultPodSets []kueue.PodSet

for _, podInGroup := range p.list.Items {
for _, podInGroup := range pods {
if !isPodRunnableOrSucceeded(&podInGroup) {
continue
}
@@ -704,14 +708,12 @@ func (p *Pod) validatePodGroupMetadata(r record.EventRecorder, activePods []core

// runnableOrSucceededPods returns a slice of active pods in the group
func (p *Pod) runnableOrSucceededPods() []corev1.Pod {
activePods := make([]corev1.Pod, 0, len(p.list.Items))
for _, pod := range p.list.Items {
if isPodRunnableOrSucceeded(&pod) {
activePods = append(activePods, pod)
}
}
return utilslices.Pick(p.list.Items, isPodRunnableOrSucceeded)
}

return activePods
// notRunnableNorSucceededPods returns a slice of inactive pods in the group
func (p *Pod) notRunnableNorSucceededPods() []corev1.Pod {
return utilslices.Pick(p.list.Items, func(p *corev1.Pod) bool { return !isPodRunnableOrSucceeded(p) })
}

// isPodRunnableOrSucceeded returns whether the Pod can eventually run, is Running or Succeeded.
@@ -723,21 +725,55 @@ func isPodRunnableOrSucceeded(p *corev1.Pod) bool {
return p.Status.Phase != corev1.PodFailed
}

// cleanupExcessPods will delete and finalize pods created last if the number of
// activePods is greater than the totalCount value.
func (p *Pod) cleanupExcessPods(ctx context.Context, c client.Client, r record.EventRecorder, totalCount int, activePods []corev1.Pod) error {
log := ctrl.LoggerFrom(ctx)

extraPodsCount := len(activePods) - totalCount

if extraPodsCount <= 0 {
return nil
// lastActiveTime returns the last timestamp on which the pod was observed active,
// taken as the earlier of:
// - the time the pod was declared Failed
// - the deletion time
func lastActiveTime(p *corev1.Pod) time.Time {
lastTransition := metav1.Now()
for _, c := range p.Status.Conditions {
if c.Type == corev1.ContainersReady {
if c.Status == corev1.ConditionFalse && c.Reason == string(corev1.PodFailed) {
lastTransition = c.LastTransitionTime
}
break
}
}
// Do not clean up more pods until observing previous operations
if !p.satisfiedExcessPods {
return errPendingOps
deletionTime := ptr.Deref(p.DeletionTimestamp, metav1.Now())
if lastTransition.Before(&deletionTime) {
return lastTransition.Time
}
return deletionTime.Time
}
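
As an aside (not part of this diff): the net effect is that `lastActiveTime` returns the earlier of the Failed transition recorded on the `ContainersReady` condition and the deletion timestamp, defaulting to "now" for whichever is missing. A hypothetical test in the same package could pin this down:

```go
package pod

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestLastActiveTimeSketch illustrates that the earlier of the Failed
// transition and the deletion timestamp wins.
func TestLastActiveTimeSketch(t *testing.T) {
	failedAt := metav1.NewTime(time.Now().Add(-2 * time.Hour))
	deletedAt := metav1.NewTime(time.Now().Add(-time.Hour))
	p := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &deletedAt},
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{{
				Type:               corev1.ContainersReady,
				Status:             corev1.ConditionFalse,
				Reason:             string(corev1.PodFailed),
				LastTransitionTime: failedAt,
			}},
		},
	}
	if got := lastActiveTime(p); !got.Equal(failedAt.Time) {
		t.Errorf("expected the earlier Failed transition %v, got %v", failedAt.Time, got)
	}
}
```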

// sortInactivePods - sorts the provided pods slice based on:
// - finalizer state (pods with finalizers are first)
// - lastActiveTime (pods that were active last are first)
// - creation timestamp (newer pods are first)
func sortInactivePods(inactivePods []corev1.Pod) {
sort.Slice(inactivePods, func(i, j int) bool {
pi := &inactivePods[i]
pj := &inactivePods[j]
iFin := slices.Contains(pi.Finalizers, PodFinalizer)
jFin := slices.Contains(pj.Finalizers, PodFinalizer)
if iFin != jFin {
return iFin
}

iLastActive := lastActiveTime(pi)
jLastActive := lastActiveTime(pj)

if iLastActive.Equal(jLastActive) {
return pi.CreationTimestamp.Before(&pj.CreationTimestamp)
}
return jLastActive.Before(iLastActive)
})
}

// sortActivePods - sorts the provided pods slice based on:
// - finalizer state (pods with no finalizers are last)
// - gated state (pods that are still gated are last)
// - creation timestamp (newer pods are last)
func sortActivePods(activePods []corev1.Pod) {
// Sort active pods by creation timestamp
sort.Slice(activePods, func(i, j int) bool {
pi := &activePods[i]
@@ -754,16 +790,22 @@ func (p *Pod) cleanupExcessPods(ctx context.Context, c client.Client, r record.E
if iGated != jGated {
return !iGated
}
return pi.ObjectMeta.CreationTimestamp.Before(&pj.ObjectMeta.CreationTimestamp)
return pi.CreationTimestamp.Before(&pj.CreationTimestamp)
})
}

func (p *Pod) removeExcessPods(ctx context.Context, c client.Client, r record.EventRecorder, extraPods []corev1.Pod) error {
if len(extraPods) == 0 {
return nil
}

log := ctrl.LoggerFrom(ctx)

// Extract all the latest created extra pods
extraPods := activePods[len(activePods)-extraPodsCount:]
extraPodsUIDs := utilslices.Map(extraPods, func(p *corev1.Pod) types.UID { return p.UID })
p.excessPodExpectations.ExpectUIDs(log, p.key, extraPodsUIDs)

// Finalize and delete the active pods created last

err := parallelize.Until(ctx, len(extraPods), func(i int) error {
pod := extraPods[i]
if controllerutil.RemoveFinalizer(&pod, PodFinalizer) {
@@ -774,7 +816,7 @@ func (p *Pod) cleanupExcessPods(ctx context.Context, c client.Client, r record.E
return err
}
}
if pod.ObjectMeta.DeletionTimestamp.IsZero() {
if pod.DeletionTimestamp.IsZero() {
log.V(3).Info("Deleting excess pod in group", "excessPod", klog.KObj(&pod))
if err := c.Delete(ctx, &pod); err != nil {
// We won't observe this cleanup in the event handler.
@@ -788,31 +830,42 @@ func (p *Pod) cleanupExcessPods(ctx context.Context, c client.Client, r record.E
if err != nil {
return err
}
return nil
}

// Remove excess pods from the group list
newPodsInGroup := make([]corev1.Pod, 0, len(p.list.Items)-len(extraPods))
for i := range p.list.Items {
found := false
for j := range extraPods {
if p.list.Items[i].Name == extraPods[j].Name && p.list.Items[i].Namespace == extraPods[j].Namespace {
found = true
break
}
}
func (p *Pod) finalizePods(ctx context.Context, c client.Client, extraPods []corev1.Pod) error {
if len(extraPods) == 0 {
return nil
}

log := ctrl.LoggerFrom(ctx)

// Extract all the latest created extra pods
extraPodsUIDs := utilslices.Map(extraPods, func(p *corev1.Pod) types.UID { return p.UID })
p.excessPodExpectations.ExpectUIDs(log, p.key, extraPodsUIDs)

if !found {
newPodsInGroup = append(newPodsInGroup, p.list.Items[i])
err := parallelize.Until(ctx, len(extraPods), func(i int) error {
pod := extraPods[i]
if controllerutil.RemoveFinalizer(&pod, PodFinalizer) {
log.V(3).Info("Finalizing pod in group", "Pod", klog.KObj(&pod))
if err := c.Update(ctx, &pod); err != nil {
// We won't observe this cleanup in the event handler.
p.excessPodExpectations.ObservedUID(log, p.key, pod.UID)
return err
}
} else {
// We don't expect an event in this case.
p.excessPodExpectations.ObservedUID(log, p.key, pod.UID)
}
return nil
})
if err != nil {
return err
}
p.list.Items = newPodsInGroup

return nil
}
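
The `ExpectUIDs`/`ObservedUID` calls above implement an expectations pattern: the reconciler records which pod UIDs it has just mutated and refuses to trust its possibly stale cached pod list until all of them have been observed again (compare the `satisfiedExcessPods` check further down). A minimal standalone sketch of that idea, not Kueue's implementation:

```go
package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

// uidExpectations remembers which pod UIDs we acted on and reports whether all
// of them have been observed again. Illustrative only.
type uidExpectations struct {
	mu      sync.Mutex
	pending map[types.UID]struct{}
}

func newUIDExpectations() *uidExpectations {
	return &uidExpectations{pending: map[types.UID]struct{}{}}
}

func (e *uidExpectations) Expect(uids ...types.UID) {
	e.mu.Lock()
	defer e.mu.Unlock()
	for _, uid := range uids {
		e.pending[uid] = struct{}{}
	}
}

func (e *uidExpectations) Observed(uid types.UID) {
	e.mu.Lock()
	defer e.mu.Unlock()
	delete(e.pending, uid)
}

func (e *uidExpectations) Satisfied() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return len(e.pending) == 0
}

func main() {
	exp := newUIDExpectations()
	exp.Expect("uid-1", "uid-2")
	exp.Observed("uid-1")
	fmt.Println(exp.Satisfied()) // false: still waiting to observe uid-2
}
```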

func (p *Pod) EnsureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r record.EventRecorder, workload *kueue.Workload) error {
if !p.isGroup {
return nil
}
func (p *Pod) ensureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r record.EventRecorder, workload *kueue.Workload) error {
oldOwnersCnt := len(workload.GetOwnerReferences())
for _, pod := range p.list.Items {
if err := controllerutil.SetOwnerReference(&pod, workload, c.Scheme()); err != nil {
@@ -871,7 +924,7 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
return wl, nil
}

if err := p.finalizeNonRunnableNorSucceededPods(ctx, c); err != nil {
if err := p.finalizePods(ctx, c, p.notRunnableNorSucceededPods()); err != nil {
return nil, err
}

Expand All @@ -893,9 +946,13 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
}

// Cleanup extra pods if there's any
err = p.cleanupExcessPods(ctx, c, r, groupTotalCount, activePods)
if err != nil {
return nil, err
if excessPodsCount := len(activePods) - groupTotalCount; excessPodsCount > 0 {
sortActivePods(activePods)
err = p.removeExcessPods(ctx, c, r, activePods[len(activePods)-excessPodsCount:])
if err != nil {
return nil, err
}
p.list.Items = activePods[:len(activePods)-excessPodsCount]
}
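
As an aside, the trimming above follows a simple slice-split pattern: sort so that removal candidates land at the tail, then keep the head and remove the tail. A toy illustration with a hypothetical `splitKeepTail` helper, not code from this PR:

```go
package main

import "fmt"

// splitKeepTail shows the pattern: after sorting so that removal candidates end
// up at the tail, keep the first len-k items and trim the last k.
func splitKeepTail(items []string, k int) (kept, trimmed []string) {
	if k <= 0 {
		return items, nil
	}
	return items[:len(items)-k], items[len(items)-k:]
}

func main() {
	// Hypothetical group of 4 active pods admitted with a total count of 2.
	active := []string{"oldest", "older", "newer", "newest-gated"}
	kept, excess := splitKeepTail(active, len(active)-2)
	fmt.Println(kept)   // [oldest older]
	fmt.Println(excess) // [newer newest-gated]
}
```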

// Construct workload for a pod group
@@ -971,37 +1028,73 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client, r reco

// Cleanup excess pods for each workload pod set (role)
activePods := p.runnableOrSucceededPods()
inactivePods := p.notRunnableNorSucceededPods()

var keptPods []corev1.Pod
var excessActivePods []corev1.Pod
var replacedInactivePods []corev1.Pod

for _, ps := range workload.Spec.PodSets {
// Find all the active pods of the role
var roleActivePods []corev1.Pod
for _, activePod := range activePods {
roleHash, err := getRoleHash(activePod)
// Find all the active and inactive pods of the role
var roleHashErrors []error
hasRoleFunc := func(p *corev1.Pod) bool {
hash, err := getRoleHash(*p)
if err != nil {
return nil, nil, fmt.Errorf("failed to calculate pod role hash: %w", err)
roleHashErrors = append(roleHashErrors, err)
return false
}
return hash == ps.Name
}
roleActivePods := utilslices.Pick(activePods, hasRoleFunc)
roleInactivePods := utilslices.Pick(inactivePods, hasRoleFunc)
if len(roleHashErrors) > 0 {
return nil, nil, fmt.Errorf("failed to calculate pod role hash: %w", errors.Join(roleHashErrors...))
}

if ps.Name == roleHash {
roleActivePods = append(roleActivePods, activePod)
}
if excessCount := len(roleActivePods) - int(ps.Count); excessCount > 0 {
sortActivePods(roleActivePods)
excessActivePods = append(excessActivePods, roleActivePods[len(roleActivePods)-excessCount:]...)
keptPods = append(keptPods, roleActivePods[:len(roleActivePods)-excessCount]...)
} else {
keptPods = append(keptPods, roleActivePods...)
}

// Cleanup excess pods of the role
err := p.cleanupExcessPods(ctx, c, r, int(ps.Count), roleActivePods)
if err != nil {
return nil, nil, err
if finalizablePodsCount := min(len(roleInactivePods), len(roleInactivePods)+len(roleActivePods)-int(ps.Count)); finalizablePodsCount > 0 {
sortInactivePods(roleInactivePods)
replacedInactivePods = append(replacedInactivePods, roleInactivePods[len(roleInactivePods)-finalizablePodsCount:]...)
keptPods = append(keptPods, roleInactivePods[:len(roleInactivePods)-finalizablePodsCount]...)
} else {
keptPods = append(keptPods, roleInactivePods...)
}
}

jobPodSets, err := p.constructGroupPodSets()
jobPodSets, err := constructGroupPodSets(keptPods)
if err != nil {
return nil, nil, err
}

if p.equivalentToWorkload(workload, jobPodSets) {
return workload, []*kueue.Workload{}, nil
} else {
if len(keptPods) == 0 || !p.equivalentToWorkload(workload, jobPodSets) {
return nil, []*kueue.Workload{workload}, nil
}

// Do not clean up more pods until observing previous operations
if !p.satisfiedExcessPods {
return nil, nil, errPendingOps
}

p.list.Items = keptPods
if err := p.ensureWorkloadOwnedByAllMembers(ctx, c, r, workload); err != nil {
return nil, nil, err
}

if err := p.removeExcessPods(ctx, c, r, excessActivePods); err != nil {
return nil, nil, err
}

if err := p.finalizePods(ctx, c, replacedInactivePods); err != nil {
return nil, nil, err
}
return workload, []*kueue.Workload{}, nil
}
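
To make the per-role bookkeeping in `FindMatchingWorkloads` concrete, here is a worked example with hypothetical numbers: a role admitted with `Count: 3` that currently has 4 active and 2 inactive pods. It uses the built-in `min` (Go 1.21+), as the diff itself does; this is only an arithmetic sketch, not controller code.

```go
package main

import "fmt"

func main() {
	count := 3               // ps.Count for the role
	active, inactive := 4, 2 // runnable-or-succeeded vs failed/deleted pods

	excessActive := active - count                      // 1: one extra active pod is removed
	finalizable := min(inactive, inactive+active-count) // 2: both inactive pods were replaced, so finalize them
	kept := (active - excessActive) + (inactive - finalizable)

	fmt.Println(excessActive, finalizable, kept) // 1 2 3: keptPods matches the admitted count here
}
```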

func (p *Pod) equivalentToWorkload(wl *kueue.Workload, jobPodSets []kueue.PodSet) bool {
Expand Down Expand Up @@ -1068,20 +1161,6 @@ func (p *Pod) ReclaimablePods() ([]kueue.ReclaimablePod, error) {
return result, nil
}

func (p *Pod) finalizeNonRunnableNorSucceededPods(ctx context.Context, c client.Client) error {
for _, p := range p.list.Items {
if isPodRunnableOrSucceeded(&p) {
continue
}
if controllerutil.RemoveFinalizer(&p, PodFinalizer) {
if err := c.Update(ctx, &p); err != nil {
return err
}
}
}
return nil
}

func IsPodOwnerManagedByKueue(p *Pod) bool {
if owner := metav1.GetControllerOf(&p.pod); owner != nil {
return jobframework.IsOwnerManagedByKueue(owner) || (owner.Kind == "RayCluster" && strings.HasPrefix(owner.APIVersion, "ray.io/v1alpha1"))