Skip to content

Commit

Permalink
Merge pull request #328 from danielvegamyhre/parallel-create
Browse files Browse the repository at this point in the history
Parallelize job creation
  • Loading branch information
k8s-ci-robot authored Nov 9, 2023
2 parents 1d33685 + 103892d commit 9d21db1
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import (
)

const (
RestartsKey string = "jobset.sigs.k8s.io/restart-attempt"
parallelDeletions int = 50
RestartsKey string = "jobset.sigs.k8s.io/restart-attempt"
maxParallelism int = 50
)

var (
Expand Down Expand Up @@ -372,26 +372,37 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
}
}

var lock sync.Mutex
var finalErrs []error

for _, rjob := range js.Spec.ReplicatedJobs {
jobs, err := constructJobsFromTemplate(js, &rjob, ownedJobs)
if err != nil {
return err
}
for _, job := range jobs {
workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobs), func(i int) {
job := jobs[i]

// Set jobset controller as owner of the job for garbage collection and reconciliation.
if err := ctrl.SetControllerReference(js, job, r.Scheme); err != nil {
return err
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, err)
return
}

// Create the job.
// TODO(#18): Deal with the case where the job exists but is not owned by the jobset.
if err := r.Create(ctx, job); err != nil {
return err
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, err)
return
}
log.V(2).Info("successfully created job", "job", klog.KObj(job))
}
})
}
return nil
return errors.Join(finalErrs...)
}

// TODO: look into adopting service and updating the selector
Expand Down Expand Up @@ -493,7 +504,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba
log := ctrl.LoggerFrom(ctx)
lock := &sync.Mutex{}
var finalErrs []error
workqueue.ParallelizeUntil(ctx, parallelDeletions, len(jobsForDeletion), func(i int) {
workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobsForDeletion), func(i int) {
targetJob := jobsForDeletion[i]
// Delete job. This deletion event will trigger another reconciliation,
// where the jobs are recreated.
Expand Down

0 comments on commit 9d21db1

Please sign in to comment.