From e38ab6d3671e7375baf39860cda514ca1e01916e Mon Sep 17 00:00:00 2001 From: kannon92 Date: Sun, 16 Jul 2023 23:46:30 +0000 Subject: [PATCH] Add PodGC changes for PodReplacementPolicy --- pkg/controller/podgc/gc_controller.go | 8 ++++++-- pkg/features/kube_features.go | 10 +++++++++ test/integration/podgc/podgc_test.go | 29 +++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index d4fe227ac06c8..81288dfd6ae9c 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -344,7 +344,11 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1. func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error { logger := klog.FromContext(ctx) logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod)) - if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + // Patch the pod to make sure it is transitioned to the Failed phase before deletion. + // This is needed for the JobPodReplacementPolicy feature to make sure Job replacement pods are created. + // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/3939-allow-replacement-when-fully-terminated#risks-and-mitigations + // for more details. + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) || utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { // Mark the pod as failed - this is especially important in case the pod // is orphaned, in which case the pod would remain in the Running phase @@ -357,7 +361,7 @@ func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Cont // PodGC it means that it is in the Failed phase, so sending the // condition will not be re-attempted. podApply.Status.WithPhase(v1.PodFailed) - if condition != nil { + if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { podApply.Status.WithConditions(condition) } if _, err := gcc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 18e15c8688654..8ed07e074c727 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -382,6 +382,14 @@ const ( // that have never been unsuspended before. JobMutableNodeSchedulingDirectives featuregate.Feature = "JobMutableNodeSchedulingDirectives" + // owner: @kannon92 + // kep : https://kep.k8s.io/3939 + // alpha: v1.28 + // + // Allow users to specify recreating pods of a job only when + // pods have fully terminated. + JobPodReplacementPolicy featuregate.Feature = "JobPodReplacementPolicy" + // owner: @mimowo // kep: https://kep.k8s.io/3329 // alpha: v1.25 @@ -994,6 +1002,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS JobPodFailurePolicy: {Default: true, PreRelease: featuregate.Beta}, + JobPodReplacementPolicy: {Default: false, PreRelease: featuregate.Alpha}, + JobReadyPods: {Default: true, PreRelease: featuregate.Beta}, JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 diff --git a/test/integration/podgc/podgc_test.go b/test/integration/podgc/podgc_test.go index 8356d75334738..799f7f262737f 100644 --- a/test/integration/podgc/podgc_test.go +++ b/test/integration/podgc/podgc_test.go @@ -41,6 +41,7 @@ import ( func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) { tests := map[string]struct { enablePodDisruptionConditions bool + enableJobPodReplacementPolicy bool phase v1.PodPhase wantPhase v1.PodPhase wantDisruptionTarget *v1.PodCondition @@ -56,6 +57,24 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) { Message: "PodGC: node no longer exists", }, }, + "PodDisruptionConditions and PodReplacementPolicy enabled": { + enablePodDisruptionConditions: true, + enableJobPodReplacementPolicy: true, + phase: v1.PodPending, + wantPhase: v1.PodFailed, + wantDisruptionTarget: &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByPodGC", + Message: "PodGC: node no longer exists", + }, + }, + "Only PodReplacementPolicy enabled; no PodDisruptionCondition": { + enablePodDisruptionConditions: false, + enableJobPodReplacementPolicy: true, + phase: v1.PodPending, + wantPhase: v1.PodFailed, + }, "PodDisruptionConditions disabled": { enablePodDisruptionConditions: false, phase: v1.PodPending, @@ -76,6 +95,7 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy)() testCtx := setup(t, "podgc-orphaned") cs := testCtx.ClientSet @@ -151,6 +171,7 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) { func TestTerminatingOnOutOfServiceNode(t *testing.T) { tests := map[string]struct { enablePodDisruptionConditions bool + enableJobPodReplacementPolicy bool withFinalizer bool wantPhase v1.PodPhase }{ @@ -172,11 +193,19 @@ func TestTerminatingOnOutOfServiceNode(t *testing.T) { enablePodDisruptionConditions: false, withFinalizer: false, }, + "pod has phase changed when PodDisruptionConditions disabled, but JobPodReplacementPolicy enabled": { + enablePodDisruptionConditions: false, + enableJobPodReplacementPolicy: true, + withFinalizer: true, + wantPhase: v1.PodFailed, + }, } for name, test := range tests { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy)() testCtx := setup(t, "podgc-out-of-service") cs := testCtx.ClientSet