diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index ca9c7e5132b..46eb40a553c 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -138,21 +138,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name) - job := jobInfo.Job + job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - newJob, err := cc.needUpdateForVolumeClaim(job) - if err != nil { + if update, err := cc.filljob(job); err != nil || update { return err } - if newJob != nil { - if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { - glog.Errorf("Failed to update Job %v/%v: %v", - job.Namespace, job.Name, err) - return err - } - return nil - } if err := cc.pluginOnJobAdd(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError), @@ -168,6 +159,18 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta return err } + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } else { + if e := cc.cache.Update(job); e != nil { + glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) + return e + } + } + return nil } @@ -356,10 +359,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { return nil } -func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) { +func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) { // If VolumeClaimName does not exist, generate them for Job. var newJob *vkv1.Job volumes := job.Spec.Volumes + update := false for index, volume := range volumes { vcName := volume.VolumeClaimName if len(vcName) == 0 { @@ -368,7 +372,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr) exist, err := cc.checkPVCExist(job, vcName) if err != nil { - return nil, err + return false, nil, err } if exist { continue @@ -377,11 +381,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) newJob = job.DeepCopy() } newJob.Spec.Volumes[index].VolumeClaimName = vcName + update = true break } } } - return newJob, nil + return update, newJob, nil } func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) { @@ -494,3 +499,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { return minAvailableTasksRes.Convert2K8sResource() } + +func (cc *Controller) filljob(job *vkv1.Job) (bool, error) { + update, newJob, err := cc.needUpdateForVolumeClaim(job) + if err != nil { + return false, err + } + if update { + if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { + glog.Errorf("Failed to update Job %v/%v: %v", + job.Namespace, job.Name, err) + return false, err + } + return true, nil + } else if job.Status.State.Phase == "" { + job.Status.State.Phase = vkv1.Pending + if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + } else { + if e := cc.cache.Update(j); e != nil { + glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e) + } + } + return true, nil + } + + return false, nil +} \ No newline at end of file diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index f7439c1bbca..104170615b4 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -28,7 +28,7 @@ type abortedState struct { func (as *abortedState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - return SyncJob(as.job, func(status *vkv1.JobStatus) { + return KillJob(as.job, func(status *vkv1.JobStatus) { status.State.Phase = vkv1.Restarting status.RetryCount++ }) diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index f688daf9090..cae21cb466d 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: // Already in Restarting phase, just sync it - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) { status.State.Phase = vkv1.Restarting status.RetryCount++ }) diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index f9a6be522a0..a24f6055437 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State { return &runningState{job: jobInfo} case vkv1.Restarting: return &restartingState{job: jobInfo} - case vkv1.Terminated, vkv1.Completed: + case vkv1.Terminated, vkv1.Completed, vkv1.Failed: return &finishedState{job: jobInfo} case vkv1.Terminating: return &terminatingState{job: jobInfo} @@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State { return &abortedState{job: jobInfo} case vkv1.Completing: return &completingState{job: jobInfo} - case vkv1.Failed: - return &failedState{job: jobInfo} case vkv1.Inqueue: return &inqueueState{job: jobInfo} } diff --git a/pkg/controllers/job/state/failed.go b/pkg/controllers/job/state/failed.go deleted file mode 100644 index d969612a1f5..00000000000 --- a/pkg/controllers/job/state/failed.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2017 The Volcano Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package state - -import ( - vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - "volcano.sh/volcano/pkg/controllers/apis" -) - -type failedState struct { - job *apis.JobInfo -} - -func (ps *failedState) Execute(action vkv1.Action) error { - return KillJob(ps.job, nil) -} diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index c59395e5e4e..a58dbd78111 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -26,7 +26,7 @@ type restartingState struct { } func (ps *restartingState) Execute(action vkv1.Action) error { - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) { phase := vkv1.Restarting // Get the maximum number of retries. @@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error { // Failed is the phase that the job is restarted failed reached the maximum number of retries. phase = vkv1.Failed } else { - if status.Terminating == 0 { - if status.Running >= ps.job.Job.Spec.MinAvailable { - phase = vkv1.Running - } else { - phase = vkv1.Pending - } + total := int32(0) + for _, task := range ps.job.Job.Spec.Tasks { + total += task.Replicas + } + + if total-status.Terminating >= status.MinAvailable { + phase = vkv1.Pending } }