From 60b70e3d0f6022aeafdfd044f9dba21aced77da5 Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Sun, 10 Mar 2024 20:12:49 +0100 Subject: [PATCH] add support for ttl cleanup for finished jobsets --- api/jobset/v1alpha2/jobset_types.go | 11 ++ api/jobset/v1alpha2/openapi_generated.go | 7 + api/jobset/v1alpha2/zz_generated.deepcopy.go | 5 + .../jobset/v1alpha2/jobsetspec.go | 21 ++- .../crd/bases/jobset.x-k8s.io_jobsets.yaml | 12 ++ hack/python-sdk/swagger.json | 5 + pkg/controllers/jobset_controller.go | 114 +++++++++++++- pkg/controllers/jobset_controller_test.go | 142 ++++++++++++++++++ pkg/util/testing/wrappers.go | 6 + sdk/python/docs/JobsetV1alpha2JobSetSpec.md | 1 + .../models/jobset_v1alpha2_job_set_spec.py | 34 ++++- .../test/test_jobset_v1alpha2_job_set.py | 3 +- .../test/test_jobset_v1alpha2_job_set_list.py | 6 +- .../test/test_jobset_v1alpha2_job_set_spec.py | 3 +- .../controller/jobset_controller_test.go | 96 ++++++++++++ 15 files changed, 451 insertions(+), 15 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index d0a417b4b..f2fe45582 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -94,6 +94,17 @@ type JobSetSpec struct { // Suspend suspends all running child Jobs when set to true. Suspend *bool `json:"suspend,omitempty"` + + // TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished + // execution (either Complete or Failed). If this field is set, + // TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be + // automatically deleted. When the JobSet is being deleted, its lifecycle + // guarantees (e.g. finalizers) will be honored. If this field is unset, + // the JobSet won't be automatically deleted. If this field is set to zero, + // the JobSet becomes eligible to be deleted immediately after it finishes. + // +kubebuilder:validation:Minimum=0 + // +optional + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` } // JobSetStatus defines the observed state of JobSet diff --git a/api/jobset/v1alpha2/openapi_generated.go b/api/jobset/v1alpha2/openapi_generated.go index 3c2c11bce..38800b0f5 100644 --- a/api/jobset/v1alpha2/openapi_generated.go +++ b/api/jobset/v1alpha2/openapi_generated.go @@ -214,6 +214,13 @@ func schema_jobset_api_jobset_v1alpha2_JobSetSpec(ref common.ReferenceCallback) Format: "", }, }, + "ttlSecondsAfterFinished": { + SchemaProps: spec.SchemaProps{ + Description: "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, }, }, diff --git a/api/jobset/v1alpha2/zz_generated.deepcopy.go b/api/jobset/v1alpha2/zz_generated.deepcopy.go index 112890b3b..94eb729d7 100644 --- a/api/jobset/v1alpha2/zz_generated.deepcopy.go +++ b/api/jobset/v1alpha2/zz_generated.deepcopy.go @@ -131,6 +131,11 @@ func (in *JobSetSpec) DeepCopyInto(out *JobSetSpec) { *out = new(bool) **out = **in } + if in.TTLSecondsAfterFinished != nil { + in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSetSpec. diff --git a/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go b/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go index 670796529..86f7a2e5b 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go @@ -17,12 +17,13 @@ package v1alpha2 // JobSetSpecApplyConfiguration represents an declarative configuration of the JobSetSpec type for use // with apply. type JobSetSpecApplyConfiguration struct { - ReplicatedJobs []ReplicatedJobApplyConfiguration `json:"replicatedJobs,omitempty"` - Network *NetworkApplyConfiguration `json:"network,omitempty"` - SuccessPolicy *SuccessPolicyApplyConfiguration `json:"successPolicy,omitempty"` - FailurePolicy *FailurePolicyApplyConfiguration `json:"failurePolicy,omitempty"` - StartupPolicy *StartupPolicyApplyConfiguration `json:"startupPolicy,omitempty"` - Suspend *bool `json:"suspend,omitempty"` + ReplicatedJobs []ReplicatedJobApplyConfiguration `json:"replicatedJobs,omitempty"` + Network *NetworkApplyConfiguration `json:"network,omitempty"` + SuccessPolicy *SuccessPolicyApplyConfiguration `json:"successPolicy,omitempty"` + FailurePolicy *FailurePolicyApplyConfiguration `json:"failurePolicy,omitempty"` + StartupPolicy *StartupPolicyApplyConfiguration `json:"startupPolicy,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` } // JobSetSpecApplyConfiguration constructs an declarative configuration of the JobSetSpec type for use with @@ -83,3 +84,11 @@ func (b *JobSetSpecApplyConfiguration) WithSuspend(value bool) *JobSetSpecApplyC b.Suspend = &value return b } + +// WithTTLSecondsAfterFinished sets the TTLSecondsAfterFinished field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the TTLSecondsAfterFinished field is set to the value of the last call. +func (b *JobSetSpecApplyConfiguration) WithTTLSecondsAfterFinished(value int32) *JobSetSpecApplyConfiguration { + b.TTLSecondsAfterFinished = &value + return b +} diff --git a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml index 33f532f67..be7dae213 100644 --- a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml +++ b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml @@ -8331,6 +8331,18 @@ spec: suspend: description: Suspend suspends all running child Jobs when set to true. type: boolean + ttlSecondsAfterFinished: + description: |- + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished + execution (either Complete or Failed). If this field is set, + TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be + automatically deleted. When the JobSet is being deleted, its lifecycle + guarantees (e.g. finalizers) will be honored. If this field is unset, + the JobSet won't be automatically deleted. If this field is set to zero, + the JobSet becomes eligible to be deleted immediately after it finishes. + format: int32 + minimum: 0 + type: integer type: object status: description: JobSetStatus defines the observed state of JobSet diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index a24816955..b8740be60 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -106,6 +106,11 @@ "suspend": { "description": "Suspend suspends all running child Jobs when set to true.", "type": "boolean" + }, + "ttlSecondsAfterFinished": { + "description": "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.", + "type": "integer", + "format": "int32" } } }, diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index e15b9ccc3..b36ca3313 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -21,6 +21,9 @@ import ( "fmt" "strconv" "sync" + "time" + + "k8s.io/utils/clock" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -55,6 +58,7 @@ type JobSetReconciler struct { client.Client Scheme *runtime.Scheme Record record.EventRecorder + clock clock.Clock } type childJobs struct { @@ -69,7 +73,7 @@ type childJobs struct { } func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *JobSetReconciler { - return &JobSetReconciler{Client: client, Scheme: scheme, Record: record} + return &JobSetReconciler{Client: client, Scheme: scheme, Record: record, clock: clock.RealClock{}} } //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch @@ -114,8 +118,26 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } - // If JobSet is already completed or failed, clean up active child jobs. + // If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set. if jobSetFinished(&js) { + if js.Spec.TTLSecondsAfterFinished != nil { + expired, err := r.checkIfTTLExpired(ctx, &js) + if err != nil { + log.Error(err, "checking if TTL expired") + return ctrl.Result{}, err + } + // if expired is true, that means the TTL has expired, and we should delete the JobSet + // otherwise, we requeue it for the remaining TTL duration. + if expired { + log.V(5).Info("JobSet TTL expired") + if err := r.deleteJobSet(ctx, &js); err != nil { + log.Error(err, "deleting jobset") + return ctrl.Result{}, err + } + } else { + return ctrl.Result{RequeueAfter: requeueJobSetAfter(&js)}, nil + } + } if err := r.deleteJobs(ctx, ownedJobs.active); err != nil { log.Error(err, "deleting jobs") return ctrl.Result{}, err @@ -916,3 +938,91 @@ func findReplicatedStatus(replicatedJobStatus []jobset.ReplicatedJobStatus, repl } return jobset.ReplicatedJobStatus{} } + +func (r *JobSetReconciler) deleteJobSet(ctx context.Context, js *jobset.JobSet) error { + log := ctrl.LoggerFrom(ctx) + + policy := metav1.DeletePropagationForeground + options := []client.DeleteOption{client.PropagationPolicy(policy)} + log.V(2).Info("Cleaning up JobSet", "jobset", klog.KObj(js)) + + return r.Delete(ctx, js, options...) +} + +// checkIfTTLExpired checks whether a given JobSet's TTL has expired. +func (r *JobSetReconciler) checkIfTTLExpired(ctx context.Context, jobSet *jobset.JobSet) (bool, error) { + // We don't care about the JobSets that are going to be deleted + if jobSet.DeletionTimestamp != nil { + return false, nil + } + + now := r.clock.Now() + remaining, err := timeLeft(ctx, jobSet, &now) + if err != nil { + return false, err + } + + // TTL has expired + ttlExpired := remaining != nil && *remaining <= 0 + return ttlExpired, nil +} + +// timeLeft returns the time left until the JobSet's TTL expires and the time when it will expire. +func timeLeft(ctx context.Context, js *jobset.JobSet, now *time.Time) (*time.Duration, error) { + log := ctrl.LoggerFrom(ctx) + + finishAt, expireAt, err := getJobSetFinishAndExpireTime(js) + if err != nil { + return nil, err + } + // The following 2 checks do sanity checking for nil pointers in case of changes to the above function. + // This logic should never be executed. + if now == nil || finishAt == nil || expireAt == nil { + log.V(2).Info("Warning: Calculated invalid expiration time. JobSet cleanup will be deferred.") + return nil, nil + } + + if finishAt.After(*now) { + log.V(2).Info("Warning: Found JobSet finished in the future. This is likely due to time skew in the cluster. JobSet cleanup will be deferred.") + } + remaining := expireAt.Sub(*now) + log.V(2).Info("Found JobSet finished", "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", now.UTC(), "deadlineTTL", expireAt.UTC()) + return &remaining, nil +} + +func getJobSetFinishAndExpireTime(js *jobset.JobSet) (finishAt, expireAt *time.Time, err error) { + finishTime, err := jobSetFinishTime(js) + if err != nil { + return nil, nil, err + } + + finishAt = &finishTime.Time + expiration := finishAt.Add(time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second) + expireAt = ptr.To(expiration) + return finishAt, expireAt, nil +} + +// jobSetFinishTime takes an already finished JobSet and returns the time it finishes. +func jobSetFinishTime(finishedJobSet *jobset.JobSet) (metav1.Time, error) { + for _, c := range finishedJobSet.Status.Conditions { + if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue { + finishAt := c.LastTransitionTime + if finishAt.IsZero() { + return metav1.Time{}, fmt.Errorf("unable to find the time when the JobSet %s/%s finished", finishedJobSet.Namespace, finishedJobSet.Name) + } + return finishAt, nil + } + } + + // This should never happen if the JobSets have finished + return metav1.Time{}, fmt.Errorf("unable to find the status of the finished JobSet %s/%s", finishedJobSet.Namespace, finishedJobSet.Name) +} + +// requeueJobSetAfter returns the duration after which the JobSet should be requeued if TTLSecondsAfterFinished is set, otherwise returns 0. +func requeueJobSetAfter(js *jobset.JobSet) time.Duration { + var requeueAfter time.Duration = 0 + if js.Spec.TTLSecondsAfterFinished != nil { + requeueAfter = time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second + } + return requeueAfter +} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 4922e21d0..63095378e 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -16,13 +16,17 @@ package controllers import ( "context" "strconv" + "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2/ktesting" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/fake" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" @@ -1106,6 +1110,92 @@ func TestCalculateReplicatedJobStatuses(t *testing.T) { } } +func TestTimeLeft(t *testing.T) { + now := metav1.Now() + + tests := []struct { + name string + completionTime metav1.Time + failedTime metav1.Time + ttl *int32 + since *time.Time + expectErr bool + expectErrStr string + expectedTimeLeft *time.Duration + }{ + { + name: "jobset completed now, nil since", + completionTime: now, + ttl: ptr.To[int32](0), + since: nil, + }, + { + name: "jobset completed now, 0s TTL", + completionTime: now, + ttl: ptr.To[int32](0), + since: &now.Time, + expectedTimeLeft: ptr.To(0 * time.Second), + }, + { + name: "jobset completed now, 10s TTL", + completionTime: now, + ttl: ptr.To[int32](10), + since: &now.Time, + expectedTimeLeft: ptr.To(10 * time.Second), + }, + { + name: "jobset completed 10s ago, 15s TTL", + completionTime: metav1.NewTime(now.Add(-10 * time.Second)), + ttl: ptr.To[int32](15), + since: &now.Time, + expectedTimeLeft: ptr.To(5 * time.Second), + }, + { + name: "jobset failed now, 0s TTL", + failedTime: now, + ttl: ptr.To[int32](0), + since: &now.Time, + expectedTimeLeft: ptr.To(0 * time.Second), + }, + { + name: "jobset failed now, 10s TTL", + failedTime: now, + ttl: ptr.To[int32](10), + since: &now.Time, + expectedTimeLeft: ptr.To(10 * time.Second), + }, + { + name: "jobset failed 10s ago, 15s TTL", + failedTime: metav1.NewTime(now.Add(-10 * time.Second)), + ttl: ptr.To[int32](15), + since: &now.Time, + expectedTimeLeft: ptr.To(5 * time.Second), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + jobSet := newJobSet(tc.completionTime, tc.failedTime, tc.ttl) + _, ctx := ktesting.NewTestContext(t) + gotTimeLeft, gotErr := timeLeft(ctx, jobSet, tc.since) + if tc.expectErr != (gotErr != nil) { + t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr) + } + if tc.expectErr && len(tc.expectErrStr) == 0 { + t.Errorf("%s: invalid test setup; error message must not be empty for error cases", tc.name) + } + if tc.expectErr && !strings.Contains(gotErr.Error(), tc.expectErrStr) { + t.Errorf("%s: expected error message contains %q, got %v", tc.name, tc.expectErrStr, gotErr) + } + if !tc.expectErr { + if gotTimeLeft != nil && *gotTimeLeft != *tc.expectedTimeLeft { + t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft) + } + } + }) + } +} + type makeJobArgs struct { jobSetName string replicatedJobName string @@ -1150,3 +1240,55 @@ func makeJob(args *makeJobArgs) *testutils.JobWrapper { PodAnnotations(annotations) return jobWrapper } + +func newJobSet(completionTime, failedTime metav1.Time, ttl *int32) *jobset.JobSet { + js := &jobset.JobSet{ + TypeMeta: metav1.TypeMeta{Kind: "JobSet"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "foobar", + Namespace: metav1.NamespaceDefault, + }, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "foobar-job", + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if !completionTime.IsZero() { + c := metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue, LastTransitionTime: completionTime} + js.Status.Conditions = append(js.Status.Conditions, c) + } + + if !failedTime.IsZero() { + c := metav1.Condition{Type: string(jobset.JobSetFailed), Status: metav1.ConditionTrue, LastTransitionTime: failedTime} + js.Status.Conditions = append(js.Status.Conditions, c) + } + + if ttl != nil { + js.Spec.TTLSecondsAfterFinished = ttl + } + + return js +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 0946ef901..9b2fd450f 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -121,6 +121,12 @@ func (j *JobSetWrapper) EnableDNSHostnames(val bool) *JobSetWrapper { return j } +// TTLSecondsAfterFinished sets the value of JobSet.Spec.TTLSecondsAfterFinished +func (j *JobSetWrapper) TTLSecondsAfterFinished(seconds int32) *JobSetWrapper { + j.Spec.TTLSecondsAfterFinished = &seconds + return j +} + // ReplicatedJobWrapper wraps a ReplicatedJob. type ReplicatedJobWrapper struct { jobset.ReplicatedJob diff --git a/sdk/python/docs/JobsetV1alpha2JobSetSpec.md b/sdk/python/docs/JobsetV1alpha2JobSetSpec.md index 01102c616..5337b0c1d 100644 --- a/sdk/python/docs/JobsetV1alpha2JobSetSpec.md +++ b/sdk/python/docs/JobsetV1alpha2JobSetSpec.md @@ -10,6 +10,7 @@ Name | Type | Description | Notes **startup_policy** | [**JobsetV1alpha2StartupPolicy**](JobsetV1alpha2StartupPolicy.md) | | [optional] **success_policy** | [**JobsetV1alpha2SuccessPolicy**](JobsetV1alpha2SuccessPolicy.md) | | [optional] **suspend** | **bool** | Suspend suspends all running child Jobs when set to true. | [optional] +**ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py b/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py index 7f4ca82e4..b1fed064b 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py @@ -38,7 +38,8 @@ class JobsetV1alpha2JobSetSpec(object): 'replicated_jobs': 'list[JobsetV1alpha2ReplicatedJob]', 'startup_policy': 'JobsetV1alpha2StartupPolicy', 'success_policy': 'JobsetV1alpha2SuccessPolicy', - 'suspend': 'bool' + 'suspend': 'bool', + 'ttl_seconds_after_finished': 'int' } attribute_map = { @@ -47,10 +48,11 @@ class JobsetV1alpha2JobSetSpec(object): 'replicated_jobs': 'replicatedJobs', 'startup_policy': 'startupPolicy', 'success_policy': 'successPolicy', - 'suspend': 'suspend' + 'suspend': 'suspend', + 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, failure_policy=None, network=None, replicated_jobs=None, startup_policy=None, success_policy=None, suspend=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, failure_policy=None, network=None, replicated_jobs=None, startup_policy=None, success_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2JobSetSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -62,6 +64,7 @@ def __init__(self, failure_policy=None, network=None, replicated_jobs=None, star self._startup_policy = None self._success_policy = None self._suspend = None + self._ttl_seconds_after_finished = None self.discriminator = None if failure_policy is not None: @@ -76,6 +79,8 @@ def __init__(self, failure_policy=None, network=None, replicated_jobs=None, star self.success_policy = success_policy if suspend is not None: self.suspend = suspend + if ttl_seconds_after_finished is not None: + self.ttl_seconds_after_finished = ttl_seconds_after_finished @property def failure_policy(self): @@ -207,6 +212,29 @@ def suspend(self, suspend): self._suspend = suspend + @property + def ttl_seconds_after_finished(self): + """Gets the ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. # noqa: E501 + + :return: The ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + :rtype: int + """ + return self._ttl_seconds_after_finished + + @ttl_seconds_after_finished.setter + def ttl_seconds_after_finished(self, ttl_seconds_after_finished): + """Sets the ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. + + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. # noqa: E501 + + :param ttl_seconds_after_finished: The ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + :type: int + """ + + self._ttl_seconds_after_finished = ttl_seconds_after_finished + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set.py b/sdk/python/test/test_jobset_v1alpha2_job_set.py index 0719e3a6a..bf0cf5852 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set.py @@ -60,7 +60,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py index 5d4681583..740c27ad5 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py @@ -63,7 +63,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None @@ -108,7 +109,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py index 7fc988784..71be9b5b5 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py @@ -56,7 +56,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True + suspend = True, + ttl_seconds_after_finished = 56 ) else : return JobsetV1alpha2JobSetSpec( diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index 242846678..ffde81363 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -1223,6 +1223,82 @@ var _ = ginkgo.Describe("JobSet controller", func() { }, timeout, interval).Should(gomega.Succeed()) }) }) + + ginkgo.When("A JobSet is created with TTLSecondsAfterFinished configured and reaches terminal state", func() { + ginkgo.It("JobSet controller should delete it after configured ttl duration passes", func() { + // Create test namespace for each entry. + ns1 := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "jobset-ns-", + }, + } + ns2 := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "jobset-ns-", + }, + } + + gomega.Expect(k8sClient.Create(ctx, ns1)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, ns2)).To(gomega.Succeed()) + + defer func() { + gomega.Expect(testutil.DeleteNamespace(ctx, k8sClient, ns1)).To(gomega.Succeed()) + gomega.Expect(testutil.DeleteNamespace(ctx, k8sClient, ns2)).To(gomega.Succeed()) + }() + // Create JobSet. + js1 := testJobSet(ns1).TTLSecondsAfterFinished(2).Obj() + js2 := testJobSet(ns2).TTLSecondsAfterFinished(2).Obj() + + // Verify jobsets created successfully. + ginkgo.By(fmt.Sprintf("creating jobSet %s/%s", js1.Name, js1.Namespace)) + gomega.Eventually(k8sClient.Create(ctx, js1), timeout, interval).Should(gomega.Succeed()) + ginkgo.By(fmt.Sprintf("creating jobSet %s/%s", js2.Name, js2.Namespace)) + gomega.Eventually(k8sClient.Create(ctx, js2), timeout, interval).Should(gomega.Succeed()) + + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js1).Should(gomega.Equal(testutil.NumExpectedJobs(js1))) + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js2).Should(gomega.Equal(testutil.NumExpectedJobs(js2))) + + // Fetch updated job objects, so we always have the latest resource versions to perform mutations on. + var jobList batchv1.JobList + gomega.Expect(k8sClient.List(ctx, &jobList, client.InNamespace(js1.Namespace))).Should(gomega.Succeed()) + gomega.Expect(len(jobList.Items)).To(gomega.Equal(testutil.NumExpectedJobs(js1))) + completeAllJobs(&jobList) + gomega.Expect(k8sClient.List(ctx, &jobList, client.InNamespace(js2.Namespace))).Should(gomega.Succeed()) + gomega.Expect(len(jobList.Items)).To(gomega.Equal(testutil.NumExpectedJobs(js2))) + failJob(&jobList.Items[0]) + + // Verify jobset is marked as completed. + testutil.JobSetCompleted(ctx, k8sClient, js1, timeout) + testutil.JobSetFailed(ctx, k8sClient, js2, timeout) + + // Verify active jobs have not been deleted if ttl has not passed. + checkJobsDeletionTimestamp(js2, false, testutil.NumExpectedJobs(js2)) + + // Verify jobset has not been deleted if ttl has not passed. + ginkgo.By("checking that jobset has not been deleted before configured seconds pass") + var fresh1, fresh2 jobset.JobSet + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(js1), &fresh1)).To(gomega.Succeed()) + gomega.Expect(fresh1.DeletionTimestamp).To(gomega.BeNil()) + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(js2), &fresh2)).To(gomega.Succeed()) + gomega.Expect(fresh2.DeletionTimestamp).To(gomega.BeNil()) + + // Verify active jobs have been deleted after ttl has passed. + checkJobsDeletionTimestamp(js2, true, testutil.NumExpectedJobs(js2)-1) + + // Verify jobset has been deleted after ttl has passed. + ginkgo.By("checking that ttl after finished controller deletes jobset after configured seconds pass") + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(js1), &fresh1); err != nil { + return false + } + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(js2), &fresh2); err != nil { + return false + } + return !fresh1.DeletionTimestamp.IsZero() && !fresh2.DeletionTimestamp.IsZero() + }, timeout, interval).Should(gomega.BeTrue()) + }) + }) }) // end of Describe func makeAllJobsReady(jl *batchv1.JobList) { @@ -1497,6 +1573,26 @@ func checkNoActiveJobs(js *jobset.JobSet, numFinishedJobs int) { }, timeout, interval).Should(gomega.Equal(true)) } +// Check that the jobs' deletion timestamp is set or not set for the provided number of jobs. +func checkJobsDeletionTimestamp(js *jobset.JobSet, set bool, numJobs int) { + ginkgo.By(fmt.Sprintf("checking that jobset jobs deletion timestamp status is %t", set)) + gomega.Eventually(func() (bool, error) { + var jobList batchv1.JobList + if err := k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace)); err != nil { + return false, err + } + numJobs := numJobs + for _, job := range jobList.Items { + deletionTimestampExpected := set && job.DeletionTimestamp != nil + deletionTimestampNotExpected := !set && job.DeletionTimestamp == nil + if deletionTimestampExpected || deletionTimestampNotExpected { + numJobs-- + } + } + return numJobs == 0, nil + }, timeout, interval).Should(gomega.Equal(true)) +} + func jobActive(job *batchv1.Job) bool { // Jobs marked for deletion using foreground cascading deletion will have deletion timestamp set, // but will still exist until dependent objects with ownerReference.blockOwnerDeletion=true set are deleted.