Skip to content

Commit

Permalink
add podReplacementPolicy and terminating field to job api
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Jul 19, 2023
1 parent dde22b3 commit ce92952
Show file tree
Hide file tree
Showing 31 changed files with 715 additions and 159 deletions.
9 changes: 9 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions api/openapi-spec/v3/apis__batch__v1_openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@
],
"description": "Specifies the policy of handling failed pods. In particular, it allows to specify the set of actions and conditions which need to be satisfied to take the associated action. If empty, the default behaviour applies - the counter of failed pods, represented by the jobs's .status.failed field, is incremented and it is checked against the backoffLimit. This field cannot be used in combination with restartPolicy=OnFailure.\n\nThis field is beta-level. It can be used when the `JobPodFailurePolicy` feature gate is enabled (enabled by default)."
},
"podReplacementPolicy": {
"description": "podReplacementPolicy specifies when to create replacement Pods. Possible values are: - TerminatingOrFailed means that we recreate pods\n when they are terminating (has a metadata.deletionTimestamp) or failed.\n- Failed means to wait until a previously created Pod is fully terminated (has phase\n Failed or Succeeded) before creating a replacement Pod.\n\nWhen using podFailurePolicy, Failed is the the only allowed value. TerminatingOrFailed and Failed are allowed values when podFailurePolicy is not in use. This is an alpha field. Enable JobPodReplacementPolicy to be able to use this field.",
"type": "string"
},
"selector": {
"allOf": [
{
Expand Down Expand Up @@ -462,6 +466,11 @@
"format": "int32",
"type": "integer"
},
"terminating": {
"description": "The number of pods which are terminating (in phase Pending or Running and have a deletionTimestamp).\n\nThis field is alpha-level. The job controller populates the field when the feature gate JobPodReplacementPolicy is enabled (disabled by default).",
"format": "int32",
"type": "integer"
},
"uncountedTerminatedPods": {
"allOf": [
{
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/batch/fuzzer/fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ var Funcs = func(codecs runtimeserializer.CodecFactory) []interface{} {
// We're fuzzing the internal JobSpec type, not the v1 type, so we don't
// need to fuzz the nil value.
j.Suspend = pointer.Bool(c.RandBool())
podReplacementPolicy := batch.TerminatingOrFailed
if c.RandBool() {
podReplacementPolicy = batch.Failed
}
j.PodReplacementPolicy = &podReplacementPolicy
},
func(sj *batch.CronJobSpec, c fuzz.Continue) {
c.FuzzNoCustom(sj)
Expand Down
34 changes: 34 additions & 0 deletions pkg/apis/batch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ const (
PodFailurePolicyOnExitCodesOpNotIn PodFailurePolicyOnExitCodesOperator = "NotIn"
)

// PodReplacementPolicy specifies the policy for creating pod replacements.
// +enum
type PodReplacementPolicy string

const (
// TerminatingOrFailed means that we recreate pods
// when they are terminating (has a metadata.deletionTimestamp) or failed.
TerminatingOrFailed PodReplacementPolicy = "TerminatingOrFailed"
//Failed means to wait until a previously created Pod is fully terminated (has phase
//Failed or Succeeded) before creating a replacement Pod.
Failed PodReplacementPolicy = "Failed"
)

// PodFailurePolicyOnExitCodesRequirement describes the requirement for handling
// a failed pod based on its container exit codes. In particular, it lookups the
// .state.terminated.exitCode for each app container and init container status,
Expand Down Expand Up @@ -381,6 +394,19 @@ type JobSpec struct {
//
// +optional
Suspend *bool

// podReplacementPolicy specifies when to create replacement Pods.
// Possible values are:
// - TerminatingOrFailed means that we recreate pods
// when they are terminating (has a metadata.deletionTimestamp) or failed.
// - Failed means to wait until a previously created Pod is fully terminated (has phase
// Failed or Succeeded) before creating a replacement Pod.
//
// When using podFailurePolicy, Failed is the the only allowed value.
// TerminatingOrFailed and Failed are allowed values when podFailurePolicy is not in use.
// This is an alpha field. Enable JobPodReplacementPolicy to be able to use this field.
// +optional
PodReplacementPolicy *PodReplacementPolicy
}

// JobStatus represents the current state of a Job.
Expand Down Expand Up @@ -413,6 +439,14 @@ type JobStatus struct {
// +optional
Active int32

// The number of pods which are terminating (in phase Pending or Running
// and have a deletionTimestamp).
//
// This field is alpha-level. The job controller populates the field when
// the feature gate JobPodReplacementPolicy is enabled (disabled by default).
// +optional
Terminating *int32

// The number of active pods which have a Ready condition.
//
// This field is beta-level. The job controller populates the field when
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/batch/v1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -68,6 +70,15 @@ func SetDefaults_Job(obj *batchv1.Job) {
}
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
if obj.Spec.PodReplacementPolicy == nil {
if obj.Spec.PodFailurePolicy != nil {
obj.Spec.PodReplacementPolicy = podReplacementPolicyPtr(batchv1.Failed)
} else {
obj.Spec.PodReplacementPolicy = podReplacementPolicyPtr(batchv1.TerminatingOrFailed)
}
}
}
}

func SetDefaults_CronJob(obj *batchv1.CronJob) {
Expand All @@ -84,3 +95,7 @@ func SetDefaults_CronJob(obj *batchv1.CronJob) {
obj.Spec.FailedJobsHistoryLimit = utilpointer.Int32(1)
}
}

func podReplacementPolicyPtr(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
return &obj
}
126 changes: 103 additions & 23 deletions pkg/apis/batch/v1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"

. "k8s.io/kubernetes/pkg/apis/batch/v1"
Expand All @@ -40,9 +43,10 @@ func TestSetDefaultJob(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
}
tests := map[string]struct {
original *batchv1.Job
expected *batchv1.Job
expectLabels bool
original *batchv1.Job
expected *batchv1.Job
expectLabels bool
enablePodReplacementPolicy bool
}{
"Pod failure policy with some field values unspecified -> set default values": {
original: &batchv1.Job{
Expand Down Expand Up @@ -135,6 +139,70 @@ func TestSetDefaultJob(t *testing.T) {
},
expectLabels: true,
},
"Pod failure policy and defaulting for pod replacement policy": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1},
},
},
},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(1),
Parallelism: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.Failed),
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1},
},
},
},
},
},
},
expectLabels: true,
enablePodReplacementPolicy: true,
},
"All unspecified and podReplacementPolicyEnabled -> sets all to default values": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(1),
Parallelism: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.TerminatingOrFailed),
},
},
expectLabels: true,
enablePodReplacementPolicy: true,
},
"All unspecified -> sets all to default values": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Expand Down Expand Up @@ -295,23 +363,25 @@ func TestSetDefaultJob(t *testing.T) {
"All set -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
Expand All @@ -322,23 +392,25 @@ func TestSetDefaultJob(t *testing.T) {
"All set, flipped -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
PodReplacementPolicy: podReplacementPtr(batchv1.Failed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
PodReplacementPolicy: podReplacementPtr(batchv1.Failed),
},
},
expectLabels: true,
Expand Down Expand Up @@ -396,6 +468,7 @@ func TestSetDefaultJob(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enablePodReplacementPolicy)()
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
Expand Down Expand Up @@ -424,6 +497,9 @@ func TestSetDefaultJob(t *testing.T) {
if diff := cmp.Diff(expected.Spec.CompletionMode, actual.Spec.CompletionMode); diff != "" {
t.Errorf("Unexpected CompletionMode (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(expected.Spec.PodReplacementPolicy, actual.Spec.PodReplacementPolicy); diff != "" {
t.Errorf("Unexpected PodReplacementPolicy (-want,+got):\n%s", diff)
}
})
}
}
Expand Down Expand Up @@ -522,3 +598,7 @@ func TestSetDefaultCronJob(t *testing.T) {
func completionModePtr(m batchv1.CompletionMode) *batchv1.CompletionMode {
return &m
}

func podReplacementPtr(m batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
return &m
}
4 changes: 4 additions & 0 deletions pkg/apis/batch/v1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ce92952

Please sign in to comment.