Skip to content

Commit

Permalink
Merge pull request volcano-sh#126 from wangyuqing4/state
Browse files Browse the repository at this point in the history
[Issue volcano-sh#121]fix state convert
  • Loading branch information
volcano-sh-bot authored May 7, 2019
2 parents 3d213a3 + 059b20d commit 148fed2
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 97 deletions.
8 changes: 6 additions & 2 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
msg = validateJob(job, &reviewResponse)
break
case v1beta1.Update:
oldJob, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
_, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
if err != nil {
return ToAdmissionResponse(err)
}
msg = specDeepEqual(job, oldJob, &reviewResponse)
break
default:
err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'")
Expand All @@ -82,6 +81,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
return fmt.Sprintf("'minAvailable' cannot be less than zero.")
}

if job.Spec.MaxRetry < 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("'maxRetry' cannot be less than zero.")
}

if len(job.Spec.Tasks) == 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("No task specified in job spec")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewJobController(config *rest.Config) *Controller {
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addJob,
// TODO: enable this until we find an appropriate way.
// UpdateFunc: cc.updateJob,
UpdateFunc: cc.updateJob,
DeleteFunc: cc.deleteJob,
})
cc.jobLister = cc.jobInformer.Lister()
Expand Down
78 changes: 59 additions & 19 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
// Job version is bumped only when job is killed
job.Status.Version = job.Status.Version + 1
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
Expand Down Expand Up @@ -88,6 +86,10 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
}

job = job.DeepCopy()
//Job version is bumped only when job is killed
job.Status.Version = job.Status.Version + 1

job.Status = vkv1.JobStatus{
State: job.Status.State,

Expand All @@ -112,6 +114,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
Expand All @@ -138,21 +142,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
if update, err := cc.filljob(job); err != nil || update {
return err
}
if newJob != nil {
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
Expand All @@ -168,14 +163,26 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
return err
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}

return nil
}

func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if job.DeletionTimestamp != nil {
Expand Down Expand Up @@ -313,6 +320,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
Expand Down Expand Up @@ -356,10 +365,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
return nil
}

func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) {
// If VolumeClaimName does not exist, generate them for Job.
var newJob *vkv1.Job
volumes := job.Spec.Volumes
update := false
for index, volume := range volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
Expand All @@ -368,7 +378,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return nil, err
return false, nil, err
}
if exist {
continue
Expand All @@ -377,11 +387,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
newJob = job.DeepCopy()
}
newJob.Spec.Volumes[index].VolumeClaimName = vcName
update = true
break
}
}
}
return newJob, nil
return update, newJob, nil
}

func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
Expand Down Expand Up @@ -428,8 +439,9 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
}
pg := &kbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
Namespace: job.Namespace,
Name: job.Name,
Annotations: job.Annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Expand Down Expand Up @@ -494,3 +506,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {

return minAvailableTasksRes.Convert2K8sResource()
}

func (cc *Controller) filljob(job *vkv1.Job) (bool, error) {
update, newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
return false, err
}
if update {
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return false, err
}
return true, nil
} else if job.Status.State.Phase == "" {
job.Status.State.Phase = vkv1.Pending
if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
} else {
if e := cc.cache.Update(j); e != nil {
glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e)
}
}
return true, nil
}

return false, nil
}
12 changes: 6 additions & 6 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
return
}

if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}

// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) {
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
glog.Infof("Job update event is ignored since no update in 'Spec'.")
return
}

if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}

req := apis.Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,
Expand Down
12 changes: 8 additions & 4 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
for _, task := range job.Spec.Tasks {
if task.Name == req.TaskName {
for _, policy := range task.Policies {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}
}

// 0 is not an error code, is prevented in validation admission controller
Expand All @@ -170,8 +172,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {

// Parse Job level policies
for _, policy := range job.Spec.Policies {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}
}

// 0 is not an error code, is prevented in validation admission controller
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/aborted.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type abortedState struct {
func (as *abortedState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
return SyncJob(as.job, func(status *vkv1.JobStatus) {
return KillJob(as.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Restarting
status.RetryCount++
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/aborting.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
// Already in Restarting phase, just sync it
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
return KillJob(ps.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Restarting
status.RetryCount++
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State {
return &runningState{job: jobInfo}
case vkv1.Restarting:
return &restartingState{job: jobInfo}
case vkv1.Terminated, vkv1.Completed:
case vkv1.Terminated, vkv1.Completed, vkv1.Failed:
return &finishedState{job: jobInfo}
case vkv1.Terminating:
return &terminatingState{job: jobInfo}
Expand All @@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State {
return &abortedState{job: jobInfo}
case vkv1.Completing:
return &completingState{job: jobInfo}
case vkv1.Failed:
return &failedState{job: jobInfo}
case vkv1.Inqueue:
return &inqueueState{job: jobInfo}
}
Expand Down
30 changes: 0 additions & 30 deletions pkg/controllers/job/state/failed.go

This file was deleted.

15 changes: 8 additions & 7 deletions pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type restartingState struct {
}

func (ps *restartingState) Execute(action vkv1.Action) error {
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
return KillJob(ps.job, func(status *vkv1.JobStatus) {
phase := vkv1.Restarting

// Get the maximum number of retries.
Expand All @@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error {
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
phase = vkv1.Failed
} else {
if status.Terminating == 0 {
if status.Running >= ps.job.Job.Spec.MinAvailable {
phase = vkv1.Running
} else {
phase = vkv1.Pending
}
total := int32(0)
for _, task := range ps.job.Job.Spec.Tasks {
total += task.Replicas
}

if total-status.Terminating >= status.MinAvailable {
phase = vkv1.Pending
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Job E2E Test: Test Job Command", func() {
//Job is pending
err := waitJobPending(context, job)
Expect(err).NotTo(HaveOccurred())
err = waitJobStatePending(context, job)
err = waitJobStateInqueue(context, job)
Expect(err).NotTo(HaveOccurred())

//Suspend job and wait status change
Expand Down
Loading

0 comments on commit 148fed2

Please sign in to comment.