From 26b05f511d59e743ba2ba5a8460b5eb6d7ca777e Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Sat, 19 Jan 2019 08:31:25 +0800 Subject: [PATCH] Added Job error handling. Signed-off-by: Klaus Ma --- Makefile | 2 +- config/crds/batch_v1alpha1_job.yaml | 36 +- example/job.yaml | 5 +- pkg/apis/batch/v1alpha1/job.go | 28 +- .../batch/v1alpha1/zz_generated.deepcopy.go | 3 +- .../bus/v1alpha1/zz_generated.deepcopy.go | 2 +- pkg/controllers/job/job_controller.go | 100 ++--- pkg/controllers/job/job_controller_actions.go | 404 +++++++++++------- pkg/controllers/job/job_controller_const.go | 21 + pkg/controllers/job/job_controller_handler.go | 100 +++-- pkg/controllers/job/job_controller_util.go | 66 ++- .../job/state/{restart.go => aborted.go} | 22 +- pkg/controllers/job/state/aborting.go | 56 +++ pkg/controllers/job/state/factory.go | 68 ++- .../job/state/{base.go => finished.go} | 23 +- pkg/controllers/job/state/pending.go | 57 +++ pkg/controllers/job/state/restarting.go | 45 ++ pkg/controllers/job/state/running.go | 82 ++++ pkg/controllers/job/state/terminating.go | 44 ++ pkg/controllers/job/state/util.go | 21 +- test/e2e/util.go | 8 +- 21 files changed, 792 insertions(+), 401 deletions(-) create mode 100644 pkg/controllers/job/job_controller_const.go rename pkg/controllers/job/state/{restart.go => aborted.go} (64%) create mode 100644 pkg/controllers/job/state/aborting.go rename pkg/controllers/job/state/{base.go => finished.go} (60%) create mode 100644 pkg/controllers/job/state/pending.go create mode 100644 pkg/controllers/job/state/restarting.go create mode 100644 pkg/controllers/job/state/running.go create mode 100644 pkg/controllers/job/state/terminating.go diff --git a/Makefile b/Makefile index f97846e051..6894cc1fce 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ cli: generate-code: go build -o ${BIN_DIR}/deepcopy-gen ./cmd/deepcopy-gen/ ${BIN_DIR}/deepcopy-gen -i ./pkg/apis/batch/v1alpha1/ -O zz_generated.deepcopy - ${BIN_DIR}/deepcopy-gen -i ./pkg/apis/core/v1alpha1/ -O zz_generated.deepcopy + ${BIN_DIR}/deepcopy-gen -i ./pkg/apis/bus/v1alpha1/ -O zz_generated.deepcopy e2e-test: ./hack/run-e2e.sh diff --git a/config/crds/batch_v1alpha1_job.yaml b/config/crds/batch_v1alpha1_job.yaml index e30b396273..09f16ccd1a 100644 --- a/config/crds/batch_v1alpha1_job.yaml +++ b/config/crds/batch_v1alpha1_job.yaml @@ -30,29 +30,13 @@ spec: input: description: The volume mount for input of Job properties: - claim: + volumeClaim: description: VolumeClaim defines the PVC used by the VolumeMount. type: object mountPath: description: Path within the container at which the volume should be mounted. Must not contain ':'. type: string - mountPropagation: - description: mountPropagation determines how mounts are propagated - from the host to container and the other way around. When not - set, MountPropagationNone is used. This field is beta in 1.10. - type: string - name: - description: This must match the Name of a Volume. - type: string - readOnly: - description: Mounted read-only if true, read-write otherwise (false - or unspecified). Defaults to false. - type: boolean - subPath: - description: Path within the volume from which the container's volume - should be mounted. Defaults to "" (volume's root). - type: string required: - mountPath type: object @@ -63,29 +47,13 @@ spec: output: description: The volume mount for output of Job properties: - claim: + volumeClaim: description: VolumeClaim defines the PVC used by the VolumeMount. type: object mountPath: description: Path within the container at which the volume should be mounted. Must not contain ':'. type: string - mountPropagation: - description: mountPropagation determines how mounts are propagated - from the host to container and the other way around. When not - set, MountPropagationNone is used. This field is beta in 1.10. - type: string - name: - description: This must match the Name of a Volume. - type: string - readOnly: - description: Mounted read-only if true, read-write otherwise (false - or unspecified). Defaults to false. - type: boolean - subPath: - description: Path within the volume from which the container's volume - should be mounted. Defaults to "" (volume's root). - type: string required: - mountPath type: object diff --git a/example/job.yaml b/example/job.yaml index 3d5fca7c18..36040abb83 100644 --- a/example/job.yaml +++ b/example/job.yaml @@ -4,11 +4,14 @@ metadata: name: test-job spec: minAvailable: 3 + policies: + - event: PodEvicted + action: RestartJob input: mountPath: "/myinput" output: mountPath: "/myoutput" - claim: + volumeClaim: accessModes: [ "ReadWriteOnce" ] storageClassName: "my-storage-class" resources: diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index f7af206475..51c104ac6b 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -64,10 +64,12 @@ type JobSpec struct { // VolumeSpec defines the specification of Volume, e.g. PVC type VolumeSpec struct { - v1.VolumeMount `json:",inline"` + // Path within the container at which the volume should be mounted. Must + // not contain ':'. + MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"` // VolumeClaim defines the PVC used by the VolumeMount. - VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"` + VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"` } // Event represent the phase of Job, e.g. pod-failed. @@ -75,7 +77,7 @@ type Event string const ( // AllEvent means all event - AllEvents Event = "*" + AnyEvent Event = "*" // PodFailedEvent is triggered if Pod was failed PodFailedEvent Event = "PodFailed" // PodEvictedEvent is triggered if Pod was deleted @@ -165,8 +167,8 @@ const ( Completed JobPhase = "Completed" // Terminating is the phase that the Job is terminated, waiting for releasing pods Terminating JobPhase = "Terminating" - // Teriminated is the phase that the job is finished unexpected, e.g. events - Teriminated JobPhase = "Terminated" + // Terminated is the phase that the job is finished unexpected, e.g. events + Terminated JobPhase = "Terminated" ) // JobState contains details for the current state of the job. @@ -189,25 +191,29 @@ type JobStatus struct { // Current state of Job. State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"` + // The minimal available pods to run for this Job + // +optional + MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"` + // The number of pending pods. // +optional - Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"` + Pending int32 `json:"pending,omitempty" protobuf:"bytes,3,opt,name=pending"` // The number of running pods. // +optional - Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"` + Running int32 `json:"running,omitempty" protobuf:"bytes,4,opt,name=running"` // The number of pods which reached phase Succeeded. // +optional - Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,4,opt,name=succeeded"` + Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,5,opt,name=succeeded"` // The number of pods which reached phase Failed. // +optional - Failed int32 `json:"failed,omitempty" protobuf:"bytes,5,opt,name=failed"` + Failed int32 `json:"failed,omitempty" protobuf:"bytes,6,opt,name=failed"` - // The minimal available pods to run for this Job + // The number of pods which reached phase Terminating. // +optional - MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,6,opt,name=minAvailable"` + Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 321b509b2c..7e38caca05 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ // +build !ignore_autogenerated /* -Copyright The Kubernetes Authors. +Copyright 2019 The Vulcan Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -208,7 +208,6 @@ func (in *TaskSpec) DeepCopy() *TaskSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) { *out = *in - in.VolumeMount.DeepCopyInto(&out.VolumeMount) if in.VolumeClaim != nil { in, out := &in.VolumeClaim, &out.VolumeClaim *out = new(corev1.PersistentVolumeClaimSpec) diff --git a/pkg/apis/bus/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/bus/v1alpha1/zz_generated.deepcopy.go index 021f6db716..ff8d1aa7d1 100644 --- a/pkg/apis/bus/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/bus/v1alpha1/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ // +build !ignore_autogenerated /* -Copyright The Kubernetes Authors. +Copyright 2019 The Vulcan Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 53a289a249..7aa4ab53a2 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -21,22 +21,10 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions" kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1" kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1" - vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" v1corev1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1" "hpw.cloud/volcano/pkg/apis/helpers" @@ -47,6 +35,15 @@ import ( vkbatchlister "hpw.cloud/volcano/pkg/client/listers/batch/v1alpha1" vkcorelister "hpw.cloud/volcano/pkg/client/listers/bus/v1alpha1" "hpw.cloud/volcano/pkg/controllers/job/state" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // Controller the Job Controller type @@ -173,15 +170,8 @@ func NewJobController(config *rest.Config) *Controller { cc.pgSynced = cc.pgInformer.Informer().HasSynced // Register actions - actionFns := map[vkbatchv1.Action]state.ActionFn{ - vkbatchv1.ResumeJobAction: cc.resumeJob, - vkbatchv1.SyncJobAction: cc.syncJob, - vkbatchv1.AbortJobAction: cc.abortJob, - vkbatchv1.TerminateJobAction: cc.terminateJob, - vkbatchv1.RestartJobAction: cc.restartJob, - vkbatchv1.RestartTaskAction: cc.syncJob, - } - state.RegisterActions(actionFns) + state.SyncJob = cc.syncJob + state.KillJob = cc.killJob return cc } @@ -203,52 +193,54 @@ func (cc *Controller) Run(stopCh <-chan struct{}) { glog.Infof("JobController is running ...... ") } +type Request struct { + Namespace string + JobName string + PodName string + + Event vkbatchv1.Event + Action vkbatchv1.Action + + Reason string + Message string +} + func (cc *Controller) worker() { obj := cache.Pop(cc.eventQueue) if obj == nil { glog.Errorf("Fail to pop item from eventQueue") + return } - req := obj.(*state.Request) + req := obj.(*Request) - if req.Target != nil { - if job, err := cc.jobLister.Jobs(req.Namespace).Get(req.Target.Name); err != nil { - req.Job = job - } + job, err := cc.jobLister.Jobs(req.Namespace).Get(req.JobName) + if err != nil { + return } - if req.Job == nil { - if req.Pod == nil { - glog.Errorf("Empty data for request %v", req) - return - } - jobs, err := cc.jobLister.List(labels.Everything()) - if err != nil { - glog.Errorf("Failed to list Jobs for Pod %v/%v", req.Pod.Namespace, req.Pod.Name) - } - - // TODO(k82cn): select by UID instead of loop - ctl := helpers.GetController(req.Pod) - for _, j := range jobs { - if j.UID == ctl { - req.Job = j - break - } - } + st := state.NewState(job) + if st == nil { + glog.Errorf("Invalid state <%s> of Job <%v/%v>", + job.Status.State, job.Namespace, job.Name) + return } - if req.Job == nil { - glog.Errorf("No Job for request %v from pod %v/%v", - req.Event, req.Pod.Namespace, req.Pod.Name) - return + action := req.Action + if len(action) == 0 { + pod, err := cc.podLister.Pods(req.Namespace).Get(req.PodName) + if err != nil { + pod = nil + } + action = applyPolicies(req.Event, job, pod) } - st := state.NewState(req) - if err := st.Execute(); err != nil { - glog.Errorf("Failed to handle Job %s/%s: %v", - req.Job.Namespace, req.Job.Name, err) + if err := st.Execute(action, req.Reason, req.Message); err != nil { + glog.Errorf("Failed to handle Job <%s/%s>: %v", + job.Namespace, job.Name, err) // If any error, requeue it. - // TODO(k82cn): replace with RateLimteQueue - cc.eventQueue.Add(req) + if e := cc.eventQueue.Add(req); e != nil { + glog.Errorf("Failed to reqeueue request <%v>", req) + } } } diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 3f1d2ab7d9..011ea188e1 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -28,61 +28,125 @@ import ( kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" - vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" "hpw.cloud/volcano/pkg/apis/helpers" "hpw.cloud/volcano/pkg/controllers/job/state" ) -func (cc *Controller) resumeJob(req *state.Request) error { - switch req.Reason { +func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error { + job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) + if err != nil { + if apierrors.IsNotFound(err) { + glog.V(3).Infof("Job has been deleted: %v", job.Name) + return nil + } + return err + } + + if job.DeletionTimestamp != nil { + glog.Infof("Job <%s/%s> is terminating, skip management process.", + job.Namespace, job.Name) + return nil + } + podsMap, err := getPodsForJob(cc.podLister, job) + if err != nil { + return err } - return nil -} -func (cc *Controller) abortJob(req *state.Request) error { - switch req.Reason { + var pending, running, terminating, succeeded, failed int32 + + var errs []error + var total int + + for _, pods := range podsMap { + for _, pod := range pods { + total++ + switch pod.Status.Phase { + case v1.PodRunning: + err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil) + if err != nil { + running++ + errs = append(errs, err) + continue + } + terminating++ + case v1.PodPending: + err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil) + if err != nil { + pending++ + errs = append(errs, err) + continue + } + terminating++ + case v1.PodSucceeded: + succeeded++ + case v1.PodFailed: + failed++ + } + } } - return nil -} -func (cc *Controller) terminateJob(req *state.Request) error { - switch req.Reason { + if len(errs) != 0 { + return fmt.Errorf("failed to kill %d pods of %d", len(errs), total) + } + job.Status = vkv1.JobStatus{ + Pending: pending, + Running: running, + Succeeded: succeeded, + Failed: failed, + Terminating: terminating, + MinAvailable: int32(job.Spec.MinAvailable), + } + if nextState != nil { + job.Status.State = nextState(job.Status) + } + + if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err } - return nil -} -func (cc *Controller) restartJob(req *state.Request) error { - switch req.Reason { + if err := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil { + glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } + if err := cc.kubeClients.CoreV1().Services(job.Namespace).Delete(job.Name, nil); err != nil { + glog.Errorf("Failed to delete Service of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err } + + // NOTE(k82cn): DO NOT delete input/output until job is deleted. + return nil } -func (cc *Controller) syncJob(req *state.Request) error { - j := req.Job - job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name) +func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error { + job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) if err != nil { if apierrors.IsNotFound(err) { - glog.V(3).Infof("Job has been deleted: %v", j.Name) return nil } return err } - podsMap, err := getPodsForJob(cc.podLister, job) - if err != nil { - return err - } - if job.DeletionTimestamp != nil { glog.Infof("Job <%s/%s> is terminating, skip management process.", job.Namespace, job.Name) return nil } + podsMap, err := getPodsForJob(cc.podLister, job) + if err != nil { + return err + } + glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name) // TODO(k82cn): add WebHook to validate job. @@ -90,142 +154,28 @@ func (cc *Controller) syncJob(req *state.Request) error { glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err) } - // If PodGroup does not exist, create one for Job. - if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - pg := &kbv1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: job.Name, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: kbv1.PodGroupSpec{ - MinMember: job.Spec.MinAvailable, - }, - } - - if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil { - glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - - return e - } - } - - // If input/output PVC does not exist, create them for Job. - inputPVC := fmt.Sprintf("%s-input", job.Name) - outputPVC := fmt.Sprintf("%s-output", job.Name) - if job.Spec.Input != nil { - if job.Spec.Input.VolumeClaim != nil { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: inputPVC, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: *job.Spec.Input.VolumeClaim, - } - - glog.V(3).Infof("Try to create input PVC: %v", pvc) - - if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { - glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return e - } - } - } + if err := cc.createPodGroupIfNotExist(job); err != nil { + return err } - if job.Spec.Output != nil { - if job.Spec.Output.VolumeClaim != nil { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: outputPVC, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: *job.Spec.Output.VolumeClaim, - } - - glog.V(3).Infof("Try to create output PVC: %v", pvc) - - if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { - glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return e - } - } - } + if err := cc.createJobIOIfNotExist(job); err != nil { + return err } - // If Service does not exist, create one for Job. - if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - - svc := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: job.Name, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: v1.ServiceSpec{ - ClusterIP: "None", - Selector: map[string]string{ - vkapi.JobNameKey: job.Name, - vkapi.JobNamespaceKey: job.Namespace, - }, - }, - } - - if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil { - glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - - return e - } + if err := cc.createServiceIfNotExist(job); err != nil { + return err } var podToCreate []*v1.Pod var podToDelete []*v1.Pod - var running, pending, succeeded, failed int32 + var running, pending, terminating, succeeded, failed int32 for _, ts := range job.Spec.Tasks { name := ts.Template.Name // TODO(k82cn): the template name should be set in default func. if len(name) == 0 { - name = vkapi.DefaultTaskSpec + name = vkv1.DefaultTaskSpec } pods, found := podsMap[name] @@ -234,16 +184,24 @@ func (cc *Controller) syncJob(req *state.Request) error { } for i := 0; i < int(ts.Replicas); i++ { - podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i) + podName := fmt.Sprintf(TaskNameFmt, job.Name, name, i) if pod, found := pods[podName]; !found { newPod := createJobPod(job, &ts.Template, i) podToCreate = append(podToCreate, newPod) } else { switch pod.Status.Phase { case v1.PodPending: - pending++ + if pod.DeletionTimestamp != nil { + terminating++ + } else { + pending++ + } case v1.PodRunning: - running++ + if pod.DeletionTimestamp != nil { + terminating++ + } else { + running++ + } /**/ case v1.PodSucceeded: succeeded++ case v1.PodFailed: @@ -272,6 +230,7 @@ func (cc *Controller) syncJob(req *state.Request) error { pod.Name, job.Name, err) creationErrs = append(creationErrs, err) } else { + pending++ glog.V(3).Infof("Created Task <%s> of Job <%s/%s>", pod.Name, job.Namespace, job.Name) } @@ -290,7 +249,7 @@ func (cc *Controller) syncJob(req *state.Request) error { for _, pod := range podToDelete { go func(pod *v1.Pod) { defer waitDeletionGroup.Done() - err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) + err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil) if err != nil { // Failed to create Pod, waitCreationGroup a moment and then create it again // This is to ensure all podsMap under the same Job created @@ -301,6 +260,7 @@ func (cc *Controller) syncJob(req *state.Request) error { } else { glog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>", pod.Name, job.Namespace, job.Name) + terminating++ } }(pod) } @@ -311,15 +271,19 @@ func (cc *Controller) syncJob(req *state.Request) error { } } - job.Status = vkapi.JobStatus{ + job.Status = vkv1.JobStatus{ Pending: pending, Running: running, Succeeded: succeeded, Failed: failed, + Terminating: terminating, MinAvailable: int32(job.Spec.MinAvailable), } - // TODO(k82cn): replaced it with `UpdateStatus` or `Patch` + if nextState != nil { + job.Status.State = nextState(job.Status) + } + if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) @@ -328,3 +292,139 @@ func (cc *Controller) syncJob(req *state.Request) error { return err } + +func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error { + // If Service does not exist, create one for Job. + if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: job.Name, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: v1.ServiceSpec{ + ClusterIP: "None", + Selector: map[string]string{ + vkv1.JobNameKey: job.Name, + vkv1.JobNamespaceKey: job.Namespace, + }, + }, + } + + if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil { + glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + + return e + } + } + + return nil +} + +func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { + // If input/output PVC does not exist, create them for Job. + inputPVC := fmt.Sprintf("%s-input", job.Name) + outputPVC := fmt.Sprintf("%s-output", job.Name) + if job.Spec.Input != nil { + if job.Spec.Input.VolumeClaim != nil { + if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: inputPVC, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: *job.Spec.Input.VolumeClaim, + } + + glog.V(3).Infof("Try to create input PVC: %v", pvc) + + if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { + glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return e + } + } + } + } + if job.Spec.Output != nil { + if job.Spec.Output.VolumeClaim != nil { + if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + //return err + } + + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: outputPVC, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: *job.Spec.Output.VolumeClaim, + } + + glog.V(3).Infof("Try to create output PVC: %v", pvc) + + if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { + glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return e + } + } + } + } + return nil +} + +func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error { + // If PodGroup does not exist, create one for Job. + if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + pg := &kbv1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: job.Name, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: kbv1.PodGroupSpec{ + MinMember: job.Spec.MinAvailable, + }, + } + + if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil { + glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + + return e + } + } + + return nil +} diff --git a/pkg/controllers/job/job_controller_const.go b/pkg/controllers/job/job_controller_const.go new file mode 100644 index 0000000000..11c28ca295 --- /dev/null +++ b/pkg/controllers/job/job_controller_const.go @@ -0,0 +1,21 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +const ( + TaskNameFmt = "%s-%s-%d" +) diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 4c2f2340e7..831647fa76 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Vulcan Authors. +Copyright 2017 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,71 +24,76 @@ import ( "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" - vkbatch "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" - vkcore "hpw.cloud/volcano/pkg/apis/bus/v1alpha1" - "hpw.cloud/volcano/pkg/controllers/job/state" + vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + vkbusv1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1" ) func (cc *Controller) addCommand(obj interface{}) { - cmd, ok := obj.(*vkcore.Command) + cmd, ok := obj.(*vkbusv1.Command) if !ok { glog.Errorf("obj is not Command") return } - cc.enqueue(&state.Request{ - Event: vkbatch.CommandIssuedEvent, - Action: vkbatch.Action(cmd.Action), - + cc.eventQueue.Add(&Request{ Namespace: cmd.Namespace, - Target: cmd.TargetObject, + JobName: cmd.TargetObject.Name, + + Event: vkbatchv1.CommandIssuedEvent, + Action: vkbatchv1.Action(cmd.Action), }) } func (cc *Controller) addJob(obj interface{}) { - job, ok := obj.(*vkbatch.Job) + job, ok := obj.(*vkbatchv1.Job) if !ok { glog.Errorf("obj is not Job") return } - cc.enqueue(&state.Request{ - Event: vkbatch.OutOfSyncEvent, - Job: job, + cc.eventQueue.Add(&Request{ + Namespace: job.Namespace, + JobName: job.Name, + + Event: vkbatchv1.OutOfSyncEvent, }) } func (cc *Controller) updateJob(oldObj, newObj interface{}) { - newJob, ok := newObj.(*vkbatch.Job) + newJob, ok := newObj.(*vkbatchv1.Job) if !ok { glog.Errorf("newObj is not Job") return } - oldJob, ok := oldObj.(*vkbatch.Job) + oldJob, ok := oldObj.(*vkbatchv1.Job) if !ok { glog.Errorf("oldObj is not Job") return } if !reflect.DeepEqual(oldJob.Spec, newJob.Spec) { - cc.enqueue(&state.Request{ - Event: vkbatch.OutOfSyncEvent, - Job: newJob, + cc.eventQueue.Add(&Request{ + Namespace: newJob.Namespace, + JobName: newJob.Name, + + Event: vkbatchv1.OutOfSyncEvent, }) } } func (cc *Controller) deleteJob(obj interface{}) { - job, ok := obj.(*vkbatch.Job) + job, ok := obj.(*vkbatchv1.Job) if !ok { glog.Errorf("obj is not Job") return } - cc.enqueue(&state.Request{ - Event: vkbatch.OutOfSyncEvent, - Job: job, + cc.eventQueue.Add(&Request{ + Namespace: job.Namespace, + JobName: job.Name, + + Event: vkbatchv1.OutOfSyncEvent, }) } @@ -99,9 +104,17 @@ func (cc *Controller) addPod(obj interface{}) { return } - cc.enqueue(&state.Request{ - Event: vkbatch.OutOfSyncEvent, - Pod: pod, + jobName, found := pod.Annotations[vkbatchv1.JobNameKey] + if !found { + return + } + + cc.eventQueue.Add(&Request{ + Namespace: pod.Namespace, + JobName: jobName, + PodName: pod.Name, + + Event: vkbatchv1.OutOfSyncEvent, }) } @@ -112,9 +125,17 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { return } - cc.enqueue(&state.Request{ - Event: vkbatch.OutOfSyncEvent, - Pod: pod, + jobName, found := pod.Annotations[vkbatchv1.JobNameKey] + if !found { + return + } + + cc.eventQueue.Add(&Request{ + Namespace: pod.Namespace, + JobName: jobName, + PodName: pod.Name, + + Event: vkbatchv1.OutOfSyncEvent, }) } @@ -135,15 +156,16 @@ func (cc *Controller) deletePod(obj interface{}) { return } - cc.enqueue(&state.Request{ - Event: vkbatch.OutOfSyncEvent, - Pod: pod, - }) -} - -func (cc *Controller) enqueue(obj interface{}) { - err := cc.eventQueue.Add(obj) - if err != nil { - glog.Errorf("Fail to enqueue Job to update queue, err %v", err) + jobName, found := pod.Annotations[vkbatchv1.JobNameKey] + if !found { + return } + + cc.eventQueue.Add(&Request{ + Namespace: pod.Namespace, + JobName: jobName, + PodName: pod.Name, + + Event: vkbatchv1.OutOfSyncEvent, + }) } diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index ff3b6f8af0..6207063ac4 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -18,7 +18,6 @@ package job import ( "fmt" - "github.com/golang/glog" "k8s.io/api/core/v1" @@ -30,7 +29,6 @@ import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" "hpw.cloud/volcano/pkg/apis/helpers" - "hpw.cloud/volcano/pkg/controllers/job/state" ) func validate(job *vkv1.Job) error { @@ -48,22 +46,12 @@ func validate(job *vkv1.Job) error { } func eventKey(obj interface{}) (string, error) { - req := obj.(*state.Request) - - if req.Pod == nil && req.Job == nil { - return "", fmt.Errorf("empty data for request") + req, ok := obj.(*Request) + if !ok { + return "", fmt.Errorf("failed to convert %v to *Request", obj) } - if req.Job != nil { - return fmt.Sprintf("%s/%s", req.Job.Namespace, req.Job.Name), nil - } - - name, found := req.Pod.Annotations[vkv1.JobNameKey] - if !found { - return "", fmt.Errorf("failed to find job of pod <%s/%s>", - req.Pod.Namespace, req.Pod.Name) - } - return fmt.Sprintf("%s/%s", req.Pod.Namespace, name), nil + return fmt.Sprintf("%s/%s", req.Namespace, req.JobName), nil } func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { @@ -71,7 +59,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s-%d", job.Name, template.Name, ix), + Name: fmt.Sprintf(TaskNameFmt, job.Name, template.Name, ix), Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), @@ -101,8 +89,10 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { } for i, c := range pod.Spec.Containers { - vm := job.Spec.Output.VolumeMount - vm.Name = fmt.Sprintf("%s-output", job.Name) + vm := v1.VolumeMount{ + MountPath: job.Spec.Output.MountPath, + Name: fmt.Sprintf("%s-output", job.Name), + } pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) } } @@ -126,9 +116,13 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { } for i, c := range pod.Spec.Containers { - vm := job.Spec.Input.VolumeMount - vm.Name = fmt.Sprintf("%s-input", job.Name) + vm := v1.VolumeMount{ + MountPath: job.Spec.Input.MountPath, + Name: fmt.Sprintf("%s-input", job.Name), + } + pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) + } } @@ -194,3 +188,33 @@ func getPodsForJob(podLister corelisters.PodLister, job *vkv1.Job) (map[string]m return pods, nil } + +func applyPolicies(event vkv1.Event, job *vkv1.Job, pod *v1.Pod) vkv1.Action { + // Overwrite Job level policies + if pod != nil { + // Parse task level policies + if taskName, found := pod.Annotations[vkv1.TaskSpecKey]; found { + for _, task := range job.Spec.Tasks { + if task.Name == taskName { + for _, policy := range task.Policies { + if policy.Event == event || policy.Event == vkv1.AnyEvent { + return policy.Action + } + } + } + } + } else { + glog.Errorf("Failed to find taskSpecKey in Pod <%s/%s>", + pod.Namespace, pod.Name) + } + } + + // Parse Job level policies + for _, policy := range job.Spec.Policies { + if policy.Event == event || policy.Event == vkv1.AnyEvent { + return policy.Action + } + } + + return vkv1.SyncJobAction +} diff --git a/pkg/controllers/job/state/restart.go b/pkg/controllers/job/state/aborted.go similarity index 64% rename from pkg/controllers/job/state/restart.go rename to pkg/controllers/job/state/aborted.go index 829a2c3cd3..7a5a5a90a5 100644 --- a/pkg/controllers/job/state/restart.go +++ b/pkg/controllers/job/state/aborted.go @@ -20,19 +20,21 @@ import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" ) -type restartingState struct { - request *Request - policies map[vkv1.Event]vkv1.Action +type abortedState struct { + job *vkv1.Job } -func (ps *restartingState) Execute() error { - action := ps.policies[ps.request.Event] +func (as *abortedState) Execute(action vkv1.Action, reason string, msg string) (error) { switch action { - case vkv1.RestartJobAction, vkv1.RestartTaskAction: - // Already in Restarting phase, just sync it - return actionFns[vkv1.SyncJobAction](ps.request) + case vkv1.ResumeJobAction: + return SyncJob(as.job, func(status vkv1.JobStatus) vkv1.JobState { + return vkv1.JobState{ + Phase: vkv1.Restarting, + Reason: reason, + Message: msg, + } + }) default: - fn := actionFns[action] - return fn(ps.request) + return KillJob(as.job, nil) } } diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go new file mode 100644 index 0000000000..aa7126fba6 --- /dev/null +++ b/pkg/controllers/job/state/aborting.go @@ -0,0 +1,56 @@ +/* +Copyright 2017 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type abortingState struct { + job *vkv1.Job +} + +func (ps *abortingState) Execute(action vkv1.Action, reason string, msg string) (error) { + switch action { + case vkv1.ResumeJobAction: + // Already in Restarting phase, just sync it + return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + return vkv1.JobState{ + Phase: vkv1.Restarting, + Reason: reason, + Message: msg, + } + }) + default: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + // If any "alive" pods, still in Aborting phase + if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { + return vkv1.JobState{ + Phase: vkv1.Aborting, + Reason: reason, + Message: msg, + } + } + + return vkv1.JobState{ + Phase: vkv1.Aborted, + Reason: reason, + Message: msg, + } + }) + } +} diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index ab8b1e18d4..1d56a2523b 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -17,54 +17,42 @@ limitations under the License. package state import ( - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" - vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" ) -type Request struct { - Event vkv1.Event - Action vkv1.Action - - Namespace string - Target *metav1.OwnerReference - - Job *vkv1.Job - Pod *v1.Pod - PodGroup *kbv1.PodGroup - - Reason string - Message string -} - -type ActionFn func(req *Request) error - -var actionFns = map[vkv1.Action]ActionFn{} +type NextStateFn func(status vkv1.JobStatus) vkv1.JobState +type ActionFn func(job *vkv1.Job, fn NextStateFn) error -func RegisterActions(afs map[vkv1.Action]ActionFn) { - actionFns = afs -} +var ( + // SyncJob will create or delete Pods according to Job's spec. + SyncJob ActionFn + // KillJob kill all Pods of Job. + KillJob ActionFn +) type State interface { - Execute() error + // Execute executes the actions based on current state. + Execute(act vkv1.Action, reason string, msg string) error } -func NewState(req *Request) State { - policies := parsePolicies(req) - - switch req.Job.Status.State.Phase { +func NewState(job *vkv1.Job) State { + switch job.Status.State.Phase { + case vkv1.Pending: + return &pendingState{job: job} + case vkv1.Running: + return &runningState{job: job} case vkv1.Restarting: - return &restartingState{ - request: req, - policies: policies, - } - default: - return &baseState{ - request: req, - policies: policies, - } + return &restartingState{job: job} + case vkv1.Terminated, vkv1.Completed: + return &finishedState{job: job} + case vkv1.Terminating: + return &terminatingState{job: job} + case vkv1.Aborting: + return &abortingState{job: job} + case vkv1.Aborted: + return &abortedState{job: job} } + + // It's pending by default. + return &pendingState{job: job} } diff --git a/pkg/controllers/job/state/base.go b/pkg/controllers/job/state/finished.go similarity index 60% rename from pkg/controllers/job/state/base.go rename to pkg/controllers/job/state/finished.go index 45bc6d5b7e..01cef3d3d4 100644 --- a/pkg/controllers/job/state/base.go +++ b/pkg/controllers/job/state/finished.go @@ -17,26 +17,15 @@ limitations under the License. package state import ( - "github.com/golang/glog" - vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" ) -type baseState struct { - request *Request - policies map[vkv1.Event]vkv1.Action +type finishedState struct { + job *vkv1.Job } -func (ps *baseState) Execute() error { - action := ps.policies[ps.request.Event] - glog.V(3).Infof("The action for event <%s> is <%s>", - ps.request.Event, action) - switch action { - case vkv1.RestartJobAction: - fn := actionFns[action] - return fn(ps.request) - default: - fn := actionFns[action] - return fn(ps.request) - } +func (ps *finishedState) Execute(action vkv1.Action, reason string, msg string) (error) { + // In finished state, e.g. Completed, always kill the whole job. + return KillJob(ps.job, nil) } + diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go new file mode 100644 index 0000000000..6b03e8dfeb --- /dev/null +++ b/pkg/controllers/job/state/pending.go @@ -0,0 +1,57 @@ +/* +Copyright 2017 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type pendingState struct { + job *vkv1.Job +} + +func (ps *pendingState) Execute(action vkv1.Action, reason string, msg string) error { + switch action { + case vkv1.RestartJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Pending + if status.Terminating != 0 { + phase = vkv1.Restarting + } + + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + default: + return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + total := totalTasks(ps.job) + phase := vkv1.Pending + + if total == status.Running { + phase = vkv1.Running + } + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + } +} diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go new file mode 100644 index 0000000000..bd12f8d5c6 --- /dev/null +++ b/pkg/controllers/job/state/restarting.go @@ -0,0 +1,45 @@ +/* +Copyright 2017 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type restartingState struct { + job *vkv1.Job +} + +func (ps *restartingState) Execute(action vkv1.Action, reason string, msg string) ( error) { + return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Restarting + if status.Terminating == 0 { + if status.Pending == 0 && status.Running != 0 { + phase = vkv1.Running + } else { + phase = vkv1.Pending + } + } + + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + +} diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go new file mode 100644 index 0000000000..422b43652d --- /dev/null +++ b/pkg/controllers/job/state/running.go @@ -0,0 +1,82 @@ +/* +Copyright 2017 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type runningState struct { + job *vkv1.Job +} + +func (ps *runningState) Execute(action vkv1.Action, reason string, msg string) (error) { + switch action { + case vkv1.RestartJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Running + if status.Terminating != 0 { + phase = vkv1.Restarting + } + + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + case vkv1.AbortJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Running + if status.Terminating != 0 { + phase = vkv1.Aborting + } + + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + case vkv1.TerminateJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Running + if status.Terminating != 0 { + phase = vkv1.Terminating + } + + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + default: + return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Running + if status.Succeeded+status.Failed == totalTasks(ps.job) { + phase = vkv1.Completed + } + + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + } +} diff --git a/pkg/controllers/job/state/terminating.go b/pkg/controllers/job/state/terminating.go new file mode 100644 index 0000000000..5a7cd9d33a --- /dev/null +++ b/pkg/controllers/job/state/terminating.go @@ -0,0 +1,44 @@ +/* +Copyright 2017 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type terminatingState struct { + job *vkv1.Job +} + +func (ps *terminatingState) Execute(action vkv1.Action, reason string, msg string) (error) { + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + // If any "alive" pods, still in Terminating phase + if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { + return vkv1.JobState{ + Phase: vkv1.Terminating, + Reason: reason, + Message: msg, + } + } + + return vkv1.JobState{ + Phase: vkv1.Terminated, + Reason: reason, + Message: msg, + } + }) +} diff --git a/pkg/controllers/job/state/util.go b/pkg/controllers/job/state/util.go index 22761df408..6eacf590eb 100644 --- a/pkg/controllers/job/state/util.go +++ b/pkg/controllers/job/state/util.go @@ -20,23 +20,12 @@ import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" ) -func parsePolicies(req *Request) map[vkv1.Event]vkv1.Action { - actions := map[vkv1.Event]vkv1.Action{} +func totalTasks(job *vkv1.Job) int32 { + var rep int32 - // Set Job level policies - for _, policy := range req.Job.Spec.Policies { - actions[policy.Event] = policy.Action + for _, task := range job.Spec.Tasks { + rep += task.Replicas } - // TODO(k82cn): set task level polices - - // Set default action - actions[vkv1.OutOfSyncEvent] = vkv1.SyncJobAction - - // Set command action - if len(req.Action) != 0 { - actions[req.Event] = req.Action - } - - return actions + return rep } diff --git a/test/e2e/util.go b/test/e2e/util.go index fb45ad6e3d..7b4a2d23a5 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -344,10 +344,14 @@ func jobUnschedulable(ctx *context, job *vkv1.Job, time time.Time) wait.Conditio // TODO(k82cn): check Job's Condition instead of PodGroup's event. return func() (bool, error) { pg, err := ctx.kbclient.SchedulingV1alpha1().PodGroups(job.Namespace).Get(job.Name, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) + if err != nil { + return false, nil + } events, err := ctx.kubeclient.CoreV1().Events(pg.Namespace).List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) + if err != nil { + return false, nil + } for _, event := range events.Items { target := event.InvolvedObject