Skip to content

Commit

Permalink
feat: add jobset status
Browse files Browse the repository at this point in the history
feat: add jobset status
  • Loading branch information
googs1025 committed Jun 6, 2024
1 parent 6c83907 commit 7c00e55
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 10 deletions.
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type JobSetConditionType string

// These are built-in conditions of a JobSet.
const (
// JobSetRunning means the job is running.
JobSetRunning JobSetConditionType = "Running"
// JobSetCompleted means the job has completed its execution.
JobSetCompleted JobSetConditionType = "Completed"
// JobSetFailed means the job has failed its execution.
Expand Down Expand Up @@ -134,6 +136,10 @@ type JobSetStatus struct {
// RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.
RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"`

// Phase of the JobSet.
// +kubebuilder:default="Running"
Phase string `json:"phase,omitempty"`

// ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.
// +optional
// +listType=map
Expand Down Expand Up @@ -169,6 +175,7 @@ type ReplicatedJobStatus struct {
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Phase",JSONPath=".status.phase",type=string,description="Phase of the JobSet"
// +kubebuilder:printcolumn:name="Restarts",JSONPath=".status.restarts",type=string,description="Number of restarts"
// +kubebuilder:printcolumn:name="Completed",type="string",priority=0,JSONPath=".status.conditions[?(@.type==\"Completed\")].status"
// +kubebuilder:printcolumn:name="Suspended",type="string",JSONPath=".spec.suspend",description="JobSet suspended"
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Phase of the JobSet
jsonPath: .status.phase
name: Phase
type: string
- description: Number of restarts
jsonPath: .status.restarts
name: Restarts
Expand Down Expand Up @@ -8547,6 +8551,9 @@ spec:
of restarts.
format: int32
type: integer
status:
description: Status of the JobSet.
type: string
type: object
type: object
served: true
Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@
],
"x-kubernetes-list-type": "map"
},
"phase": {
"description": "Phase of the JobSet.",
"type": "string"
},
"replicatedJobsStatus": {
"description": "ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.",
"type": "array",
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/failure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,13 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts {
Reason: reason,
Message: msg,
},
phase: string(jobset.JobSetFailed),
eventType: corev1.EventTypeWarning,
}
}

// setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed.
func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) {
func setJobSetFailedCondition(_ context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) {
setCondition(js, makeFailedConditionOpts(reason, msg), updateStatusOpts)
}

Expand Down
23 changes: 17 additions & 6 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js
// executeSuccessPolicy checks the completed jobs against the jobset success policy
// and updates the jobset status to completed if the success policy conditions are met.
// Returns a boolean value indicating if the jobset was completed or not.
func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
func executeSuccessPolicy(_ context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) {
setJobSetCompletedCondition(js, updateStatusOpts)
return true
Expand Down Expand Up @@ -867,14 +867,15 @@ func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) {
// function parameters for setCondition
type conditionOpts struct {
eventType string
phase string
condition *metav1.Condition
}

// setCondition will add a new condition to the JobSet status (or update an existing one),
// setCondition will add a new condition and phase to the JobSet status (or update an existing one),
// and enqueue an event for emission if the status update succeeds at the end of the reconcile.
func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) {
// Return early if no status update is required for this condition.
if !updateCondition(js, condOpts) {
// Return early if no status update is required for this condition and phase.
if !updateConditionAndPhase(js, condOpts) {
return
}

Expand All @@ -897,12 +898,12 @@ func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *
enqueueEvent(updateStatusOpts, event)
}

// updateCondition accepts a given condition and does one of the following:
// updateConditionAndPhase accepts a given condition and does one of the following:
// 1. If an identical condition already exists, do nothing and return false (indicating
// no change was made).
// 2. If a condition of the same type exists but with a different status, update
// the condition in place and return true (indicating a condition change was made).
func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool {
if opts == nil || opts.condition == nil {
return false
}
Expand Down Expand Up @@ -941,6 +942,13 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
js.Status.Conditions = append(js.Status.Conditions, newCond)
shouldUpdate = true
}

// Update the JobSet phase if necessary.
if opts.phase != "" && js.Status.Phase != opts.phase {
js.Status.Phase = opts.phase
shouldUpdate = true
}

return shouldUpdate
}

Expand Down Expand Up @@ -970,6 +978,7 @@ func makeCompletedConditionsOpts() *conditionOpts {
Reason: constants.AllJobsCompletedReason,
Message: constants.AllJobsCompletedMessage,
},
phase: string(jobset.JobSetCompleted),
}
}

