Skip to content

Commit

Permalink
update pod reconciler to handle race conditions in exclusive placement
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Nov 30, 2023
1 parent 1056fb8 commit 679cf95
Showing 1 changed file with 208 additions and 0 deletions.
208 changes: 208 additions & 0 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@ package controllers

import (
"context"
"errors"
"fmt"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/jobset/pkg/util/placement"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

Expand All @@ -37,6 +45,9 @@ const (
// the namespaced job name of the job that owns this pod, and value is
// the pod itself.
podJobKey string = "podJobKey"

// parallelDeletions defines the maximum number of pod deletions that can be done concurrently.
parallelDeletions int = 50
)

// PodReconciler reconciles a Pod owned by a JobSet using exclusive placement.
Expand Down Expand Up @@ -102,9 +113,187 @@ func SetupPodIndexes(ctx context.Context, indexer client.FieldIndexer) error {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For the pod named in req, Reconcile resolves the leader pod of the owning
// job, lists every pod of that job, and validates the pods' placements against
// the leader's topology. If the placements are reported as invalid, all of the
// job's pods are deleted so they can be recreated and rescheduled.
//
// A pod that no longer exists is ignored. If the leader pod has not been
// assigned to a node yet, there is nothing to validate and Reconcile returns
// without error.
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Get Pod from apiserver.
	var pod corev1.Pod
	if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
		// we'll ignore not-found errors, since there is nothing we can do here.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(&pod))
	ctx = ctrl.LoggerInto(ctx, log)
	log.V(2).Info("Reconciling Pod")

	// We need a reference to the leader pod of this job, to find the topology domain
	// it is scheduled in. For a leader pod this is the pod itself; for a follower
	// it is looked up by name.
	leaderPod, err := r.leaderPodForFollower(ctx, &pod)
	if err != nil {
		return ctrl.Result{}, err
	}

	// The leader's topology is derived from the node it runs on, so nothing can be
	// validated until the leader is scheduled. This check is done on the resolved
	// leader (not only when the reconciled pod is the leader itself) so that a
	// follower event never triggers a node lookup with an empty node name.
	if leaderPod.Spec.NodeName == "" {
		log.Info("leader pod not scheduled")
		return ctrl.Result{}, nil
	}

	// Get all the pods owned by the same job as this pod.
	jobKey, exists := leaderPod.Labels[jobset.JobKey]
	if !exists {
		return ctrl.Result{}, fmt.Errorf("job key label not found on leader pod: %q", leaderPod.Name)
	}
	podList, err := r.listPodsForJob(ctx, leaderPod.Namespace, jobKey)
	if err != nil {
		log.Error(err, "listing pods for job")
		return ctrl.Result{}, err
	}

	// Validate all follower pods in this job are assigned to the same topology as the leader pod.
	// If not, then delete all the job's pods so they can be recreated and rescheduled correctly.
	valid, err := r.validatePodPlacements(ctx, leaderPod, podList)
	if err != nil {
		return ctrl.Result{}, err
	}
	if !valid {
		log.Info("pod placements are invalid, deleting all pods of the job", "leaderPod", leaderPod.Name, "podCount", len(podList.Items))
		return ctrl.Result{}, r.deletePods(ctx, podList.Items)
	}
	return ctrl.Result{}, nil
}

// listPodsForJob returns the pods owned by a specific job. Pods are looked up
// in namespace ns through the podJobKey field index, matched against jobKey
// (SHA1 hash of the namespaced job name). A failed list call is logged and
// its error is returned with a nil list.
func (r *PodReconciler) listPodsForJob(ctx context.Context, ns, jobKey string) (*corev1.PodList, error) {
	pods := &corev1.PodList{}
	byJobKey := client.MatchingFields{podJobKey: jobKey}

	err := r.List(ctx, pods, client.InNamespace(ns), &byJobKey)
	if err != nil {
		ctrl.LoggerFrom(ctx).Error(err, "listing pods")
		return nil, err
	}
	return pods, nil
}

// getPodByName returns the Pod object with the given name in namespace ns,
// looked up through the PodNameKey field index. It returns an error if the
// list call fails or if the lookup does not yield exactly one pod.
func (r *PodReconciler) getPodByName(ctx context.Context, ns, podName string) (*corev1.Pod, error) {
	matches := &corev1.PodList{}
	byName := client.MatchingFields{PodNameKey: podName}

	err := r.List(ctx, matches, client.InNamespace(ns), &byName)
	if err != nil {
		ctrl.LoggerFrom(ctx).Error(err, "listing pods")
		return nil, err
	}

	// Anything other than a single match (none, or duplicates) is an error.
	if count := len(matches.Items); count != 1 {
		return nil, fmt.Errorf("expected 1 pod with name %q, got %d", podName, count)
	}
	return &matches.Items[0], nil
}

// leaderPodForFollower returns the leader pod for the given pod. If the pod is
// itself a leader (as reported by placement.IsLeaderPod) it is returned as-is;
// otherwise the leader's name is derived from the follower's labels and the
// leader pod is fetched by name from the follower's namespace.
func (r *PodReconciler) leaderPodForFollower(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
	logger := ctrl.LoggerFrom(ctx)

	// A leader is its own leader; no lookup needed.
	if placement.IsLeaderPod(pod) {
		logger.Info(fmt.Sprintf("%q is a leader pod", pod.Name))
		return pod, nil
	}
	logger.Info(fmt.Sprintf("%q is a follower pod", pod.Name))

	name, err := leaderPodNameForFollower(pod)
	if err != nil {
		logger.Error(err, "generating leader pod name")
		return nil, err
	}

	// Use pod name index to quickly fetch the leader pod object.
	found, err := r.getPodByName(ctx, pod.Namespace, name)
	if err != nil {
		logger.Error(err, "getting leader pod by name")
		return nil, err
	}
	return found, nil
}

// validatePodPlacements reports whether every scheduled follower pod in podList
// runs on a node whose topology label (keyed by the leader's exclusive placement
// annotation) has the same value as the node the leader pod runs on.
//
// Return values:
//   - (true, nil): no scheduled follower is placed in a different topology.
//   - (false, nil): at least one scheduled follower is placed in a different
//     topology than the leader. This is a normal outcome, not an error, so that
//     the caller can act on it (e.g. delete the pods).
//   - (false, err): a topology could not be determined (node lookup failed, the
//     node is missing the label, or the node could not be found). Placement is
//     unknown in this case, so no verdict is given.
//
// Follower pods that have not been assigned to a node yet are skipped, since
// they have no placement to validate.
func (r *PodReconciler) validatePodPlacements(ctx context.Context, leaderPod *corev1.Pod, podList *corev1.PodList) (bool, error) {
	// We know exclusive key is set since we have an event filter for this.
	topologyKey := leaderPod.Annotations[jobset.ExclusiveKey]
	leaderTopology, err := r.topologyFromPod(ctx, leaderPod, topologyKey)
	if err != nil {
		return false, err
	}
	// topologyFromPod yields an empty value without an error when the node was
	// not found; without a leader topology there is nothing to compare against.
	if leaderTopology == "" {
		return false, fmt.Errorf("topology of leader pod %q could not be determined", leaderPod.Name)
	}

	// Validate all scheduled follower pods are assigned to the same topology as the leader pod.
	// Index into the slice to avoid copying each Pod.
	for i := range podList.Items {
		follower := &podList.Items[i]
		if placement.IsLeaderPod(follower) {
			continue
		}
		// Not bound to a node yet: no placement to validate.
		if follower.Spec.NodeName == "" {
			continue
		}
		followerTopology, err := r.topologyFromPod(ctx, follower, topologyKey)
		if err != nil {
			return false, err
		}
		// Unknown placement must not be mistaken for a wrong placement.
		if followerTopology == "" {
			return false, fmt.Errorf("topology of follower pod %q could not be determined", follower.Name)
		}
		if followerTopology != leaderTopology {
			// A mismatch is the "invalid" verdict, not a failure of validation.
			return false, nil
		}
	}
	return true, nil
}

// topologyFromPod returns the topology value (e.g., node pool name, zone, etc.)
// for a given pod and topology key. The value is read from the label named
// topologyKey on the node referenced by pod.Spec.NodeName.
//
// If the node cannot be found, the error is logged and ("", nil) is returned;
// callers must treat an empty value as "unknown". Any other lookup error is
// returned as-is. If the node exists but does not carry the topologyKey label,
// an error naming the node and the missing label key is returned.
func (r *PodReconciler) topologyFromPod(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
	log := ctrl.LoggerFrom(ctx)

	nodeName := pod.Spec.NodeName

	// Get the node the pod is running on. Nodes are cluster-scoped, so the
	// lookup key carries only a name and no namespace.
	var node corev1.Node
	if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
		// We'll ignore not-found errors, since there is nothing we can do here.
		// A node may not exist temporarily due to a maintenance event or other scenarios.
		log.Error(err, fmt.Sprintf("getting node %s", nodeName))
		return "", client.IgnoreNotFound(err)
	}

	// Get topology (e.g. node pool name) from node labels.
	topology, exists := node.Labels[topologyKey]
	if !exists {
		// Report the label key that is missing; the value is necessarily empty here.
		return "", fmt.Errorf("node %q does not have topology label: %s", nodeName, topologyKey)
	}
	return topology, nil
}

