Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate follower pod owned by same Job as leader pod #433

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// If not, then delete all the job's follower pods so they can be recreated and rescheduled correctly.
valid, err := r.validatePodPlacements(ctx, &leaderPod, podList)
if err != nil {
log.Error(err, "validating pod placements")
return ctrl.Result{}, err
}
if !valid {
log.V(2).Info("deleting follower pods for leader pod: %s", leaderPod.Name)
return ctrl.Result{}, r.deleteFollowerPods(ctx, podList.Items)
}
return ctrl.Result{}, nil
Expand Down
32 changes: 30 additions & 2 deletions pkg/webhooks/pod_admission_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (p *podWebhook) leaderPodScheduled(ctx context.Context, pod *corev1.Pod) (b
}
scheduled := leaderPod.Spec.NodeName != ""
if !scheduled {
log.V(3).Info("leader pod %s is not yet scheduled", leaderPod.Name)
log.V(2).Info("leader pod %s is not yet scheduled", leaderPod.Name)
}
return scheduled, nil
}
Expand All @@ -107,8 +108,18 @@ func (p *podWebhook) leaderPodForFollower(ctx context.Context, pod *corev1.Pod)
return nil, fmt.Errorf("expected 1 leader pod (%s), but got %d. this is an expected, transient error", leaderPodName, len(podList.Items))
}

// Check if the leader pod is scheduled.
// Validate leader pod has same owner UID as the follower, to ensure they are part of the same Job.
// This is necessary to handle a race condition where the JobSet is restarted (deleting and recreating
// all jobs), and a leader pod may land on different node pools than they were originally scheduled on.
// Then when the follower pods are recreated, and we look up the leader pod using the index which maps
// [pod name without random suffix] -> corev1.Pod object, we may get a stale index entry and inject
// the the wrong nodeSelector, using the topology the leader pod was originally scheduled on before the
// restart.
leaderPod := &podList.Items[0]
if err := podsOwnedBySameJob(leaderPod, pod); err != nil {
return nil, err
}

return leaderPod, nil
}

Expand All @@ -131,3 +142,20 @@ func genLeaderPodName(pod *corev1.Pod) (string, error) {
leaderPodName := placement.GenPodName(jobSet, replicatedJob, jobIndex, "0")
return leaderPodName, nil
}

// podsOwnedBySameJob returns an error if the leader pod and
// follower pod are not owned by the same Job UID. Otherwise, it returns nil.
func podsOwnedBySameJob(leaderPod, followerPod *corev1.Pod) error {
danielvegamyhre marked this conversation as resolved.
Show resolved Hide resolved
followerOwnerRef := metav1.GetControllerOf(followerPod)
if followerOwnerRef == nil {
return fmt.Errorf("follower pod has no owner reference")
}
leaderOwnerRef := metav1.GetControllerOf(leaderPod)
if leaderOwnerRef == nil {
return fmt.Errorf("leader pod %q has no owner reference", leaderPod.Name)
}
if followerOwnerRef.UID != leaderOwnerRef.UID {
return fmt.Errorf("follower pod owner UID (%s) != leader pod owner UID (%s)", string(followerOwnerRef.UID), string(leaderOwnerRef.UID))
}
return nil
}
70 changes: 70 additions & 0 deletions pkg/webhooks/pod_admission_webhook_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package webhooks

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)
Expand Down Expand Up @@ -64,3 +67,70 @@ func TestLeaderPodName(t *testing.T) {
})
}
}

func TestPodsOwnedBySameJob(t *testing.T) {
testCases := []struct {
name string
leaderPod *corev1.Pod
followerPod *corev1.Pod
wantResult error
}{
{
name: "pods owned by the same job",
leaderPod: createPodWithOwner("leader-pod", "job-uid-1"),
followerPod: createPodWithOwner("follower-pod", "job-uid-1"),
wantResult: nil,
},
{
name: "pods owned by different jobs",
leaderPod: createPodWithOwner("leader-pod", "job-uid-1"),
followerPod: createPodWithOwner("follower-pod", "job-uid-2"),
wantResult: fmt.Errorf("follower pod owner UID (%s) != leader pod owner UID (%s)", "job-uid-2", "job-uid-1"),
},
{
name: "follower pod with no owner",
leaderPod: createPodWithOwner("leader-pod", "job-uid-1"),
followerPod: createPodWithOwner("follower-pod", ""),
wantResult: fmt.Errorf("follower pod has no owner reference"),
},
{
name: "leader pod with no owner",
leaderPod: createPodWithOwner("leader-pod", ""),
followerPod: createPodWithOwner("follower-pod", "job-uid-2"),
wantResult: fmt.Errorf("leader pod \"leader-pod\" has no owner reference"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := podsOwnedBySameJob(tc.leaderPod, tc.followerPod)

// Want no error, but we got an error.
if tc.wantResult == nil && result != nil {
t.Errorf("Unexpected error: %v", result)

// Want an error, but we got no error.
} else if tc.wantResult != nil && result == nil {
t.Error("Expected error, but got nil")

// Want an error but the error we got is not the expected error.
} else if tc.wantResult != nil && result != nil && tc.wantResult.Error() != result.Error() {
t.Errorf("Expected error: %v, got: %v", tc.wantResult, result)
}
})
}
}

func createPodWithOwner(name string, ownerUID string) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
if ownerUID != "" {
pod.OwnerReferences = []metav1.OwnerReference{
{UID: types.UID(ownerUID), Kind: "Job", Controller: ptr.To(true)},
}
}
return pod
}
1 change: 1 addition & 0 deletions pkg/webhooks/pod_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (p *podWebhook) setNodeSelector(ctx context.Context, pod *corev1.Pod) error
if pod.Spec.NodeSelector == nil {
pod.Spec.NodeSelector = make(map[string]string)
}
log.V(2).Info(fmt.Sprintf("setting nodeSelector %s: %s to follow leader pod %s", topologyKey, topologyValue, leaderPod.Name))
pod.Spec.NodeSelector[topologyKey] = topologyValue
return nil
}
Expand Down