Skip to content

Commit

Permalink
use flag to indicate if status update is needed
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Apr 9, 2024
1 parent a8d9a0b commit a71c4dc
Showing 1 changed file with 26 additions and 49 deletions.
75 changes: 26 additions & 49 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,13 @@ type childJobs struct {
delete []*batchv1.Job
}

// statusUpdateOpts tracks status updates that should be performed at the end of the reconciliation
// statusUpdateOpts tracks if a JobSet status update should be performed at the end of the reconciliation
// attempt, as well as events that should be conditionally emitted if the status update succeeds.
// - During the reconcilation attempt, any JobSet status update that should be added to
// the `updates` slice as a statusUpdateFunc.
// - Any k8s event that should be conditionally emitted if the status update succeeds should
// be added to the `events` slice.
// - At the end of each reconciliation attempt, the `updates` functions will be executed
// sequentially to modify the JobSet status object in memory with any necessary updates,
// then a single status update API call is made to persist these changes.
// - If the status update call succeeds, the k8s events will be emitted, otherwise the
// status update error is returned.
type statusUpdateOpts struct {
updates []statusUpdateFunc
events []*eventParams
shouldUpdate bool
events []*eventParams
}

// statusUpdateFunc is a function which mutates a JobSet status.
type statusUpdateFunc func(*jobset.JobSetStatus)

// eventParams contains parameters used for emitting a Kubernetes event.
type eventParams struct {
object runtime.Object
Expand Down Expand Up @@ -231,26 +219,21 @@ func SetupJobSetIndexes(ctx context.Context, indexer client.FieldIndexer) error
})
}

// updateJobSetStatus will update the JobSet status with any status updates present in the updateStatusOpts,
// and if the status update is successful, will then emit any events in the updateStatusOpts as well.
// updateJobSetStatus will update the JobSet status if updateStatusOpts requires it,
// and conditionally emit events in updateStatusOpts if the status update call succeeds.
func (r *JobSetReconciler) updateJobSetStatus(ctx context.Context, js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) error {
log := ctrl.LoggerFrom(ctx)

// Perform all mutations on JobSet status object.
for _, updateFn := range updateStatusOpts.updates {
updateFn(&js.Status)
}

// Make single API call to persist the JobSet status update.
if err := r.Status().Update(ctx, js); err != nil {
log.Error(err, "updating jobset status")
return err
}

// If the status update was successful, emit any events we want to emit for the status updates
// that were performed.
for _, event := range updateStatusOpts.events {
r.Record.Eventf(event.object, event.eventType, event.eventReason, event.eventMessage)
if updateStatusOpts.shouldUpdate {
// Make single API call to persist the JobSet status update.
if err := r.Status().Update(ctx, js); err != nil {
log.Error(err, "updating jobset status")
return err
}
// If the status update was successful (or if we had no status updates), emit any enqueued events.
for _, event := range updateStatusOpts.events {
r.Record.Eventf(event.object, event.eventType, event.eventReason, event.eventMessage)
}
}
return nil
}
Expand Down Expand Up @@ -297,17 +280,15 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
return &ownedJobs, nil
}

// updateReplicatedJobsStatuses enqueues a JobSet status update to be performed in the status of the
// replicatedJobs has changed.
// updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed.
func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
// Check if status ReplicatedJobsStatus has changed
if apiequality.Semantic.DeepEqual(js.Status.ReplicatedJobsStatus, statuses) {
return
}
// Add a new status update to perform at the end of the reconciliation attempt.
enqueueStatusUpdate(updateStatusOpts, func(jsStatus *jobset.JobSetStatus) {
jsStatus.ReplicatedJobsStatus = statuses
})
js.Status.ReplicatedJobsStatus = statuses
updateStatusOpts.shouldUpdate = true
}

// calculateReplicatedJobStatuses uses the JobSet's child jobs to update the statuses
Expand Down Expand Up @@ -625,9 +606,8 @@ func failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, updateStat

// Increment JobSet restarts. This will trigger reconciliation and result in deletions
// of old jobs not part of the current jobSet run.
enqueueStatusUpdate(updateStatusOpts, func(jsStatus *jobset.JobSetStatus) {
js.Status.Restarts += 1
})
js.Status.Restarts += 1
updateStatusOpts.shouldUpdate = true

// Emit event for each JobSet restarts for observability and debugability.
enqueueEvent(updateStatusOpts, &eventParams{
Expand Down Expand Up @@ -893,13 +873,6 @@ func managedByExternalController(js *jobset.JobSet) *string {
return nil
}

// enqueueStatusUpdate appends a new JobSet status mutation function to be run at the end of
// the reconciliation attempt. Once all the enqueued status update functions have been executed,
// a single status update API call will be made to persist the JobSet status update.
func enqueueStatusUpdate(updateStatusOpts *statusUpdateOpts, updateFn statusUpdateFunc) {
updateStatusOpts.updates = append(updateStatusOpts.updates, updateFn)
}

// enqueueEvent appends a new k8s event to be emitted if and only after running the status
// update functions in the updateStatusOpts, the status update API call suceeds.
func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) {
Expand Down Expand Up @@ -992,7 +965,7 @@ type conditionOpts struct {
// and enqueue an event for emission if the status update succeeds at the end of the reconcile.
func setCondition(condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) {
// Return early if this condition is already set.
if !updateCondition(condOpts.jobset, condOpts.condition, condOpts.forceFalseUpdate) {
if !updateCondition(condOpts) {
return
}
// Emit an event for each JobSet condition update.
Expand All @@ -1015,7 +988,11 @@ func setCondition(condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) {
// The `forceFalseUpdate` flag is needed for the in-order startup policy implementation.
// We set the startup policy condition status to false to indicate the startup policy is
// in progress, then set it to true once it has completed.
func updateCondition(js *jobset.JobSet, condition metav1.Condition, forceFalseUpdate bool) bool {
func updateCondition(condOpts *conditionOpts) bool {
js := condOpts.jobset
condition := condOpts.condition
forceFalseUpdate := condOpts.forceFalseUpdate

condition.LastTransitionTime = metav1.Now()
for i, val := range js.Status.Conditions {
if condition.Type == val.Type && condition.Status != val.Status {
Expand Down

0 comments on commit a71c4dc

Please sign in to comment.