Expand All @@ -984,6 +993,7 @@ func makeSuspendedConditionOpts() *conditionOpts {
Reason: constants.JobSetSuspendedReason,
Message: constants.JobSetSuspendedMessage,
},
phase: string(jobset.JobSetSuspended),
}
}

Expand All @@ -998,6 +1008,7 @@ func makeResumedConditionOpts() *conditionOpts {
Reason: constants.JobSetResumedReason,
Message: constants.JobSetResumedMessage,
},
phase: string(jobset.JobSetRunning),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).
Obj()).Phase(string(jobset.JobSetCompleted)).
Conditions([]metav1.Condition{
// JobSet is completed..
{
Expand All @@ -752,7 +752,7 @@ func TestUpdateConditions(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotUpdate := updateCondition(tc.js, tc.opts)
gotUpdate := updateConditionAndPhase(tc.js, tc.opts)
if gotUpdate != tc.expectedUpdate {
t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper {
return j
}

// Phase sets the value of JobSet.Status.Phase.
func (j *JobSetWrapper) Phase(phase string) *JobSetWrapper {
j.Status.Phase = phase
return j
}

func (j *JobSetWrapper) DeletionTimestamp(deletionTimestamp *metav1.Time) *JobSetWrapper {
j.ObjectMeta.DeletionTimestamp = deletionTimestamp
return j
Expand Down
1 change: 1 addition & 0 deletions sdk/python/docs/JobsetV1alpha2JobSetStatus.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ JobSetStatus defines the observed state of JobSet
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**conditions** | [**list[V1Condition]**](V1Condition.md) | | [optional]
**phase** | **str** | Phase of the JobSet. | [optional]
**replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional]
**restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional]
**restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. | [optional]
Expand Down
30 changes: 29 additions & 1 deletion sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,37 @@ class JobsetV1alpha2JobSetStatus(object):
"""
openapi_types = {
'conditions': 'list[V1Condition]',
'phase': 'str',
'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]',
'restarts': 'int',
'restarts_count_towards_max': 'int'
}

attribute_map = {
'conditions': 'conditions',
'phase': 'phase',
'replicated_jobs_status': 'replicatedJobsStatus',
'restarts': 'restarts',
'restarts_count_towards_max': 'restartsCountTowardsMax'
}

def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501
def __init__(self, conditions=None, phase=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501
"""JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
self.local_vars_configuration = local_vars_configuration

self._conditions = None
self._phase = None
self._replicated_jobs_status = None
self._restarts = None
self._restarts_count_towards_max = None
self.discriminator = None

if conditions is not None:
self.conditions = conditions
if phase is not None:
self.phase = phase
if replicated_jobs_status is not None:
self.replicated_jobs_status = replicated_jobs_status
if restarts is not None:
Expand Down Expand Up @@ -88,6 +93,29 @@ def conditions(self, conditions):

self._conditions = conditions

@property
def phase(self):
"""Gets the phase of this JobsetV1alpha2JobSetStatus. # noqa: E501
Phase of the JobSet. # noqa: E501
:return: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501
:rtype: str
"""
return self._phase

@phase.setter
def phase(self, phase):
"""Sets the phase of this JobsetV1alpha2JobSetStatus.
Phase of the JobSet. # noqa: E501
:param phase: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501
:type: str
"""

self._phase = phase

@property
def replicated_jobs_status(self):
"""Gets the replicated_jobs_status of this JobsetV1alpha2JobSetStatus. # noqa: E501
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down Expand Up @@ -136,6 +137,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down
1 change: 1 addition & 0 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
LastTransitionTime: metav1.Now(),
},
},
Phase: string(jobset.JobSetRunning),
Restarts: 1,
ReplicatedJobsStatus: []jobset.ReplicatedJobStatus{
{
Expand Down

0 comments on commit 7c00e55

Please sign in to comment.