// deletePods deletes the given pods with background propagation, issuing up to
// `parallelDeletions` delete requests concurrently. Pods that are already gone
// (not-found) are not treated as failures. All other delete errors are
// collected and returned as a single joined error; nil is returned when every
// deletion succeeded.
func (r *PodReconciler) deletePods(ctx context.Context, pods []corev1.Pod) error {
	var (
		mu       sync.Mutex // guards failures, which is appended to from worker goroutines
		failures []error
	)

	deleteOne := func(idx int) {
		target := pods[idx]
		propagation := metav1.DeletePropagationBackground
		opts := &client.DeleteOptions{PropagationPolicy: &propagation}

		err := r.Delete(ctx, &target, opts)
		if client.IgnoreNotFound(err) == nil {
			// Deleted, or it was already gone.
			return
		}

		mu.Lock()
		failures = append(failures, err)
		mu.Unlock()
	}

	workqueue.ParallelizeUntil(ctx, parallelDeletions, len(pods), deleteOne)
	return errors.Join(failures...)
}

// usingExclusivePlacement returns true if the pod is part of a JobSet using
// exclusive placement, otherwise it returns false.
func usingExclusivePlacement(pod *corev1.Pod) bool {
_, exists := pod.Annotations[jobset.ExclusiveKey]
return exists
Expand All @@ -121,3 +310,22 @@ func removePodNameSuffix(podName string) (string, error) {
}
return strings.Join(parts[:len(parts)-1], "-"), nil
}

// leaderPodNameForFollower accepts a pod that is part of a jobset as input, and
// returns the leader pod name generated by placement.GenLeaderPodName from the
// pod's jobset name, replicated job name and job index labels. An error is
// returned if any of those labels is missing.
func leaderPodNameForFollower(pod *corev1.Pod) (string, error) {
	// Pod name format: <jobset>-<replicatedJob>-<jobIndex>-<podIndex>-<randomSuffix>

	// requireLabel fetches a label value, failing if the label is absent.
	requireLabel := func(key string) (string, error) {
		value, found := pod.Labels[key]
		if !found {
			return "", fmt.Errorf("pod missing label: %s", key)
		}
		return value, nil
	}

	jobSetName, err := requireLabel(jobset.JobSetNameKey)
	if err != nil {
		return "", err
	}
	replicatedJobName, err := requireLabel(jobset.ReplicatedJobNameKey)
	if err != nil {
		return "", err
	}
	jobIdx, err := requireLabel(jobset.JobIndexKey)
	if err != nil {
		return "", err
	}
	return placement.GenLeaderPodName(jobSetName, replicatedJobName, jobIdx), nil
}

0 comments on commit 679cf95

Please sign in to comment.