
Update pod reconciler to handle race conditions in exclusive placement #342

Merged: 15 commits, Dec 12, 2023
189 changes: 185 additions & 4 deletions pkg/controllers/pod_controller.go
@@ -15,16 +15,25 @@ package controllers

import (
"context"
"errors"
"fmt"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/jobset/pkg/util/placement"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

@@ -56,8 +65,9 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&corev1.Pod{}).
WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
pod, ok := object.(*corev1.Pod)
// Only reconcile pods that are part of JobSets using exclusive placement.
return ok && usingExclusivePlacement(pod)
// Only reconcile leader pods that have been scheduled and are part of
// JobSets using exclusive placement.
return ok && placement.IsLeaderPod(pod) && isScheduled(pod) && usingExclusivePlacement(pod)
})).
Complete(r)
}
@@ -99,17 +109,167 @@ func SetupPodIndexes(ctx context.Context, indexer client.FieldIndexer) error {
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// Reconcile attempts to enforce that the pods that belong to the same job are
// scheduled on the same topology domain exclusively (if the parent JobSet is using
// exclusive placement).
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// In the following, we aim to enforce that for JobSets using exclusive placement,
// pods that belong to the same job are scheduled on the same topology domain exclusively.
// We do this by performing the following steps:
// 1) Reconcile leader pods which are scheduled and using exclusive placement.
// 2) For a given leader pod, check that all follower pods' nodeSelectors are
// configured to select the same topology the leader pod is currently placed on.
// 3) If the above condition is false, we delete all the follower pods in this job to
// allow them to be rescheduled correctly.
var leaderPod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &leaderPod); err != nil {
// We'll ignore not-found errors, since there is nothing we can do here.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(&leaderPod))
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Pod")

// Get all the pods owned by the same job as this pod.
jobKey, exists := leaderPod.Labels[jobset.JobKey]
if !exists {
return ctrl.Result{}, fmt.Errorf("job key label not found on leader pod: %q", leaderPod.Name)
}
podList, err := r.listPodsForJob(ctx, leaderPod.Namespace, jobKey)
if err != nil {
log.Error(err, "listing pods for job")
return ctrl.Result{}, err
}

// Validate all follower pods in this job are assigned to the same topology as the leader pod.
// If not, then delete all the job's follower pods so they can be recreated and rescheduled correctly.
valid, err := r.validatePodPlacements(ctx, &leaderPod, podList)
if err != nil {
return ctrl.Result{}, err
}
if !valid {
return ctrl.Result{}, r.deleteFollowerPods(ctx, podList.Items)
}
return ctrl.Result{}, nil
}

// listPodsForJob returns a list of pods owned by a specific job, using the
// jobKey (SHA1 hash of the namespaced job name) label selector.
func (r *PodReconciler) listPodsForJob(ctx context.Context, ns, jobKey string) (*corev1.PodList, error) {
var podList corev1.PodList
if err := r.List(ctx, &podList, client.InNamespace(ns), &client.MatchingFields{podJobKey: jobKey}); err != nil {
return nil, err
}

return &podList, nil
}
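
For context, the jobKey label used by the field selector above is described as a SHA1 hash of the namespaced job name. Below is a minimal sketch of how such a key could be derived, assuming the input is "<namespace>/<jobName>"; the helper name jobKeyForJob is hypothetical and not part of this diff.

// Illustrative only: hex-encoded SHA1 digest of "<namespace>/<jobName>".
// Requires the "crypto/sha1" and "encoding/hex" imports.
func jobKeyForJob(namespace, jobName string) string {
	sum := sha1.Sum([]byte(namespace + "/" + jobName))
	return hex.EncodeToString(sum[:])
}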

// validatePodPlacements returns true if all follower pods in the job are assigned to the
// same topology as their leader pod. Otherwise, it returns false.
func (r *PodReconciler) validatePodPlacements(ctx context.Context, leaderPod *corev1.Pod, podList *corev1.PodList) (bool, error) {
// We know exclusive key is set since we have an event filter for this.
topologyKey := leaderPod.Annotations[jobset.ExclusiveKey]
leaderTopology, err := r.topologyFromPod(ctx, leaderPod, topologyKey)
if err != nil {
return false, err
}

// Validate all follower pods are assigned to the same topology as the leader pod.
for _, pod := range podList.Items {
if placement.IsLeaderPod(&pod) {
continue
}
followerTopology, err := r.topologyFromPod(ctx, &pod, topologyKey)
if err != nil {
return false, err
}
if followerTopology != leaderTopology {
return false, fmt.Errorf("follower topology %q != leader topology %q", followerTopology, leaderTopology)
}
}
return true, nil
}

// topologyFromPod returns the topology value (e.g., node pool name, zone, etc.)
// for a given pod and topology key.
func (r *PodReconciler) topologyFromPod(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
log := ctrl.LoggerFrom(ctx)

nodeName := pod.Spec.NodeName
ns := pod.Namespace

// Get the node the pod is scheduled on.
var node corev1.Node
if err := r.Get(ctx, types.NamespacedName{Name: nodeName, Namespace: ns}, &node); err != nil {
// We'll ignore not-found errors, since there is nothing we can do here.
// A node may not exist temporarily due to a maintenance event or other scenarios.
log.Error(err, fmt.Sprintf("getting node %s", nodeName))
return "", client.IgnoreNotFound(err)
}

// Get topology (e.g. node pool name) from node labels.
topology, exists := node.Labels[topologyKey]
if !exists {
return "", fmt.Errorf("node does not have topology label: %s", topology)
}
return topology, nil
}
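
As an illustration of the lookup above, assume the exclusive-placement topology key is a node pool label such as "cloud.google.com/gke-nodepool" (a common choice, but not mandated by this diff); the function exampleTopologyLookup and the node values below are invented.

// Illustrative only: the topology value is simply the node's label value for the topology key.
func exampleTopologyLookup() string {
	node := corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "gke-cluster-pool-a-node-1",
			Labels: map[string]string{"cloud.google.com/gke-nodepool": "pool-a"},
		},
	}
	return node.Labels["cloud.google.com/gke-nodepool"] // "pool-a"
}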

// deleteFollowerPods deletes follower pods (non-index 0), parallelizing up to `maxParallelism` requests.
func (r *PodReconciler) deleteFollowerPods(ctx context.Context, pods []corev1.Pod) error {
lock := &sync.Mutex{}
var finalErrs []error

workqueue.ParallelizeUntil(ctx, maxParallelism, len(pods), func(i int) {
pod := pods[i]
// Do not delete leader pod.
if placement.IsLeaderPod(&pod) {
return
}

// Add a condition to the pod status so that a podFailurePolicy can be used to ignore
// deletions performed by this controller to handle race conditions.
condition := corev1.PodCondition{
Type: corev1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "ExclusivePlacementViolation",
Message: "Pod violated JobSet exclusive placement policy",
}
// If pod status already has this condition, we don't need to send the update RPC again.
if updatePodCondition(&pod, condition) {
if err := r.Status().Update(ctx, &pod); err != nil {
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, err)
return
}
}

// Delete the pod.
backgroundPolicy := metav1.DeletePropagationBackground
if err := r.Delete(ctx, &pod, &client.DeleteOptions{PropagationPolicy: &backgroundPolicy}); client.IgnoreNotFound(err) != nil {
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, err)
return
}
})
return errors.Join(finalErrs...)
}
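
Because the DisruptionTarget condition is set before deletion, a Job podFailurePolicy can ignore these controller-initiated deletions so they do not count against the Job's backoffLimit. A minimal sketch, assuming a batchv1 alias for k8s.io/api/batch/v1; the rule values are illustrative and not taken from this PR.

// Illustrative only: ignore pod failures whose DisruptionTarget condition is true,
// so deletions performed by the pod reconciler do not count as Job failures.
func ignoreExclusivePlacementDeletions() batchv1.PodFailurePolicy {
	return batchv1.PodFailurePolicy{
		Rules: []batchv1.PodFailurePolicyRule{{
			Action: batchv1.PodFailurePolicyActionIgnore,
			OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{{
				Type:   corev1.DisruptionTarget,
				Status: corev1.ConditionTrue,
			}},
		}},
	}
}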

// usingExclusivePlacement returns true if the pod is part of a JobSet using
// exclusive placement, otherwise it returns false.
func usingExclusivePlacement(pod *corev1.Pod) bool {
_, exists := pod.Annotations[jobset.ExclusiveKey]
return exists
}

// isScheduled returns true if the pod has been scheduled, otherwise it returns false.
func isScheduled(pod *corev1.Pod) bool {
return pod.Spec.NodeName != ""
}

// removePodNameSuffix removes the random suffix appended to pod names.
func removePodNameSuffix(podName string) (string, error) {
parts := strings.Split(podName, "-")
@@ -121,3 +281,24 @@ func removePodNameSuffix(podName string) (string, error) {
}
return strings.Join(parts[:len(parts)-1], "-"), nil
}
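
A hedged usage sketch of removePodNameSuffix: the pod name and random suffix below are invented, and since part of the function body is collapsed in this diff, the behavior is inferred from the visible code.

// Illustrative only: drops the trailing random suffix that Kubernetes appends
// to generated pod names.
func exampleRemoveSuffix() {
	name, err := removePodNameSuffix("myjobset-workers-0-1-7h2kp")
	_ = name // "myjobset-workers-0-1"
	_ = err  // nil
}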

// updatePodCondition adds or updates the given condition on the pod and returns true
// if the pod status changed and needs to be updated on the API server.
func updatePodCondition(pod *corev1.Pod, condition corev1.PodCondition) bool {
condition.LastTransitionTime = metav1.Now()
for i, val := range pod.Status.Conditions {
if condition.Type == val.Type && condition.Status != val.Status {
pod.Status.Conditions[i] = condition
// Condition found but different status so we should update
return true
} else if condition.Type == val.Type && condition.Status == val.Status {
// Duplicate condition so no update
return false
}
}
// condition doesn't exist, update only if the status is true
if condition.Status == corev1.ConditionTrue {
pod.Status.Conditions = append(pod.Status.Conditions, condition)
return true
}
return false
}
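
A brief usage sketch of updatePodCondition; the pod and condition values are illustrative.

// Illustrative only: the first call appends the condition and reports that a status
// update is needed; a second call with the same type and status is a no-op.
func exampleUpdatePodCondition() {
	pod := &corev1.Pod{}
	cond := corev1.PodCondition{Type: corev1.DisruptionTarget, Status: corev1.ConditionTrue}
	first := updatePodCondition(pod, cond)  // true: condition appended
	second := updatePodCondition(pod, cond) // false: duplicate condition, no update
	_, _ = first, second
}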
12 changes: 6 additions & 6 deletions pkg/util/placement/placement.go
@@ -15,14 +15,14 @@ func GenJobName(jsName, rjobName string, jobIndex int) string {
return fmt.Sprintf("%s-%s-%d", jsName, rjobName, jobIndex)
}

// IsLeaderPod returns true if the given pod is a leader pod (job completion index of 0),
// otherwise it returns false.
func IsLeaderPod(pod *corev1.Pod) bool {
return pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0"
}

// GenLeaderPodName returns the name of the leader pod (pod with completion index 0)
// for a given job in a jobset.
func GenLeaderPodName(jobSet, replicatedJob, jobIndex string) string {
return fmt.Sprintf("%s-%s-%s-0", jobSet, replicatedJob, jobIndex)
}

// IsLeaderPod returns true if the given pod is a leader pod (job completion index of 0),
// otherwise it returns false.
func IsLeaderPod(pod *corev1.Pod) bool {
return pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0"
}
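
A brief usage sketch tying the two placement helpers together; the JobSet and replicated job names are illustrative, and the metav1 and batchv1 imports are assumed.

// Illustrative only: the leader pod for job index 0 of replicated job "workers"
// in JobSet "ml" is named "ml-workers-0-0", and a pod is a leader iff its
// completion index annotation is "0".
func exampleLeaderHelpers() {
	leaderName := GenLeaderPodName("ml", "workers", "0") // "ml-workers-0-0"
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Annotations: map[string]string{batchv1.JobCompletionIndexAnnotation: "0"},
	}}
	isLeader := IsLeaderPod(pod) // true
	_, _ = leaderName, isLeader
}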