Merge pull request kubernetes#117015 from kannon92/job-fix-replacement-after-deletion

Job: create replacement pods only after terminated
k8s-ci-robot committed Jul 21, 2023
2 parents f5130e4 + 74fcf3e commit 4e8908d
Showing 7 changed files with 379 additions and 40 deletions.
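
Context for the change (not part of the diff): with the JobPodReplacementPolicy feature gate enabled and spec.podReplacementPolicy set to Failed, the Job controller now waits for pods to finish terminating before creating their replacements, and reports the count in status.terminating. A minimal sketch of a Job opting into this behavior, assuming the batch/v1 PodReplacementPolicy field and Failed constant that this controller code consumes:

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/utils/pointer"
)

func main() {
    // With this policy, replacement pods are created only after failed pods
    // have fully terminated.
    policy := batchv1.Failed
    job := batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "example"},
        Spec: batchv1.JobSpec{
            Parallelism:          pointer.Int32(3),
            Completions:          pointer.Int32(3),
            PodReplacementPolicy: &policy,
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{
                        {Name: "worker", Image: "busybox", Command: []string{"sh", "-c", "exit 0"}},
                    },
                },
            },
        },
    }
    fmt.Println(job.Name, *job.Spec.PodReplacementPolicy)
}
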
25 changes: 25 additions & 0 deletions pkg/controller/controller_utils.go
@@ -958,12 +958,37 @@ func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod {
return result
}

func FilterTerminatingPods(pods []*v1.Pod) []*v1.Pod {
var result []*v1.Pod
for _, p := range pods {
if IsPodTerminating(p) {
result = append(result, p)
}
}
return result
}

func CountTerminatingPods(pods []*v1.Pod) int32 {
numberOfTerminatingPods := 0
for _, p := range pods {
if IsPodTerminating(p) {
numberOfTerminatingPods += 1
}
}
return int32(numberOfTerminatingPods)
}

func IsPodActive(p *v1.Pod) bool {
return v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil
}

func IsPodTerminating(p *v1.Pod) bool {
return !podutil.IsPodTerminal(p) &&
p.DeletionTimestamp != nil
}

// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
activeFilter := func(rs *apps.ReplicaSet) bool {
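
Not part of the diff: the helpers above encode the controller's working definition of a terminating pod, namely one whose DeletionTimestamp is set but whose phase is not yet terminal (Succeeded or Failed). A small runnable sketch under that assumption:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isTerminalPhase mirrors the "terminal" notion used above: Succeeded or Failed.
func isTerminalPhase(p *v1.Pod) bool {
    return p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed
}

func main() {
    now := metav1.Now()

    deletedRunning := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}
    deletedRunning.SetDeletionTimestamp(&now) // deleted but still running: terminating

    deletedFailed := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodFailed}}
    deletedFailed.SetDeletionTimestamp(&now) // already terminal: not terminating

    for _, p := range []*v1.Pod{deletedRunning, deletedFailed} {
        terminating := p.DeletionTimestamp != nil && !isTerminalPhase(p)
        fmt.Println(p.Status.Phase, terminating) // Running true, Failed false
    }
}
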
25 changes: 25 additions & 0 deletions pkg/controller/controller_utils_test.go
@@ -380,6 +380,31 @@ func TestDeletePodsAllowsMissing(t *testing.T) {
assert.True(t, apierrors.IsNotFound(err))
}

func TestCountTerminatingPods(t *testing.T) {
now := metav1.Now()

// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
rc := newReplicationController(0)
podList := newPodList(nil, 7, v1.PodRunning, rc)
podList.Items[0].Status.Phase = v1.PodSucceeded
podList.Items[1].Status.Phase = v1.PodFailed
podList.Items[2].Status.Phase = v1.PodPending
podList.Items[2].SetDeletionTimestamp(&now)
podList.Items[3].Status.Phase = v1.PodRunning
podList.Items[3].SetDeletionTimestamp(&now)
var podPointers []*v1.Pod
for i := range podList.Items {
podPointers = append(podPointers, &podList.Items[i])
}

terminatingPods := CountTerminatingPods(podPointers)

assert.Equal(t, terminatingPods, int32(2))

terminatingList := FilterTerminatingPods(podPointers)
assert.Equal(t, len(terminatingList), int(2))
}

func TestActivePodFiltering(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
5 changes: 5 additions & 0 deletions pkg/controller/job/indexed_job_utils.go
@@ -243,13 +243,18 @@ func parseIndexesFromString(logger klog.Logger, indexesStr string, completions i

// firstPendingIndexes returns `count` indexes less than `completions` that are
// not covered by `activePods`, `succeededIndexes` or `failedIndexes`.
// When PodReplacementPolicy is Failed, `terminatingPods` are also counted as covering their indexes.
func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
if count == 0 {
return nil
}
active := getIndexes(jobCtx.activePods)
result := make([]int, 0, count)
nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active))
if onlyReplaceFailedPods(jobCtx.job) {
terminating := getIndexes(controller.FilterTerminatingPods(jobCtx.pods))
nonPending = nonPending.withOrderedIndexes(sets.List(terminating))
}
if jobCtx.failedIndexes != nil {
nonPending = nonPending.merge(*jobCtx.failedIndexes)
}
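
Not part of the diff: a simplified illustration of what the firstPendingIndexes change means for Indexed Jobs when onlyReplaceFailedPods is true, with indexes held by terminating pods skipped the same way as active, succeeded, and failed ones (this sketch does not use the controller's actual interval types):

package main

import "fmt"

// pendingIndexes is a toy stand-in for firstPendingIndexes: it returns up to
// `count` completion indexes below `completions` that no listed pod covers.
func pendingIndexes(completions, count int, covered ...[]int) []int {
    taken := map[int]bool{}
    for _, list := range covered {
        for _, ix := range list {
            taken[ix] = true
        }
    }
    var result []int
    for ix := 0; ix < completions && len(result) < count; ix++ {
        if !taken[ix] {
            result = append(result, ix)
        }
    }
    return result
}

func main() {
    active := []int{0, 1}
    succeeded := []int{2}
    terminating := []int{3} // only excluded when PodReplacementPolicy is Failed
    fmt.Println(pendingIndexes(5, 2, active, succeeded, terminating)) // [4]
}
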
1 change: 1 addition & 0 deletions pkg/controller/job/indexed_job_utils_test.go
@@ -758,6 +758,7 @@ func TestFirstPendingIndexes(t *testing.T) {
activePods: hollowPodsWithIndexPhase(tc.activePods),
succeededIndexes: tc.succeededIndexes,
failedIndexes: tc.failedIndexes,
job: newJob(1, 1, 1, batch.IndexedCompletion),
}
got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions)
if diff := cmp.Diff(tc.want, got); diff != "" {
38 changes: 34 additions & 4 deletions pkg/controller/job/job_controller.go
@@ -144,6 +144,7 @@ type syncJobCtx struct {
expectedRmFinalizers sets.Set[string]
uncounted *uncountedTerminatedPods
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
terminating *int32
}

// NewController creates a new Job controller that keeps the relevant pods
@@ -783,11 +784,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if err != nil {
return err
}

var terminating *int32
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
terminating = pointer.Int32(controller.CountTerminatingPods(pods))
}
jobCtx := &syncJobCtx{
job: &job,
pods: pods,
activePods: controller.FilterActivePods(logger, pods),
terminating: terminating,
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
}
@@ -919,6 +924,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
job.Status.Active = active
job.Status.Ready = ready
job.Status.Terminating = jobCtx.terminating
needsStatusUpdate = needsStatusUpdate || !pointer.Int32Equal(job.Status.Terminating, jobCtx.terminating)
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
if err != nil {
return fmt.Errorf("tracking status: %w", err)
@@ -1453,6 +1460,17 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionPodsDeleted, err
}

var terminating int32 = 0
if onlyReplaceFailedPods(jobCtx.job) {
// When a PodFailurePolicy is specified but the JobPodReplacementPolicy feature
// gate is disabled, we still need to count terminating pods for the replica
// counts, but we will not allow updates to status.
if jobCtx.terminating == nil {
terminating = controller.CountTerminatingPods(jobCtx.pods)
} else {
terminating = *jobCtx.terminating
}
}
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
@@ -1475,7 +1493,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
}
}

rmAtLeast := active - wantActive
rmAtLeast := active + terminating - wantActive
if rmAtLeast < 0 {
rmAtLeast = 0
}
@@ -1495,7 +1513,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionPodsDeleted, err
}

if active < wantActive {
if diff := wantActive - terminating - active; diff > 0 {
var remainingTime time.Duration
if !hasBackoffLimitPerIndex(job) {
// we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
@@ -1505,7 +1523,6 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
diff := wantActive - active
if diff > int32(MaxPodCreateDeletePerSync) {
diff = int32(MaxPodCreateDeletePerSync)
}
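
A worked example of the new arithmetic above (hypothetical numbers, not part of the diff): under the Failed replacement policy, terminating pods still occupy slots, so they are included when deciding how many pods to delete and subtracted before any replacements are created:

package main

import "fmt"

func main() {
    // Hypothetical state: parallelism 3 (wantActive), two pods still running,
    // two pods deleted but not yet fully terminated.
    wantActive, active, terminating := int32(3), int32(2), int32(2)

    // Terminating pods count against the target when computing deletions.
    rmAtLeast := active + terminating - wantActive // 2 + 2 - 3 = 1
    if rmAtLeast < 0 {
        rmAtLeast = 0
    }

    // No replacements are created until the terminating pods are gone.
    diff := wantActive - terminating - active // 3 - 2 - 2 = -1, nothing created yet
    fmt.Println(rmAtLeast, diff > 0)          // 1 false
}
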
@@ -1797,6 +1814,9 @@ func isPodFailed(p *v1.Pod, job *batch.Job) bool {
if p.Status.Phase == v1.PodFailed {
return true
}
if onlyReplaceFailedPods(job) {
return p.Status.Phase == v1.PodFailed
}
// Count deleted Pods as failures to account for orphan Pods that
// never have a chance to reach the Failed phase.
return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
@@ -1849,3 +1869,13 @@ func countReadyPods(pods []*v1.Pod) int32 {
}
return cnt
}

// onlyReplaceFailedPods checks whether the PodReplacementPolicy should be applied.
// PodReplacementPolicy controls when we recreate pods that are marked as terminating;
// Failed means that we recreate a pod only once it has fully terminated.
func onlyReplaceFailedPods(job *batch.Job) bool {
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && *job.Spec.PodReplacementPolicy == batch.Failed {
return true
}
return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
}