Add new condition and fail job #2

Closed
wants to merge 18 commits
12 changes: 11 additions & 1 deletion pkg/controller/job/job_controller.go
@@ -22,6 +22,7 @@ import (
"math"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@@ -795,7 +796,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
deleted, err := jm.deleteActivePods(ctx, &job, activePods)
if uncounted == nil {
// Legacy behavior: pretend all active pods were successfully removed.
deleted = active
if strings.Contains(finishedCondition.Message, "has condition for pending") {
failed = 0
} else {
deleted = active
}
} else if deleted != active || !satisfiedExpectations {
// Can't declare the Job as finished yet, as there might be remaining
// pod finalizers or pods that are not in the informer's cache yet.
@@ -1742,6 +1747,11 @@ func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool {
// TODO(#113855): Stop limiting this behavior to Jobs with podFailurePolicy.
// For now, we do so to avoid affecting all running Jobs without the
// ability to opt out into the old behavior.
if p.Status.Phase == v1.PodPending {
_, _, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
failJob := batch.PodFailurePolicyActionFailJob
return action != nil && *action == failJob
}
return p.Status.Phase == v1.PodFailed
}
if p.Status.Phase == v1.PodFailed {
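Note for readers of the hunk above: the behavioral change is that a pod still in the Pending phase can now count as failed, but only when a FailJob rule of the Job's pod failure policy matches one of the pod's current conditions. Below is a standalone sketch of that logic, not the controller's code: matchesFailJobRule is a hypothetical stand-in for the unexported matchPodFailurePolicy helper, and the example policy and pod values are taken from the test cases further down.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// matchesFailJobRule is a simplified, hypothetical stand-in for the controller's
// matchPodFailurePolicy: it reports whether any FailJob rule's OnPodConditions
// pattern matches a condition currently present on the pod.
func matchesFailJobRule(policy *batchv1.PodFailurePolicy, pod *corev1.Pod) bool {
	if policy == nil {
		return false
	}
	for _, rule := range policy.Rules {
		if rule.Action != batchv1.PodFailurePolicyActionFailJob {
			continue
		}
		for _, pattern := range rule.OnPodConditions {
			for _, cond := range pod.Status.Conditions {
				if cond.Type == pattern.Type && cond.Status == pattern.Status {
					return true
				}
			}
		}
	}
	return false
}

// isPodFailedSketch mirrors the intent of the isPodFailed change: Pending pods
// are treated as failed only when a FailJob rule matches; other phases keep the
// existing "phase == Failed" behavior.
func isPodFailedSketch(pod *corev1.Pod, policy *batchv1.PodFailurePolicy) bool {
	if pod.Status.Phase == corev1.PodPending {
		return matchesFailJobRule(policy, pod)
	}
	return pod.Status.Phase == corev1.PodFailed
}

func main() {
	policy := &batchv1.PodFailurePolicy{
		Rules: []batchv1.PodFailurePolicyRule{{
			Action: batchv1.PodFailurePolicyActionFailJob,
			OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
			}},
		}},
	}
	pendingPod := &corev1.Pod{Status: corev1.PodStatus{
		Phase: corev1.PodPending,
		Conditions: []corev1.PodCondition{
			{Type: corev1.PodScheduled, Status: corev1.ConditionFalse},
		},
	}}
	fmt.Println(isPodFailedSketch(pendingPod, policy)) // true: the Job would be failed
}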
203 changes: 203 additions & 0 deletions pkg/controller/job/job_controller_test.go
@@ -54,6 +54,7 @@ import (
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/pointer"
)

@@ -2912,6 +2913,206 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
wantStatusFailed: 1,
wantStatusSucceeded: 0,
},
"fail job based on OnPodConditions PodFailedToStart for Pending Case": {
enableJobPodFailurePolicy: true,
enablePodDisruptionConditions: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(1),
Completions: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.PodFailedToStart,
Status: v1.ConditionTrue,
},
},
},
},
},
},
},
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: v1.PodFailedToStart,
Status: v1.ConditionTrue,
},
},
},
},
},
wantConditions: &[]batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "PodFailurePolicy",
Message: "Pod default/mypod-0 has condition for pending FailedToStart matching FailJob rule at index 0",
},
},
wantStatusActive: 0,
wantStatusFailed: 1,
wantStatusSucceeded: 0,
},
"fail job based on OnPodConditions PodHasNetWork for Pending Case": {
enableJobPodFailurePolicy: true,
enablePodDisruptionConditions: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(1),
Completions: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: kubetypes.PodHasNetwork,
Status: v1.ConditionFalse,
},
},
},
},
},
},
},
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: kubetypes.PodHasNetwork,
Status: v1.ConditionFalse,
},
},
},
},
},
wantConditions: &[]batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "PodFailurePolicy",
Message: "Pod default/mypod-0 has condition for pending PodHasNetwork matching FailJob rule at index 0",
},
},
wantStatusActive: 0,
wantStatusFailed: 1,
wantStatusSucceeded: 0,
},
"Wrong Condition Match for Pending Pod Condition": {
enableJobPodFailurePolicy: true,
enablePodDisruptionConditions: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(1),
Completions: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: kubetypes.PodHasNetwork,
Status: v1.ConditionFalse,
},
},
},
},
},
},
},
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
},
wantStatusActive: 1,
wantStatusFailed: 0,
wantStatusSucceeded: 0,
},
"fail job based on OnPodConditions PodScheduled for Pending Case": {
enableJobPodFailurePolicy: true,
enablePodDisruptionConditions: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(1),
Completions: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
},
},
},
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
},
wantConditions: &[]batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "PodFailurePolicy",
Message: "Pod default/mypod-0 has condition for pending PodScheduled matching FailJob rule at index 0",
},
},
wantStatusActive: 0,
wantStatusFailed: 1,
wantStatusSucceeded: 0,
},
"terminating Pod considered failed when PodDisruptionConditions is disabled": {
wFinalizersExclusive: pointer.Bool(true),
enableJobPodFailurePolicy: true,
@@ -3002,6 +3203,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
}
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodFailedToStartCondition, true)
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodHasNetworkCondition, true)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
10 changes: 8 additions & 2 deletions pkg/controller/job/pod_failure_policy.go
@@ -58,8 +58,14 @@ func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *
case batch.PodFailurePolicyActionCount:
return nil, true, &count
case batch.PodFailurePolicyActionFailJob:
msg := fmt.Sprintf("Pod %s/%s has condition %v matching %v rule at index %d",
failedPod.Namespace, failedPod.Name, podCondition.Type, podFailurePolicyRule.Action, index)
var msg string
if failedPod.Status.Phase == v1.PodPending {
msg = fmt.Sprintf("Pod %s/%s has condition for pending %v matching %v rule at index %d",
failedPod.Namespace, failedPod.Name, podCondition.Type, podFailurePolicyRule.Action, index)
} else {
msg = fmt.Sprintf("Pod %s/%s has condition %v matching %v rule at index %d",
failedPod.Namespace, failedPod.Name, podCondition.Type, podFailurePolicyRule.Action, index)
}
return &msg, true, &failJob
}
}
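Worth noting for review: the syncJob hunk at the top of this PR keys off the exact substring produced here. When the Job's finished condition message contains "has condition for pending", the controller zeroes the failed count instead of pretending all active pods were removed. A minimal, standalone illustration of that coupling follows; the namespace, pod name, condition type, and rule index are placeholders, not values taken from the controller.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Shape of the message matchPodFailurePolicy now builds for a Pending pod
	// (placeholder namespace/name, condition type, and rule index).
	msg := fmt.Sprintf("Pod %s/%s has condition for pending %v matching %v rule at index %d",
		"default", "mypod-0", "PodScheduled", "FailJob", 0)

	// Consumer side in syncJob's legacy branch: the substring check decides that
	// the pod never ran, so the failed count is zeroed rather than pretending
	// all active pods were successfully removed.
	if strings.Contains(msg, "has condition for pending") {
		fmt.Println("pending-phase match: report failed = 0")
	} else {
		fmt.Println("regular failure: count the deleted active pods")
	}
}

This works only as long as the "has condition for pending" wording stays in sync between pod_failure_policy.go and job_controller.go, so the tests above pin the exact message text.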
91 changes: 91 additions & 0 deletions pkg/controller/job/pod_failure_policy_test.go
@@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/kubernetes/pkg/apis/core/install"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/pointer"
)

@@ -583,6 +584,66 @@ func TestMatchPodFailurePolicy(t *testing.T) {
wantJobFailureMessage: nil,
wantCountFailed: true,
},
"job fail rule matched for pod conditions in pending due to PodHasNetwork": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: kubetypes.PodHasNetwork,
Status: v1.ConditionFalse,
},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: kubetypes.PodHasNetwork,
Status: v1.ConditionFalse,
},
},
},
},
wantJobFailureMessage: pointer.String("Pod default/mypod has condition for pending PodHasNetwork matching FailJob rule at index 0"),
wantCountFailed: true,
wantAction: &failJob,
},
"job fail rule matched for pod conditions in pending due to Scheduled": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
wantJobFailureMessage: pointer.String("Pod default/mypod has condition for pending PodScheduled matching FailJob rule at index 0"),
wantCountFailed: true,
wantAction: &failJob,
},
"job fail rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
@@ -613,6 +674,36 @@ func TestMatchPodFailurePolicy(t *testing.T) {
wantCountFailed: true,
wantAction: &failJob,
},
"job fail rule different condition on pending": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
},
},
wantJobFailureMessage: nil,
wantCountFailed: true,
wantAction: nil,
},
"count rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{