Update pod reconciler to handle race conditions in exclusive placement (#342)

* update pod reconciler to handle race conditions in exclusive placement

* add comments explaining the purpose of the reconciler and the algorithm used

* only reconcile scheduled leader pods using exclusive placement; only delete follower pods when there is a placement violation

* don't send status update again if the pod already has the condition

* address comments

* filter deleted pods

* debugging

* revert last commit

* test

* debug

* more debug

* debug 3

* remove debug logs

* webhook logs at higher log level

* fix comments
danielvegamyhre authored Dec 12, 2023
1 parent 39827b9 commit d49514b
Showing 4 changed files with 215 additions and 14 deletions.
210 changes: 206 additions & 4 deletions pkg/controllers/pod_controller.go
@@ -15,16 +15,25 @@ package controllers

import (
"context"
"errors"
"fmt"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/jobset/pkg/util/placement"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

@@ -55,9 +64,10 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
+// Only reconcile leader pods that have been scheduled and are part of
+// JobSets using exclusive placement.
pod, ok := object.(*corev1.Pod)
-// Only reconcile pods that are part of JobSets using exclusive placement.
-return ok && usingExclusivePlacement(pod)
+return ok && placement.IsLeaderPod(pod) && podScheduled(pod) && usingExclusivePlacement(pod) && !podDeleted(pod)
})).
Complete(r)
}
@@ -99,17 +109,188 @@ func SetupPodIndexes(ctx context.Context, indexer client.FieldIndexer) error {
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch

-// Reconcile is part of the main kubernetes reconciliation loop which aims to
-// move the current state of the cluster closer to the desired state.
+// Reconcile attempts to enforce that pods belonging to the same job are scheduled
+// exclusively on the same topology domain when the parent JobSet uses exclusive
+// placement.
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// In the following, we aim to enforce that for JobSets using exclusive placement,
// pods that belong to the same job are scheduled on the same topology domain exclusively.
// We do this by performing the following steps:
// 1) Reconcile leader pods which are scheduled and using exclusive placement.
// 2) For a given leader pod, check that all follower pods' nodeSelectors are
// configured to select the same topology the leader pod is currently placed on.
// 3) If the above condition is false, we delete all the follower pods in this job to
// allow them to be rescheduled correctly.
var leaderPod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &leaderPod); err != nil {
// we'll ignore not-found errors, since there is nothing we can do here.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(&leaderPod))
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Pod")

// Get all the pods owned by the same job as this pod.
jobKey, exists := leaderPod.Labels[jobset.JobKey]
if !exists {
return ctrl.Result{}, fmt.Errorf("job key label not found on leader pod: %q", leaderPod.Name)
}
podList, err := r.listPodsForJob(ctx, leaderPod.Namespace, jobKey)
if err != nil {
log.Error(err, "listing pods for job")
return ctrl.Result{}, err
}

// Validate all follower pods in this job are assigned to the same topology as the leader pod.
// If not, then delete all the job's follower pods so they can be recreated and rescheduled correctly.
valid, err := r.validatePodPlacements(ctx, &leaderPod, podList)
if err != nil {
return ctrl.Result{}, err
}
if !valid {
return ctrl.Result{}, r.deleteFollowerPods(ctx, podList.Items)
}
return ctrl.Result{}, nil
}

// listPodsForJob returns a list of pods owned by a specific job, looked up via the
// jobKey (SHA1 hash of the namespaced job name) field index.
func (r *PodReconciler) listPodsForJob(ctx context.Context, ns, jobKey string) (*corev1.PodList, error) {
var podList corev1.PodList
if err := r.List(ctx, &podList, client.InNamespace(ns), &client.MatchingFields{podJobKey: jobKey}); err != nil {
return nil, err
}

return &podList, nil
}

// validatePodPlacements returns true if the topology value specified in follower pods' nodeSelectors
// matches that of their leader pod, and the leader pod exists. Otherwise, it returns false.
func (r *PodReconciler) validatePodPlacements(ctx context.Context, leaderPod *corev1.Pod, podList *corev1.PodList) (bool, error) {
// We know exclusive key is set since we have an event filter for this.
topologyKey := leaderPod.Annotations[jobset.ExclusiveKey]
leaderTopology, err := r.leaderPodTopology(ctx, leaderPod, topologyKey)
if err != nil {
return false, err
}

// Validate all follower pods are assigned to the same topology as the leader pod.
for _, pod := range podList.Items {
if placement.IsLeaderPod(&pod) {
continue
}
followerTopology, err := followerPodTopology(&pod, topologyKey)
if err != nil {
return false, err
}
if followerTopology != leaderTopology {
return false, fmt.Errorf("follower topology %q != leader topology %q", followerTopology, leaderTopology)
}
}
return true, nil
}

// deleteFollowerPods deletes follower pods (non-index 0), parallelizing up to `maxParallelism` requests.
func (r *PodReconciler) deleteFollowerPods(ctx context.Context, pods []corev1.Pod) error {
lock := &sync.Mutex{}
var finalErrs []error

workqueue.ParallelizeUntil(ctx, maxParallelism, len(pods), func(i int) {
pod := pods[i]
// Do not delete leader pod.
if placement.IsLeaderPod(&pod) {
return
}

// Add a condition to the pod status so that a podFailurePolicy can be used to
// ignore deletions performed by this controller to handle race conditions.
condition := corev1.PodCondition{
Type: corev1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "ExclusivePlacementViolation",
Message: "Pod violated JobSet exclusive placement policy",
}

// Only send the status update if the pod does not already have this condition.
if updatePodCondition(&pod, condition) {
if err := r.Status().Update(ctx, &pod); err != nil {
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, err)
return
}
}

// Delete the pod.
backgroundPolicy := metav1.DeletePropagationBackground
if err := r.Delete(ctx, &pod, &client.DeleteOptions{PropagationPolicy: &backgroundPolicy}); client.IgnoreNotFound(err) != nil {
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, err)
return
}
})
return errors.Join(finalErrs...)
}

// leaderPodTopology returns the topology value (e.g., node pool name, zone, etc.)
// for a given leader pod and topology key, by checking the node labels on the node
// the leader is currently scheduled on.
func (r *PodReconciler) leaderPodTopology(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
log := ctrl.LoggerFrom(ctx)

nodeName := pod.Spec.NodeName
ns := pod.Namespace

// Get node the leader pod is running on.
var node corev1.Node
if err := r.Get(ctx, types.NamespacedName{Name: nodeName, Namespace: ns}, &node); err != nil {
// We'll ignore not-found errors, since there is nothing we can do here.
// A node may not exist temporarily due to a maintenance event or other scenarios.
log.Error(err, fmt.Sprintf("getting node %s", nodeName))
return "", client.IgnoreNotFound(err)
}

// Get topology (e.g. node pool name) from node labels.
topology, exists := node.Labels[topologyKey]
if !exists {
return "", fmt.Errorf("node does not have topology label: %s", topology)
}
return topology, nil
}

// followerPodTopology returns the topology value (e.g., node pool name, zone, etc.)
// for a given follower pod and topology key, by checking the target topology
// defined in the pod's nodeSelector.
func followerPodTopology(pod *corev1.Pod, topologyKey string) (string, error) {
if pod.Spec.NodeSelector == nil {
return "", fmt.Errorf("pod %s nodeSelector is nil", pod.Name)
}
topology, exists := pod.Spec.NodeSelector[topologyKey]
if !exists {
return "", fmt.Errorf("pod %s nodeSelector is missing key: %s", pod.Name, topologyKey)
}
return topology, nil
}

// usingExclusivePlacement returns true if the pod is part of a JobSet using
// exclusive placement, otherwise it returns false.
func usingExclusivePlacement(pod *corev1.Pod) bool {
_, exists := pod.Annotations[jobset.ExclusiveKey]
return exists
}

// podScheduled returns true if the pod has been scheduled, otherwise it returns false.
func podScheduled(pod *corev1.Pod) bool {
return pod.Spec.NodeName != ""
}

// podDeleted returns true if the pod has been marked for deletion, otherwise it returns false.
func podDeleted(pod *corev1.Pod) bool {
return pod.DeletionTimestamp != nil
}

// removePodNameSuffix removes the random suffix appended to pod names.
func removePodNameSuffix(podName string) (string, error) {
parts := strings.Split(podName, "-")
@@ -121,3 +302,24 @@ func removePodNameSuffix(podName string) (string, error) {
}
return strings.Join(parts[:len(parts)-1], "-"), nil
}

// updatePodCondition adds or updates the given condition on the in-memory pod object,
// returning true if a status update needs to be sent to the API server.
func updatePodCondition(pod *corev1.Pod, condition corev1.PodCondition) bool {
condition.LastTransitionTime = metav1.Now()
for i, val := range pod.Status.Conditions {
if condition.Type == val.Type && condition.Status != val.Status {
pod.Status.Conditions[i] = condition
// Condition found but different status so we should update
return true
} else if condition.Type == val.Type && condition.Status == val.Status {
// Duplicate condition so no update
return false
}
}
// Condition doesn't exist, update only if the status is true.
if condition.Status == corev1.ConditionTrue {
pod.Status.Conditions = append(pod.Status.Conditions, condition)
return true
}
return false
}
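
The DisruptionTarget condition set in deleteFollowerPods is intended to pair with a Job-level podFailurePolicy, so that pods deleted by this reconciler are not counted as job failures. A minimal sketch of such a rule using the standard batch/v1 types, to be attached to a Job template's spec.podFailurePolicy (illustrative only, not part of this commit):

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// Ignore terminations of pods that carry the DisruptionTarget=True condition,
// which is what deleteFollowerPods sets before deleting a follower pod.
var ignoreExclusivePlacementDeletions = batchv1.PodFailurePolicy{
	Rules: []batchv1.PodFailurePolicyRule{{
		Action: batchv1.PodFailurePolicyActionIgnore,
		OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{{
			Type:   corev1.DisruptionTarget,
			Status: corev1.ConditionTrue,
		}},
	}},
}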
12 changes: 6 additions & 6 deletions pkg/util/placement/placement.go
@@ -15,14 +15,14 @@ func GenJobName(jsName, rjobName string, jobIndex int) string {
return fmt.Sprintf("%s-%s-%d", jsName, rjobName, jobIndex)
}

+// IsLeaderPod returns true if the given pod is a leader pod (job completion index of 0),
+// otherwise it returns false.
+func IsLeaderPod(pod *corev1.Pod) bool {
+return pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0"
+}

// GenLeaderPodName returns the name of the leader pod (pod with completion index 0)
// for a given job in a jobset.
func GenLeaderPodName(jobSet, replicatedJob, jobIndex string) string {
return fmt.Sprintf("%s-%s-%s-0", jobSet, replicatedJob, jobIndex)
}

-// IsLeaderPod returns true if the given pod is a leader pod (job completion index of 0),
-// otherwise it returns false.
-func IsLeaderPod(pod *corev1.Pod) bool {
-return pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0"
-}
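
For reference, a short illustrative snippet (not part of this commit) showing how these helpers behave; the pod and names below are made up:

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"sigs.k8s.io/jobset/pkg/util/placement"
)

func exampleLeaderHelpers() {
	// A pod whose job completion index annotation is "0" is the leader of its job.
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Annotations: map[string]string{batchv1.JobCompletionIndexAnnotation: "0"},
	}}
	fmt.Println(placement.IsLeaderPod(pod)) // true

	// Expected leader pod name for job index "3" of replicated job "workers" in JobSet "js":
	// "js-workers-3-0" (the trailing "-0" is the leader's completion index).
	fmt.Println(placement.GenLeaderPodName("js", "workers", "3"))
}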
3 changes: 1 addition & 2 deletions pkg/webhooks/pod_admission_webhook.go
@@ -85,7 +85,6 @@ func (p *podWebhook) leaderPodScheduled(ctx context.Context, pod *corev1.Pod) (b
func (p *podWebhook) leaderPodForFollower(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
// Generate the expected leader pod name for this follower pod.
log := ctrl.LoggerFrom(ctx)
log.Info(fmt.Sprintf("generating leader pod name for follower pod: %s", pod.Name))
leaderPodName, err := genLeaderPodName(pod)
if err != nil {
log.Error(err, "getting leader pod name for follower pod")
@@ -100,7 +99,7 @@ func (p *podWebhook) leaderPodForFollower(ctx context.Context, pod *corev1.Pod)

// Validate there is only 1 leader pod for this job.
if len(podList.Items) != 1 {
return nil, fmt.Errorf("too many leader pods for this job (expected 1, got %d", len(podList.Items))
return nil, fmt.Errorf("incorrect number of leader pods for this job (expected 1, got %d)", len(podList.Items))
}

// Check if the leader pod is scheduled.
4 changes: 2 additions & 2 deletions pkg/webhooks/pod_mutating_webhook.go
@@ -83,11 +83,11 @@ func (p *podWebhook) Default(ctx context.Context, obj runtime.Object) error {
func (p *podWebhook) patchPod(ctx context.Context, pod *corev1.Pod) error {
log := ctrl.LoggerFrom(ctx)
if pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0" {
log.Info(fmt.Sprintf("pod webhook: setting exclusive affinities for pod: %s", pod.Name))
log.V(3).Info(fmt.Sprintf("pod webhook: setting exclusive affinities for pod: %s", pod.Name))
setExclusiveAffinities(pod)
return nil
} else {
log.Info(fmt.Sprintf("pod webhook: adding node selector for follower pod: %s", pod.Name))
log.V(3).Info(fmt.Sprintf("pod webhook: adding node selector for follower pod: %s", pod.Name))
return p.setNodeSelector(ctx, pod)
}
}
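
To connect the two pieces: the mutating webhook pins follower pods to the leader's topology through a nodeSelector keyed by the exclusive-placement topology key, and the pod reconciler later reads that same key back in followerPodTopology and compares it with the label on the leader's node. A rough sketch of what a patched follower pod might look like, reusing the corev1/metav1/jobset imports from pod_controller.go above (the topology key, pod name, and values are examples, not part of this commit):

follower := &corev1.Pod{
	ObjectMeta: metav1.ObjectMeta{
		Name:        "js-workers-3-1-abc12", // hypothetical follower pod name
		Annotations: map[string]string{jobset.ExclusiveKey: "cloud.google.com/gke-nodepool"},
	},
	Spec: corev1.PodSpec{
		// Set by the mutating webhook to match the topology the leader landed on.
		NodeSelector: map[string]string{"cloud.google.com/gke-nodepool": "pool-a"},
	},
}
// followerPodTopology(follower, "cloud.google.com/gke-nodepool") returns "pool-a";
// if that ever disagrees with the leader node's label, Reconcile deletes the
// follower pods so they can be recreated and rescheduled on the right topology.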
