diff --git a/config/crds/batch_v1alpha1_job.yaml b/config/crds/batch_v1alpha1_job.yaml
index 64c6306216..e30b396273 100644
--- a/config/crds/batch_v1alpha1_job.yaml
+++ b/config/crds/batch_v1alpha1_job.yaml
@@ -54,7 +54,6 @@ spec:
                     should be mounted. Defaults to "" (volume's root).
                   type: string
               required:
-              - name
               - mountPath
               type: object
             minAvailable:
@@ -88,7 +87,6 @@ spec:
                     should be mounted. Defaults to "" (volume's root).
                   type: string
               required:
-              - name
               - mountPath
               type: object
             policies:
diff --git a/example/job.yaml b/example/job.yaml
index 8f28fb4bb6..3d5fca7c18 100644
--- a/example/job.yaml
+++ b/example/job.yaml
@@ -4,6 +4,16 @@ metadata:
   name: test-job
 spec:
   minAvailable: 3
+  input:
+    mountPath: "/myinput"
+  output:
+    mountPath: "/myoutput"
+    claim:
+      accessModes: [ "ReadWriteOnce" ]
+      storageClassName: "my-storage-class"
+      resources:
+        requests:
+          storage: 1Gi
   taskSpecs:
   - replicas: 6
     template:
diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go
index a7030d0350..53a289a249 100644
--- a/pkg/controllers/job/job_controller.go
+++ b/pkg/controllers/job/job_controller.go
@@ -58,6 +58,7 @@ type Controller struct {
 
 	jobInformer vkbatchinfo.JobInformer
 	podInformer coreinformers.PodInformer
+	pvcInformer coreinformers.PersistentVolumeClaimInformer
 	pgInformer  kbinfo.PodGroupInformer
 	svcInformer coreinformers.ServiceInformer
 	cmdInformer vkcoreinfo.CommandInformer
@@ -70,6 +71,9 @@ type Controller struct {
 	podLister corelisters.PodLister
 	podSynced func() bool
 
+	pvcLister corelisters.PersistentVolumeClaimLister
+	pvcSynced func() bool
+
 	// A store of podgroups
 	pgLister kblister.PodGroupLister
 	pgSynced func() bool
@@ -156,6 +160,10 @@ func NewJobController(config *rest.Config) *Controller {
 	cc.podLister = cc.podInformer.Lister()
 	cc.podSynced = cc.podInformer.Informer().HasSynced
 
+	cc.pvcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().PersistentVolumeClaims()
+	cc.pvcLister = cc.pvcInformer.Lister()
+	cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
+
 	cc.svcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Services()
 	cc.svcLister = cc.svcInformer.Lister()
 	cc.svcSynced = cc.svcInformer.Informer().HasSynced
@@ -182,12 +190,13 @@ func NewJobController(config *rest.Config) *Controller {
 func (cc *Controller) Run(stopCh <-chan struct{}) {
 	go cc.jobInformer.Informer().Run(stopCh)
 	go cc.podInformer.Informer().Run(stopCh)
+	go cc.pvcInformer.Informer().Run(stopCh)
 	go cc.pgInformer.Informer().Run(stopCh)
 	go cc.svcInformer.Informer().Run(stopCh)
 	go cc.cmdInformer.Informer().Run(stopCh)
 
 	cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
-		cc.svcSynced, cc.cmdSynced)
+		cc.svcSynced, cc.cmdSynced, cc.pvcSynced)
 
 	go wait.Until(cc.worker, 0, stopCh)
diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go
index d7e35124d2..3f1d2ab7d9 100644
--- a/pkg/controllers/job/job_controller_actions.go
+++ b/pkg/controllers/job/job_controller_actions.go
@@ -118,6 +118,72 @@ func (cc *Controller) syncJob(req *state.Request) error {
 		}
 	}
 
+	// If input/output PVC does not exist, create them for Job.
+	inputPVC := fmt.Sprintf("%s-input", job.Name)
+	outputPVC := fmt.Sprintf("%s-output", job.Name)
+	if job.Spec.Input != nil {
+		if job.Spec.Input.VolumeClaim != nil {
+			if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
+				if !apierrors.IsNotFound(err) {
+					glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
+						job.Namespace, job.Name, err)
+					return err
+				}
+
+				pvc := &v1.PersistentVolumeClaim{
+					ObjectMeta: metav1.ObjectMeta{
+						Namespace: job.Namespace,
+						Name:      inputPVC,
+						OwnerReferences: []metav1.OwnerReference{
+							*metav1.NewControllerRef(job, helpers.JobKind),
+						},
+					},
+					Spec: *job.Spec.Input.VolumeClaim,
+				}
+
+				glog.V(3).Infof("Try to create input PVC: %v", pvc)
+
+				if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
+					glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
+						job.Namespace, job.Name, e)
+					return e
+				}
+			}
+		}
+	}
+
+	if job.Spec.Output != nil {
+		if job.Spec.Output.VolumeClaim != nil {
+			if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
+				if !apierrors.IsNotFound(err) {
+					glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
+						job.Namespace, job.Name, err)
+					return err
+				}
+
+				pvc := &v1.PersistentVolumeClaim{
+					ObjectMeta: metav1.ObjectMeta{
+						Namespace: job.Namespace,
+						Name:      outputPVC,
+						OwnerReferences: []metav1.OwnerReference{
+							*metav1.NewControllerRef(job, helpers.JobKind),
+						},
+					},
+					Spec: *job.Spec.Output.VolumeClaim,
+				}
+
+				glog.V(3).Infof("Try to create output PVC: %v", pvc)
+
+				if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
+					glog.V(3).Infof("Failed to create output PVC for Job <%s/%s>: %v",
+						job.Namespace, job.Name, e)
+					return e
+				}
+			}
+		}
+	}
+
+	// If Service does not exist, create one for Job.
 	if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
 		if !apierrors.IsNotFound(err) {
 			glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go
index 5674628b68..ff3b6f8af0 100644
--- a/pkg/controllers/job/job_controller_util.go
+++ b/pkg/controllers/job/job_controller_util.go
@@ -18,6 +18,7 @@ package job
 
 import (
 	"fmt"
+
 	"github.com/golang/glog"
 
 	"k8s.io/api/core/v1"
@@ -81,6 +82,56 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
 		Spec: templateCopy.Spec,
 	}
 
+	if job.Spec.Output != nil {
+		if job.Spec.Output.VolumeClaim == nil {
+			volume := v1.Volume{
+				Name: fmt.Sprintf("%s-output", job.Name),
+			}
+			volume.EmptyDir = &v1.EmptyDirVolumeSource{}
+			pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
+		} else {
+			volume := v1.Volume{
+				Name: fmt.Sprintf("%s-output", job.Name),
+			}
+			volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
+				ClaimName: fmt.Sprintf("%s-output", job.Name),
+			}
+
+			pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
+		}
+
+		for i, c := range pod.Spec.Containers {
+			vm := job.Spec.Output.VolumeMount
+			vm.Name = fmt.Sprintf("%s-output", job.Name)
+			pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
+		}
+	}
+
+	if job.Spec.Input != nil {
+		if job.Spec.Input.VolumeClaim == nil {
+			volume := v1.Volume{
+				Name: fmt.Sprintf("%s-input", job.Name),
+			}
+			volume.EmptyDir = &v1.EmptyDirVolumeSource{}
+			pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
+		} else {
+			volume := v1.Volume{
+				Name: fmt.Sprintf("%s-input", job.Name),
+			}
+			volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
+				ClaimName: fmt.Sprintf("%s-input", job.Name),
+			}
+
+			pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
+		}
+
+		for i, c := range pod.Spec.Containers {
+			vm := job.Spec.Input.VolumeMount
+			vm.Name = fmt.Sprintf("%s-input", job.Name)
+			pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
+		}
+	}
+
 	if len(pod.Annotations) == 0 {
 		pod.Annotations = make(map[string]string)
 	}
@@ -110,6 +161,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
 
 	if job.Spec.SchedulerName != "" && pod.Spec.SchedulerName == "" {
 		pod.Spec.SchedulerName = job.Spec.SchedulerName
 	}
+
 	return pod
 }
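
For the example/job.yaml above, the change should produce roughly the objects sketched below. This is an illustrative sketch, not generated output: the namespace ("default") and container name ("main") are assumptions, and the ownerReference added via metav1.NewControllerRef is omitted. Only output carries a claim, so syncJob creates a single PVC named test-job-output; input has no claim, so createJobPod falls back to an emptyDir volume for it.

# Sketch of the PVC syncJob would create for the output volume:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: test-job-output
  namespace: default            # assumed namespace of the Job
spec:
  accessModes: [ "ReadWriteOnce" ]
  storageClassName: "my-storage-class"
  resources:
    requests:
      storage: 1Gi
---
# Sketch of the volumes and mounts createJobPod would inject into each pod:
spec:
  volumes:
  - name: test-job-input
    emptyDir: {}                # input has no claim, so an emptyDir is used
  - name: test-job-output
    persistentVolumeClaim:
      claimName: test-job-output
  containers:
  - name: main                  # assumed name; every container in the template gets both mounts
    volumeMounts:
    - name: test-job-input
      mountPath: /myinput
    - name: test-job-output
      mountPath: /myoutput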