From 059b20dbe9d5cd4daa281d2d1d500bcfee12136c Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Sun, 5 May 2019 16:01:19 +0800 Subject: [PATCH] fix state convert --- pkg/admission/admit_job.go | 8 +- pkg/controllers/job/job_controller.go | 2 +- pkg/controllers/job/job_controller_actions.go | 78 ++++++++++++++----- pkg/controllers/job/job_controller_handler.go | 12 +-- pkg/controllers/job/job_controller_util.go | 12 ++- pkg/controllers/job/state/aborted.go | 2 +- pkg/controllers/job/state/aborting.go | 2 +- pkg/controllers/job/state/factory.go | 4 +- pkg/controllers/job/state/failed.go | 30 ------- pkg/controllers/job/state/restarting.go | 15 ++-- test/e2e/command.go | 2 +- test/e2e/job_error_handling.go | 22 +++--- test/e2e/job_scheduling.go | 20 ++--- test/e2e/util.go | 17 +++- 14 files changed, 129 insertions(+), 97 deletions(-) delete mode 100644 pkg/controllers/job/state/failed.go diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 93eea205962..76988954843 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -54,11 +54,10 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { msg = validateJob(job, &reviewResponse) break case v1beta1.Update: - oldJob, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource) + _, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource) if err != nil { return ToAdmissionResponse(err) } - msg = specDeepEqual(job, oldJob, &reviewResponse) break default: err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'") @@ -82,6 +81,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st return fmt.Sprintf("'minAvailable' cannot be less than zero.") } + if job.Spec.MaxRetry < 0 { + reviewResponse.Allowed = false + return fmt.Sprintf("'maxRetry' cannot be less than zero.") + } + if len(job.Spec.Tasks) == 0 { reviewResponse.Allowed = false return fmt.Sprintf("No task specified in job spec") diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index b38518f2f05..7ecdd999f28 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -128,7 +128,7 @@ func NewJobController(config *rest.Config) *Controller { cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: cc.addJob, // TODO: enable this until we find an appropriate way. - // UpdateFunc: cc.updateJob, + UpdateFunc: cc.updateJob, DeleteFunc: cc.deleteJob, }) cc.jobLister = cc.jobInformer.Lister() diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index ca9c7e5132b..e3fcd736a66 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -41,8 +41,6 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name) job := jobInfo.Job - // Job version is bumped only when job is killed - job.Status.Version = job.Status.Version + 1 glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) if job.DeletionTimestamp != nil { glog.Infof("Job <%s/%s> is terminating, skip management process.", @@ -88,6 +86,10 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return fmt.Errorf("failed to kill %d pods of %d", len(errs), total) } + job = job.DeepCopy() + //Job version is bumped only when job is killed + job.Status.Version = job.Status.Version + 1 + job.Status = vkv1.JobStatus{ State: job.Status.State, @@ -112,6 +114,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return err } else { if e := cc.cache.Update(job); e != nil { + glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) return e } } @@ -138,21 +142,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name) - job := jobInfo.Job + job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - newJob, err := cc.needUpdateForVolumeClaim(job) - if err != nil { + if update, err := cc.filljob(job); err != nil || update { return err } - if newJob != nil { - if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { - glog.Errorf("Failed to update Job %v/%v: %v", - job.Namespace, job.Name, err) - return err - } - return nil - } if err := cc.pluginOnJobAdd(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError), @@ -168,6 +163,18 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta return err } + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } else { + if e := cc.cache.Update(job); e != nil { + glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) + return e + } + } + return nil } @@ -175,7 +182,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt glog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name) - job := jobInfo.Job + job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) if job.DeletionTimestamp != nil { @@ -313,6 +320,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return err } else { if e := cc.cache.Update(job); e != nil { + glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) return e } } @@ -356,10 +365,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { return nil } -func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) { +func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) { // If VolumeClaimName does not exist, generate them for Job. var newJob *vkv1.Job volumes := job.Spec.Volumes + update := false for index, volume := range volumes { vcName := volume.VolumeClaimName if len(vcName) == 0 { @@ -368,7 +378,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr) exist, err := cc.checkPVCExist(job, vcName) if err != nil { - return nil, err + return false, nil, err } if exist { continue @@ -377,11 +387,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) newJob = job.DeepCopy() } newJob.Spec.Volumes[index].VolumeClaimName = vcName + update = true break } } } - return newJob, nil + return update, newJob, nil } func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) { @@ -428,8 +439,9 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error { } pg := &kbv1.PodGroup{ ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: job.Name, + Namespace: job.Namespace, + Name: job.Name, + Annotations: job.Annotations, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), }, @@ -494,3 +506,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { return minAvailableTasksRes.Convert2K8sResource() } + +func (cc *Controller) filljob(job *vkv1.Job) (bool, error) { + update, newJob, err := cc.needUpdateForVolumeClaim(job) + if err != nil { + return false, err + } + if update { + if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { + glog.Errorf("Failed to update Job %v/%v: %v", + job.Namespace, job.Name, err) + return false, err + } + return true, nil + } else if job.Status.State.Phase == "" { + job.Status.State.Phase = vkv1.Pending + if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + } else { + if e := cc.cache.Update(j); e != nil { + glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e) + } + } + return true, nil + } + + return false, nil +} diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 98b414f59b5..8df166507ac 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -81,18 +81,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { return } - if err := cc.cache.Update(newJob); err != nil { - glog.Errorf("Failed to update job <%s/%s>: %v in cache", - newJob.Namespace, newJob.Name, err) - } - // NOTE: Since we only reconcile job based on Spec, we will ignore other attributes // For Job status, it's used internally and always been updated via our controller. - if reflect.DeepEqual(newJob.Spec, oldJob.Spec) { + if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase { glog.Infof("Job update event is ignored since no update in 'Spec'.") return } + if err := cc.cache.Update(newJob); err != nil { + glog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache", + newJob.Namespace, newJob.Name, err) + } + req := apis.Request{ Namespace: newJob.Namespace, JobName: newJob.Name, diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index cd863359986..70f96616e0c 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -154,8 +154,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { for _, task := range job.Spec.Tasks { if task.Name == req.TaskName { for _, policy := range task.Policies { - if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { - return policy.Action + if len(policy.Event) > 0 && len(req.Event) > 0 { + if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + return policy.Action + } } // 0 is not an error code, is prevented in validation admission controller @@ -170,8 +172,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { // Parse Job level policies for _, policy := range job.Spec.Policies { - if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { - return policy.Action + if len(policy.Event) > 0 && len(req.Event) > 0 { + if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + return policy.Action + } } // 0 is not an error code, is prevented in validation admission controller diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index f7439c1bbca..104170615b4 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -28,7 +28,7 @@ type abortedState struct { func (as *abortedState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - return SyncJob(as.job, func(status *vkv1.JobStatus) { + return KillJob(as.job, func(status *vkv1.JobStatus) { status.State.Phase = vkv1.Restarting status.RetryCount++ }) diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index f688daf9090..cae21cb466d 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: // Already in Restarting phase, just sync it - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) { status.State.Phase = vkv1.Restarting status.RetryCount++ }) diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index f9a6be522a0..a24f6055437 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State { return &runningState{job: jobInfo} case vkv1.Restarting: return &restartingState{job: jobInfo} - case vkv1.Terminated, vkv1.Completed: + case vkv1.Terminated, vkv1.Completed, vkv1.Failed: return &finishedState{job: jobInfo} case vkv1.Terminating: return &terminatingState{job: jobInfo} @@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State { return &abortedState{job: jobInfo} case vkv1.Completing: return &completingState{job: jobInfo} - case vkv1.Failed: - return &failedState{job: jobInfo} case vkv1.Inqueue: return &inqueueState{job: jobInfo} } diff --git a/pkg/controllers/job/state/failed.go b/pkg/controllers/job/state/failed.go deleted file mode 100644 index d969612a1f5..00000000000 --- a/pkg/controllers/job/state/failed.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2017 The Volcano Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package state - -import ( - vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - "volcano.sh/volcano/pkg/controllers/apis" -) - -type failedState struct { - job *apis.JobInfo -} - -func (ps *failedState) Execute(action vkv1.Action) error { - return KillJob(ps.job, nil) -} diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index c59395e5e4e..a58dbd78111 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -26,7 +26,7 @@ type restartingState struct { } func (ps *restartingState) Execute(action vkv1.Action) error { - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) { phase := vkv1.Restarting // Get the maximum number of retries. @@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error { // Failed is the phase that the job is restarted failed reached the maximum number of retries. phase = vkv1.Failed } else { - if status.Terminating == 0 { - if status.Running >= ps.job.Job.Spec.MinAvailable { - phase = vkv1.Running - } else { - phase = vkv1.Pending - } + total := int32(0) + for _, task := range ps.job.Job.Spec.Tasks { + total += task.Replicas + } + + if total-status.Terminating >= status.MinAvailable { + phase = vkv1.Pending } } diff --git a/test/e2e/command.go b/test/e2e/command.go index e3554167c5b..4300022172f 100644 --- a/test/e2e/command.go +++ b/test/e2e/command.go @@ -138,7 +138,7 @@ var _ = Describe("Job E2E Test: Test Job Command", func() { //Job is pending err := waitJobPending(context, job) Expect(err).NotTo(HaveOccurred()) - err = waitJobStatePending(context, job) + err = waitJobStateInqueue(context, job) Expect(err).NotTo(HaveOccurred()) //Suspend job and wait status change diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index 500aa1adb32..1a875cb851a 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -61,7 +61,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> restarting - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Restarting}) Expect(err).NotTo(HaveOccurred()) }) @@ -98,7 +98,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> Terminating -> Terminated - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Terminating, vkv1.Terminated}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Terminating, vkv1.Terminated}) Expect(err).NotTo(HaveOccurred()) }) @@ -135,7 +135,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> Aborting -> Aborted - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Aborting, vkv1.Aborted}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Aborting, vkv1.Aborted}) Expect(err).NotTo(HaveOccurred()) }) @@ -170,7 +170,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -179,7 +179,7 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) // job phase: Restarting -> Running - err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Running}) + err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) }) @@ -214,7 +214,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -258,7 +258,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -302,7 +302,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -311,7 +311,7 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) // job phase: Restarting -> Running - err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Running}) + err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) }) @@ -464,7 +464,7 @@ var _ = Describe("Job Error Handling", func() { By("job scheduled, then task 'completed_task' finished and job finally complete") // job phase: pending -> running -> completing -> completed err := waitJobStates(context, job, []vkv1.JobPhase{ - vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) + vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) }) @@ -503,7 +503,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> restarting - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Restarting}) Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/job_scheduling.go b/test/e2e/job_scheduling.go index e7a6d4e1617..6f1cf0b07bf 100644 --- a/test/e2e/job_scheduling.go +++ b/test/e2e/job_scheduling.go @@ -93,16 +93,17 @@ var _ = Describe("Job E2E Test", func() { namespace: "test", tasks: []taskSpec{ { - img: defaultBusyBoxImage, - req: oneCPU, - min: rep, - rep: rep, + img: defaultBusyBoxImage, + req: oneCPU, + min: rep, + rep: rep, + command: "sleep 10s", }, }, } job := createJob(context, jobSpec) - err = waitJobPending(context, job) + err = waitJobStateInqueue(context, job) Expect(err).NotTo(HaveOccurred()) err = waitJobUnschedulable(context, job) @@ -260,10 +261,11 @@ var _ = Describe("Job E2E Test", func() { namespace: "test", tasks: []taskSpec{ { - img: defaultNginxImage, - req: slot, - min: rep, - rep: rep, + img: defaultNginxImage, + req: slot, + min: rep, + rep: rep, + command: "sleep 10s", }, }, } diff --git a/test/e2e/util.go b/test/e2e/util.go index 6f223333e7d..c90ac521f3a 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -427,7 +427,7 @@ func jobUnschedulable(ctx *context, job *vkv1.Job, time time.Time) wait.Conditio for _, event := range events.Items { target := event.InvolvedObject - if target.Name == pg.Name && target.Namespace == pg.Namespace { + if strings.HasPrefix(target.Name, pg.Name) && target.Namespace == pg.Namespace { if event.Reason == string("Unschedulable") && event.LastTimestamp.After(time) { return true, nil } @@ -481,6 +481,11 @@ func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { } func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { + total := int32(0) + for _, task := range job.Spec.Tasks { + total += task.Replicas + } + return wait.Poll(100*time.Millisecond, twoMinute, func() (bool, error) { newJob, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) @@ -492,7 +497,9 @@ func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { var flag = false switch phase { case vkv1.Pending: - flag = newJob.Status.Pending > 0 + flag = (newJob.Status.Pending+newJob.Status.Succeeded+ + newJob.Status.Failed+newJob.Status.Running) == 0 || + (total-newJob.Status.Terminating >= newJob.Status.MinAvailable) case vkv1.Terminating, vkv1.Aborting, vkv1.Restarting: flag = newJob.Status.Terminating > 0 case vkv1.Terminated, vkv1.Aborted: @@ -503,6 +510,8 @@ func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { flag = newJob.Status.Succeeded == state.TotalTasks(newJob) case vkv1.Running: flag = newJob.Status.Running >= newJob.Spec.MinAvailable + case vkv1.Inqueue: + flag = newJob.Status.Pending > 0 default: return false, fmt.Errorf("unknown phase %s", phase) } @@ -538,6 +547,10 @@ func waitJobStatePending(ctx *context, job *vkv1.Job) error { return wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, vkv1.Pending)) } +func waitJobStateInqueue(ctx *context, job *vkv1.Job) error { + return wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, vkv1.Inqueue)) +} + func waitJobStateAborted(ctx *context, job *vkv1.Job) error { return wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, vkv1.Aborted)) }