Skip to content

Commit

Permalink
validate follower in same job as leader
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Feb 22, 2024
1 parent 9f2cb14 commit 9ad5582
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// If not, then delete all the job's follower pods so they can be recreated and rescheduled correctly.
valid, err := r.validatePodPlacements(ctx, &leaderPod, podList)
if err != nil {
log.Error(err, "validating pod placements")
return ctrl.Result{}, err
}
if !valid {
log.V(2).Info("deleting follower pods for leader pod: %s", leaderPod.Name)
return ctrl.Result{}, r.deleteFollowerPods(ctx, podList.Items)
}
return ctrl.Result{}, nil
Expand Down
32 changes: 30 additions & 2 deletions pkg/webhooks/pod_admission_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (p *podWebhook) leaderPodScheduled(ctx context.Context, pod *corev1.Pod) (b
}
scheduled := leaderPod.Spec.NodeName != ""
if !scheduled {
log.V(3).Info("leader pod %s is not yet scheduled", leaderPod.Name)
log.V(0).Info("leader pod %s is not yet scheduled", leaderPod.Name)
}
return scheduled, nil
}
Expand All @@ -107,8 +108,18 @@ func (p *podWebhook) leaderPodForFollower(ctx context.Context, pod *corev1.Pod)
return nil, fmt.Errorf("expected 1 leader pod (%s), but got %d. this is an expected, transient error", leaderPodName, len(podList.Items))
}

// Check if the leader pod is scheduled.
// Validate leader pod has same owner UID as the follower, to ensure they are part of the same Job.
// This is necessary to handle a race condition where the JobSet is restarted (deleting and recreating
// all jobs), and a leader pod may land on different node pools than they were originally scheduled on.
// Then when the follower pods are recreated, and we look up the leader pod using the index which maps
// [pod name without random suffix] -> corev1.Pod object, we may get a stale index entry and inject
// the the wrong nodeSelector, using the topology the leader pod was originally scheduled on before the
// restart.
leaderPod := &podList.Items[0]
if err := podsOwnedBySameJob(leaderPod, pod); err != nil {
return nil, err
}

return leaderPod, nil
}

Expand All @@ -131,3 +142,20 @@ func genLeaderPodName(pod *corev1.Pod) (string, error) {
leaderPodName := placement.GenPodName(jobSet, replicatedJob, jobIndex, "0")
return leaderPodName, nil
}

// validatePodsOnSameRestartAttempt returns an error if the leader pod and
// follower pod are not on the same restart attempt. Otherwise, it returns nil.
func podsOwnedBySameJob(leaderPod, followerPod *corev1.Pod) error {
followerOwnerRef := metav1.GetControllerOf(followerPod)
if followerOwnerRef == nil {
return fmt.Errorf("follower pod has no owner reference")
}
leaderOwnerRef := metav1.GetControllerOf(leaderPod)
if leaderOwnerRef == nil {
return fmt.Errorf("leader pod %s has no owner reference", leaderPod.Name)
}
if followerOwnerRef.UID != leaderOwnerRef.UID {
return fmt.Errorf("follower pod owner UID (%s) != leader pod owner UID (%s)", string(followerOwnerRef.UID), string(leaderOwnerRef.UID))
}
return nil
}
1 change: 1 addition & 0 deletions pkg/webhooks/pod_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (p *podWebhook) setNodeSelector(ctx context.Context, pod *corev1.Pod) error
if pod.Spec.NodeSelector == nil {
pod.Spec.NodeSelector = make(map[string]string)
}
log.V(2).Info(fmt.Sprintf("setting nodeSelector %s: %s to follow leader pod %s", topologyKey, topologyValue, leaderPod.Name))
pod.Spec.NodeSelector[topologyKey] = topologyValue
return nil
}
Expand Down

0 comments on commit 9ad5582

Please sign in to comment.