Merge pull request #1033 from epam/check-run-with-info
[jobframework] Check the potential failure of RunWithPodSetsInfo
k8s-ci-robot committed Aug 4, 2023
2 parents b2d395b + cd35acd commit 7f2b00b
Showing 8 changed files with 97 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/interface.go
@@ -34,7 +34,7 @@ type GenericJob interface {
// Suspend will suspend the job.
Suspend()
// RunWithPodSetsInfo will inject the node affinity and podSet counts extracted from the workload into the job and unsuspend it.
RunWithPodSetsInfo(nodeSelectors []PodSetInfo)
RunWithPodSetsInfo(nodeSelectors []PodSetInfo) error
// RestorePodSetsInfo will restore the original node affinity and podSet counts of the job.
// Returns whether any change was done.
RestorePodSetsInfo(nodeSelectors []PodSetInfo) bool
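
With the error return, every integration reports when the supplied PodSetInfo slice cannot be applied instead of silently skipping the injection. A minimal sketch of a conforming implementation; `demoJob` and its fields are hypothetical and not part of this change:

```go
package demo

import "sigs.k8s.io/kueue/pkg/controller/jobframework"

// demoJob is a hypothetical single-podSet job wrapper, used only to
// illustrate the updated GenericJob contract.
type demoJob struct {
	suspend      bool
	nodeSelector map[string]string
}

// RunWithPodSetsInfo validates the incoming PodSetInfo slice before mutating
// the job and surfaces a wrapped ErrInvalidPodsetInfo so the reconciler can
// treat the failure as permanent.
func (j *demoJob) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error {
	j.suspend = false
	// This job type produces exactly one podSet, so exactly one info entry is expected.
	if len(podSetsInfo) != 1 {
		return jobframework.BadPodSetsInfoLenError(1, len(podSetsInfo))
	}
	j.nodeSelector = podSetsInfo[0].NodeSelector
	return nil
}
```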
34 changes: 29 additions & 5 deletions pkg/controller/jobframework/reconciler.go
@@ -43,12 +43,17 @@ import (
"sigs.k8s.io/kueue/pkg/workload"
)

const (
FailedToStartFinishedReason = "FailedToStart"
)

var (
ErrChildJobOwnerNotFound = fmt.Errorf("owner isn't set even though %s annotation is set", controllerconsts.ParentWorkloadAnnotation)
ErrUnknownWorkloadOwner = errors.New("workload owner is unknown")
ErrWorkloadOwnerNotFound = errors.New("workload owner not found")
ErrNoMatchingWorkloads = errors.New("no matching workloads")
ErrExtraWorkloads = errors.New("extra workloads")
ErrInvalidPodsetInfo = errors.New("invalid podset infos")
)

// JobReconciler reconciles a GenericJob object
@@ -173,11 +178,12 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

if wl != nil && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}

// 2. handle job is finished.
if condition, finished := job.Finished(); finished {
if wl == nil || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}
if condition, finished := job.Finished(); finished && wl != nil {
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil {
log.Error(err, "Updating workload status")
@@ -248,6 +254,14 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
err := r.startJob(ctx, job, object, wl)
if err != nil {
log.Error(err, "Unsuspending job")
if isPermanent(err) {
// Mark the workload as finished with failure since there is no point in retrying.
errUpdateStatus := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, FailedToStartFinishedReason, err.Error(), constants.JobControllerName)
if errUpdateStatus != nil {
log.Error(errUpdateStatus, "Updating workload status on start failure", "startFailure", err.Error())
}
return ctrl.Result{}, errUpdateStatus
}
}
return ctrl.Result{}, err
}
@@ -283,6 +297,10 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

func isPermanent(e error) bool {
return errors.Is(e, ErrInvalidPodsetInfo)
}

// IsParentJobManaged checks whether the parent job is managed by kueue.
func (r *JobReconciler) IsParentJobManaged(ctx context.Context, jobObj client.Object, namespace string) (bool, error) {
owner := metav1.GetControllerOf(jobObj)
@@ -422,7 +440,9 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
if err != nil {
return err
}
job.RunWithPodSetsInfo(info)
if runErr := job.RunWithPodSetsInfo(info); runErr != nil {
return runErr
}

if err := r.client.Update(ctx, object); err != nil {
return err
@@ -667,3 +687,7 @@ func resetMinCounts(in []kueue.PodSet) []kueue.PodSet {
}
return in
}

func BadPodSetsInfoLenError(want, got int) error {
return fmt.Errorf("%w: expecting %d podset, got %d", ErrInvalidPodsetInfo, want, got)
}
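
The permanent-failure classification above rests on standard Go error wrapping: `BadPodSetsInfoLenError` wraps the `ErrInvalidPodsetInfo` sentinel with `%w`, so `errors.Is` finds it anywhere in the chain. A self-contained sketch of the same mechanism, with lower-case local stand-ins for the exported names above:

```go
package main

import (
	"errors"
	"fmt"
)

var errInvalidPodsetInfo = errors.New("invalid podset infos")

// badLen wraps the sentinel with %w, mirroring BadPodSetsInfoLenError.
func badLen(want, got int) error {
	return fmt.Errorf("%w: expecting %d podset, got %d", errInvalidPodsetInfo, want, got)
}

// isPermanent matches the sentinel anywhere in the wrap chain.
func isPermanent(err error) bool {
	return errors.Is(err, errInvalidPodsetInfo)
}

func main() {
	err := badLen(1, 0)
	fmt.Println(err)              // invalid podset infos: expecting 1 podset, got 0
	fmt.Println(isPermanent(err)) // true: no retry, the workload is marked finished
}
```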
7 changes: 4 additions & 3 deletions pkg/controller/jobs/job/job_controller.go
@@ -210,10 +210,10 @@ func (j *Job) PodSets() []kueue.PodSet {
}
}

func (j *Job) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) {
func (j *Job) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error {
j.Spec.Suspend = pointer.Bool(false)
if len(podSetsInfo) == 0 {
return
if len(podSetsInfo) != 1 {
return jobframework.BadPodSetsInfoLenError(1, len(podSetsInfo))
}

info := podSetsInfo[0]
@@ -225,6 +225,7 @@ func (j *Job) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) {
j.Spec.Completions = j.Spec.Parallelism
}
}
return nil
}

func (j *Job) RestorePodSetsInfo(podSetsInfo []jobframework.PodSetInfo) bool {
25 changes: 24 additions & 1 deletion pkg/controller/jobs/job/job_controller_test.go
@@ -169,6 +169,7 @@ func TestPodSetsInfo(t *testing.T) {
job *Job
runInfo, restoreInfo []jobframework.PodSetInfo
wantUnsuspended *batchv1.Job
wantRunError error
}{
"append": {
job: (*Job)(utiltestingjob.MakeJob("job", "ns").
@@ -242,12 +243,34 @@
},
},
},
"noInfoOnRun": {
job: (*Job)(utiltestingjob.MakeJob("job", "ns").
Parallelism(5).
SetAnnotation(JobMinParallelismAnnotation, "2").
Obj()),
runInfo: []jobframework.PodSetInfo{},
wantUnsuspended: utiltestingjob.MakeJob("job", "ns").
Parallelism(5).
SetAnnotation(JobMinParallelismAnnotation, "2").
Suspend(false).
Obj(),
restoreInfo: []jobframework.PodSetInfo{
{
Count: 5,
},
},
wantRunError: jobframework.ErrInvalidPodsetInfo,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
origSpec := *tc.job.Spec.DeepCopy()

tc.job.RunWithPodSetsInfo(tc.runInfo)
gotErr := tc.job.RunWithPodSetsInfo(tc.runInfo)

if diff := cmp.Diff(tc.wantRunError, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("node selectors mismatch (-want +got):\n%s", diff)
}

if diff := cmp.Diff(tc.job.Spec, tc.wantUnsuspended.Spec); diff != "" {
t.Errorf("node selectors mismatch (-want +got):\n%s", diff)
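
The `wantRunError` comparison in this test works because `cmpopts.EquateErrors()` treats two errors as equal when `errors.Is` matches in either direction, so the bare `jobframework.ErrInvalidPodsetInfo` sentinel matches the wrapped error returned by `BadPodSetsInfoLenError`. A minimal illustration with a local sentinel:

```go
package demo

import (
	"errors"
	"fmt"
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

var errSentinel = errors.New("invalid podset infos")

func TestEquateErrors(t *testing.T) {
	got := fmt.Errorf("%w: expecting 1 podset, got 0", errSentinel)
	// EquateErrors compares errors via errors.Is, so the wrapped error
	// is considered equal to the bare sentinel.
	if diff := cmp.Diff(errSentinel, got, cmpopts.EquateErrors()); diff != "" {
		t.Errorf("run error mismatch (-want +got):\n%s", diff)
	}
}
```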
7 changes: 3 additions & 4 deletions pkg/controller/jobs/jobset/jobset_controller.go
@@ -113,12 +113,10 @@ func (j *JobSet) PodSets() []kueue.PodSet {
return podSets
}

func (j *JobSet) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
func (j *JobSet) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) error {
j.Spec.Suspend = pointer.Bool(false)
if len(podSetInfos) != len(j.Spec.ReplicatedJobs) {
// this is very unlikely, however in order to avoid any potential
// out of bounds access
return
return jobframework.BadPodSetsInfoLenError(len(j.Spec.ReplicatedJobs), len(podSetInfos))
}

// If there are Jobs already created by the JobSet, their node selectors will be updated by the JobSet controller
@@ -127,6 +125,7 @@ func (j *JobSet) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
templateSpec := &j.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec
templateSpec.NodeSelector = maps.MergeKeepFirst(podSetInfos[index].NodeSelector, templateSpec.NodeSelector)
}
return nil
}

func (j *JobSet) RestorePodSetsInfo(podSetInfos []jobframework.PodSetInfo) bool {
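
As in the other integrations, the workload-derived node selectors are merged into the pod template with `maps.MergeKeepFirst`. Assuming the semantics the name suggests — on a key conflict the first map's value wins — a hypothetical re-implementation looks like this:

```go
package demo

// mergeKeepFirst is a hypothetical stand-in for kueue's maps.MergeKeepFirst:
// it returns the union of a and b, keeping a's value when a key exists in both.
func mergeKeepFirst(a, b map[string]string) map[string]string {
	out := make(map[string]string, len(a)+len(b))
	for k, v := range b {
		out[k] = v
	}
	for k, v := range a { // entries from a overwrite b on conflicts
		out[k] = v
	}
	return out
}
```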
11 changes: 7 additions & 4 deletions pkg/controller/jobs/mpijob/mpijob_controller.go
@@ -114,20 +114,23 @@ func (j *MPIJob) PodSets() []kueue.PodSet {
return podSets
}

func (j *MPIJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
func (j *MPIJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) error {
j.Spec.RunPolicy.Suspend = pointer.Bool(false)
if len(podSetInfos) == 0 {
return
orderedReplicaTypes := orderedReplicaTypes(&j.Spec)

if len(podSetInfos) != len(orderedReplicaTypes) {
return jobframework.BadPodSetsInfoLenError(len(orderedReplicaTypes), len(podSetInfos))
}

// The node selectors are provided in the same order as the generated list of
// podSets, use the same ordering logic to restore them.
orderedReplicaTypes := orderedReplicaTypes(&j.Spec)
for index := range podSetInfos {
replicaType := orderedReplicaTypes[index]
info := podSetInfos[index]
replicaSpec := &j.Spec.MPIReplicaSpecs[replicaType].Template.Spec
replicaSpec.NodeSelector = maps.MergeKeepFirst(info.NodeSelector, replicaSpec.NodeSelector)
}
return nil
}

func (j *MPIJob) RestorePodSetsInfo(podSetInfos []jobframework.PodSetInfo) bool {
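
Hoisting `orderedReplicaTypes` above the length check lets the expected podSet count be derived from the replica types actually present in the spec, and the same fixed order maps each PodSetInfo index to a replica type. A hypothetical sketch of that ordering, using plain strings instead of the kubeflow replica-type constants:

```go
package demo

// orderedReplicaTypes is a hypothetical stand-in: it returns the replica
// types present in the spec in a fixed launcher-before-worker order, so
// podSetInfos[i] deterministically targets orderedReplicaTypes(...)[i].
func orderedReplicaTypes(specs map[string]struct{}) []string {
	var result []string
	for _, rt := range []string{"Launcher", "Worker"} { // fixed order
		if _, ok := specs[rt]; ok {
			result = append(result, rt)
		}
	}
	return result
}
```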
8 changes: 5 additions & 3 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
@@ -112,10 +112,11 @@ func (j *RayJob) PodSets() []kueue.PodSet {
return podSets
}

func (j *RayJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
func (j *RayJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) error {
j.Spec.Suspend = false
if len(podSetInfos) != len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1 {
return
expectedLen := len(j.Spec.RayClusterSpec.WorkerGroupSpecs) + 1
if len(podSetInfos) != expectedLen {
return jobframework.BadPodSetsInfoLenError(expectedLen, len(podSetInfos))
}

// head
@@ -127,6 +128,7 @@ func (j *RayJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
workerPodSpec := &j.Spec.RayClusterSpec.WorkerGroupSpecs[index].Template.Spec
workerPodSpec.NodeSelector = maps.MergeKeepFirst(podSetInfos[index+1].NodeSelector, workerPodSpec.NodeSelector)
}
return nil
}

func (j *RayJob) RestorePodSetsInfo(podSetInfos []jobframework.PodSetInfo) bool {
25 changes: 24 additions & 1 deletion pkg/controller/jobs/rayjob/rayjob_controller_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package rayjob

import (
"errors"
"testing"

"github.com/google/go-cmp/cmp"
@@ -151,8 +152,26 @@ func TestNodeSelectors(t *testing.T) {
}).
Obj())

// RunWithPodSetsInfo with invalid info should fail
runErr := job.RunWithPodSetsInfo([]jobframework.PodSetInfo{
{
NodeSelector: map[string]string{
"newKey": "newValue",
},
},
{
NodeSelector: map[string]string{
"key-wg1": "updated-value-wg1",
},
},
})

if !errors.Is(runErr, jobframework.ErrInvalidPodsetInfo) {
t.Errorf("expecting error for bad PodSetsInfo on RunWithPodSetsInfo")
}

// RunWithPodSetsInfo should append or update the node selectors
job.RunWithPodSetsInfo([]jobframework.PodSetInfo{
runErr = job.RunWithPodSetsInfo([]jobframework.PodSetInfo{
{
NodeSelector: map[string]string{
"newKey": "newValue",
@@ -170,6 +189,10 @@
},
})

if runErr != nil {
t.Errorf("unexpected error on RunWithPodSetsInfo: %s", runErr.Error())
}

if diff := cmp.Diff(
map[string]string{
"newKey": "newValue",