Merge pull request #269 from k82cn/ka_266
Added multiple pod templates in QueueJob.
k82cn authored Jul 5, 2018
2 parents f6abe1f + 07ac230 commit acf3a65
Showing 8 changed files with 382 additions and 126 deletions.
25 changes: 16 additions & 9 deletions pkg/apis/v1alpha1/queuejob.go
@@ -38,18 +38,25 @@ type QueueJob struct {

// QueueJobSpec describes what the job execution will look like and when it will actually run
type QueueJobSpec struct {
// SchedSpec specifies the parameters for scheduling.
SchedSpec SchedulingSpecTemplate `json:"schedulingSpec,omitempty" protobuf:"bytes,1,opt,name=schedulingSpec"`

// TaskSpecs specifies the task specification of QueueJob
TaskSpecs []TaskSpec `json:"taskSpecs,omitempty" protobuf:"bytes,2,opt,name=taskSpecs"`
}

// TaskSpec specifies the task specification of QueueJob
type TaskSpec struct {
// A label query over pods that should match the pod count.
// Normally, the system sets this field for you.
// +optional
Selector *metav1.LabelSelector `json:"selector,omitempty" protobuf:"bytes,1,opt,name=selector"`

// Replicas specifies the replicas of this QueueJob.
// Replicas specifies the replicas of this TaskSpec in QueueJob.
Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`

// SchedSpec specifies the parameters for scheduling.
SchedSpec SchedulingSpecTemplate `json:"schedulingSpec,omitempty" protobuf:"bytes,2,opt,name=schedulingSpec"`

// Specifies the pod that will be created when executing a QueueJob
// Specifies the pod that will be created for this TaskSpec
// when executing a QueueJob
Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`
}

@@ -61,19 +68,19 @@ type QueueJobStatus struct {

// The number of running pods.
// +optional
Running int32 `json:"running,omitempty" protobuf:"bytes,1,opt,name=running"`
Running int32 `json:"running,omitempty" protobuf:"bytes,2,opt,name=running"`

// The number of pods which reached phase Succeeded.
// +optional
Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,2,opt,name=succeeded"`
Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,3,opt,name=succeeded"`

// The number of pods which reached phase Failed.
// +optional
Failed int32 `json:"failed,omitempty" protobuf:"bytes,3,opt,name=failed"`
Failed int32 `json:"failed,omitempty" protobuf:"bytes,4,opt,name=failed"`

// The minimal available pods to run for this QueueJob
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,4,opt,name=minAvailable"`
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,5,opt,name=minAvailable"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
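To make the new API shape concrete, here is a minimal sketch of a QueueJob built against the multi-template spec above. The task names, replica counts, label key, and import aliases (arbv1, v1, metav1) are illustrative assumptions; the construction style mirrors pkg/karcli/job/run.go later in this commit.

// Illustrative only: a QueueJob carrying two task templates. Template.Name
// is what the controller uses to group pods per task (see getPodsForQueueJob
// below in this commit).
func exampleQueueJob() *arbv1.QueueJob {
	return &arbv1.QueueJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "qj-1",
			Namespace: "default",
		},
		Spec: arbv1.QueueJobSpec{
			// SchedSpec now lives on the QueueJob itself rather than on each task.
			SchedSpec: arbv1.SchedulingSpecTemplate{
				MinAvailable: 4,
			},
			TaskSpecs: []arbv1.TaskSpec{
				{
					Replicas: 1,
					Template: v1.PodTemplateSpec{
						ObjectMeta: metav1.ObjectMeta{
							Name:   "master",
							Labels: map[string]string{"queuejob": "qj-1"},
						},
					},
				},
				{
					Replicas: 3,
					Template: v1.PodTemplateSpec{
						ObjectMeta: metav1.ObjectMeta{
							Name:   "worker",
							Labels: map[string]string{"queuejob": "qj-1"},
						},
					},
				},
			},
		},
	}
}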
41 changes: 32 additions & 9 deletions pkg/apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

127 changes: 76 additions & 51 deletions pkg/controller/queuejob/queuejob_controller.go
@@ -63,7 +63,7 @@ type Controller struct {
queueJobSynced func() bool

// A store of pods, populated by the podController
podStore corelisters.PodLister
podListr corelisters.PodLister
podSynced func() bool

// eventQueue that need to sync up
@@ -122,7 +122,7 @@ func NewQueueJobController(config *rest.Config) *Controller {
DeleteFunc: cc.deletePod,
},
})
cc.podStore = cc.podInformer.Lister()
cc.podListr = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced

return cc
@@ -314,36 +314,43 @@ func (cc *Controller) syncQueueJob(qj *arbv1.QueueJob) error {
return cc.manageQueueJob(queueJob, pods)
}

func (cc *Controller) getPodsForQueueJob(qj *arbv1.QueueJob) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(qj.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("couldn't convert QueueJob selector: %v", err)
}
func (cc *Controller) getPodsForQueueJob(qj *arbv1.QueueJob) (map[string][]*v1.Pod, error) {
pods := map[string][]*v1.Pod{}

// List all pods under QueueJob
pods, err := cc.podStore.Pods(qj.Namespace).List(selector)
if err != nil {
return nil, err
for _, ts := range qj.Spec.TaskSpecs {
selector, err := metav1.LabelSelectorAsSelector(ts.Selector)
if err != nil {
return nil, fmt.Errorf("couldn't convert QueueJob selector: %v", err)
}

// List all pods under QueueJob
ps, err := cc.podListr.Pods(qj.Namespace).List(selector)
if err != nil {
return nil, err
}

// TODO (k82cn): optimize by cache
for _, pod := range ps {
if !metav1.IsControlledBy(pod, qj) {
continue
}
// Hash by TaskSpec.Template.Name
pods[ts.Template.Name] = append(pods[ts.Template.Name], pod)
}
}

return pods, nil
}

// manageQueueJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (cc *Controller) manageQueueJob(qj *arbv1.QueueJob, pods []*v1.Pod) error {
func (cc *Controller) manageQueueJob(qj *arbv1.QueueJob, pods map[string][]*v1.Pod) error {
var err error

replicas := qj.Spec.Replicas

running := int32(filterPods(pods, v1.PodRunning))
pending := int32(filterPods(pods, v1.PodPending))
succeeded := int32(filterPods(pods, v1.PodSucceeded))
failed := int32(filterPods(pods, v1.PodFailed))

glog.V(3).Infof("There are %d pods of QueueJob %s: replicas %d, pending %d, running %d, succeeded %d, failed %d",
len(pods), qj.Name, replicas, pending, running, succeeded, failed)
runningSum := int32(0)
pendingSum := int32(0)
succeededSum := int32(0)
failedSum := int32(0)

ss, err := cc.arbclients.ArbV1().SchedulingSpecs(qj.Namespace).List(metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", qj.Name),
@@ -361,40 +368,58 @@ func (cc *Controller) manageQueueJob(qj *arbv1.QueueJob, pods []*v1.Pod) error {
len(ss.Items), qj.Namespace, qj.Name)
}

// Create pod if necessary
if diff := replicas - pending - running - succeeded; diff > 0 {
glog.V(3).Infof("Try to create %v Pods for QueueJob %v/%v", diff, qj.Namespace, qj.Name)

var errs []error
wait := sync.WaitGroup{}
wait.Add(int(diff))
for i := int32(0); i < diff; i++ {
go func(ix int32) {
defer wait.Done()
newPod := createQueueJobPod(qj, ix)
_, err := cc.clients.Core().Pods(newPod.Namespace).Create(newPod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same QueueJob created
// So gang-scheduling could schedule the QueueJob successfully
glog.Errorf("Failed to create pod %s for QueueJob %s, err %#v",
newPod.Name, qj.Name, err)
errs = append(errs, err)
}
}(i)
}
wait.Wait()
for _, ts := range qj.Spec.TaskSpecs {
replicas := ts.Replicas
name := ts.Template.Name

running := int32(filterPods(pods[name], v1.PodRunning))
pending := int32(filterPods(pods[name], v1.PodPending))
succeeded := int32(filterPods(pods[name], v1.PodSucceeded))
failed := int32(filterPods(pods[name], v1.PodFailed))

runningSum += running
pendingSum += pending
succeededSum += succeeded
failedSum += failed

glog.V(3).Infof("There are %d pods of QueueJob %s (%s): replicas %d, pending %d, running %d, succeeded %d, failed %d",
len(pods), qj.Name, name, replicas, pending, running, succeeded, failed)

// Create pod if necessary
if diff := replicas - pending - running - succeeded; diff > 0 {
glog.V(3).Infof("Try to create %v Pods for QueueJob %v/%v", diff, qj.Namespace, qj.Name)

var errs []error
wait := sync.WaitGroup{}
wait.Add(int(diff))
for i := int32(0); i < diff; i++ {
go func(ix int32) {
defer wait.Done()
newPod := createQueueJobPod(qj, &ts.Template, ix)
_, err := cc.clients.Core().Pods(newPod.Namespace).Create(newPod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same QueueJob created
// So gang-scheduling could schedule the QueueJob successfully
glog.Errorf("Failed to create pod %s for QueueJob %s, err %#v",
newPod.Name, qj.Name, err)
errs = append(errs, err)
}
}(i)
}
wait.Wait()

if len(errs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(errs), diff)
if len(errs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(errs), diff)
}
}
}

qj.Status = arbv1.QueueJobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
Pending: pendingSum,
Running: runningSum,
Succeeded: succeededSum,
Failed: failedSum,
MinAvailable: int32(qj.Spec.SchedSpec.MinAvailable),
}

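manageQueueJob above relies on a filterPods helper to count pods in a given phase; that helper is not part of this diff. A hypothetical equivalent, assuming it simply counts the pods whose status matches the requested phase, could look like the sketch below (the name countPodsInPhase is made up for illustration).

// Hypothetical stand-in for the filterPods calls above: count the pods
// in a slice whose Status.Phase matches the requested phase.
func countPodsInPhase(pods []*v1.Pod, phase v1.PodPhase) int {
	count := 0
	for _, pod := range pods {
		if pod != nil && pod.Status.Phase == phase {
			count++
		}
	}
	return count
}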
4 changes: 2 additions & 2 deletions pkg/controller/queuejob/utils.go
@@ -79,8 +79,8 @@ func createQueueJobSchedulingSpec(qj *arbv1.QueueJob) *arbv1.SchedulingSpec {
}
}

func createQueueJobPod(qj *arbv1.QueueJob, ix int32) *corev1.Pod {
templateCopy := qj.Spec.Template.DeepCopy()
func createQueueJobPod(qj *arbv1.QueueJob, template *corev1.PodTemplateSpec, ix int32) *corev1.Pod {
templateCopy := template.DeepCopy()

prefix := fmt.Sprintf("%s-", qj.Name)

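The rest of createQueueJobPod is truncated in this view. Purely as an illustration of how the per-task template flows into pod creation (not taken from the commit), a continuation in this spirit would stamp the copied template into a Pod owned by the QueueJob, so that the metav1.IsControlledBy check in getPodsForQueueJob matches it:

// Illustrative continuation only, not from this commit. SchemeGroupVersion
// is assumed to be defined in the v1alpha1 API package; the owner reference
// is what lets metav1.IsControlledBy(pod, qj) associate the pod with its
// QueueJob.
pod := &corev1.Pod{
	ObjectMeta: metav1.ObjectMeta{
		Name:      fmt.Sprintf("%s%d", prefix, ix),
		Namespace: qj.Namespace,
		Labels:    templateCopy.Labels,
		OwnerReferences: []metav1.OwnerReference{
			*metav1.NewControllerRef(qj, arbv1.SchemeGroupVersion.WithKind("QueueJob")),
		},
	},
	Spec: templateCopy.Spec,
}
return pod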
7 changes: 6 additions & 1 deletion pkg/karcli/job/list.go
@@ -59,8 +59,13 @@ func ListJobs() error {
fmt.Printf("%-30s%-25s%-12s%-8s%-12s%-12s%-12s%-12s\n",
"Name", "Creation", "Replicas", "Min", "Pending", "Running", "Succeeded", "Failed")
for _, qj := range queueJobs.Items {
replicas := int32(0)
for _, ts := range qj.Spec.TaskSpecs {
replicas += ts.Replicas
}

fmt.Printf("%-30s%-25s%-12d%-8d%-12d%-12d%-12d%-12d\n",
qj.Name, qj.CreationTimestamp.Format("2006-01-02 15:04:05"), qj.Spec.Replicas,
qj.Name, qj.CreationTimestamp.Format("2006-01-02 15:04:05"), replicas,
qj.Status.MinAvailable, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)
}

46 changes: 26 additions & 20 deletions pkg/karcli/job/run.go
@@ -71,29 +71,35 @@ func RunJob() error {
Namespace: launchJobFlags.Namespace,
},
Spec: arbv1.QueueJobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
queueJobName: launchJobFlags.Name,
},
},
Replicas: int32(launchJobFlags.Replicas),
SchedSpec: arbv1.SchedulingSpecTemplate{
MinAvailable: launchJobFlags.MinAvailable,
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{queueJobName: launchJobFlags.Name},
},
Spec: v1.PodSpec{
SchedulerName: launchJobFlags.SchedulerName,
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Image: launchJobFlags.Image,
Name: launchJobFlags.Name,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Requests: req,
TaskSpecs: []arbv1.TaskSpec{
{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
queueJobName: launchJobFlags.Name,
},
},
Replicas: int32(launchJobFlags.Replicas),

Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: launchJobFlags.Name,
Labels: map[string]string{queueJobName: launchJobFlags.Name},
},
Spec: v1.PodSpec{
SchedulerName: launchJobFlags.SchedulerName,
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Image: launchJobFlags.Image,
Name: launchJobFlags.Name,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Requests: req,
},
},
},
},
},
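RunJob is truncated above before the constructed object is submitted. For completeness, a submission call in the style of the generated clientset used by the controller (cc.arbclients.ArbV1()) might look roughly like the following; the client and variable names here are assumptions, not identifiers from this commit.

// Illustrative only: submitting the QueueJob built above through the
// project's generated clientset. queueJobClient and queueJob are assumed
// names, not taken from this commit.
created, err := queueJobClient.ArbV1().QueueJobs(launchJobFlags.Namespace).Create(queueJob)
if err != nil {
	return fmt.Errorf("failed to create QueueJob: %v", err)
}
fmt.Printf("QueueJob %s/%s created\n", created.Namespace, created.Name)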
