add volcano gang-scheduler pg min resource calculation (#566)
* add volcano gang-scheduler pg min resource calculation

Signed-off-by: lowang_bh <lhui_wang@163.com>

* use priorityclass lister

Signed-off-by: lowang_bh <lhui_wang@163.com>

* Update pkg/controller/podgroup.go

Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

---------

Signed-off-by: lowang_bh <lhui_wang@163.com>
Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
lowang-bh and tenzen-y authored Jun 16, 2023
1 parent fda0532 commit caa1112
Showing 3 changed files with 153 additions and 32 deletions.
6 changes: 3 additions & 3 deletions pkg/controller/mpi_job_controller.go
@@ -299,12 +299,12 @@ func NewMPIJobControllerWithClock(
         priorityClassLister schedulinglisters.PriorityClassLister
         priorityClassSynced cache.InformerSynced
     )
+    priorityClassLister = priorityClassInformer.Lister()
+    priorityClassSynced = priorityClassInformer.Informer().HasSynced
     if gangSchedulingName == options.GangSchedulerVolcano {
-        podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace)
+        podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
     } else if len(gangSchedulingName) != 0 {
         // Use scheduler-plugins as a default gang-scheduler.
-        priorityClassLister = priorityClassInformer.Lister()
-        priorityClassSynced = priorityClassInformer.Informer().HasSynced
         podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
     }
     if podGroupCtrl != nil {
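The change above moves the PriorityClass lister out of the scheduler-plugins-only branch so that both gang schedulers can resolve pod priorities. A minimal sketch of the resulting flow, using the constructor signatures from this diff; the helper name newPodGroupCtrl, its parameter list, and the clientset/informer aliases are illustrative, not code from the repository:

// Sketch: both gang schedulers now receive the PriorityClass lister.
// newPodGroupCtrl is a hypothetical helper; schedclientset and
// schedulinginformers stand in for the aliases used by the real file.
func newPodGroupCtrl(
    gangSchedulingName, namespace string,
    volcanoClient volcanoclient.Interface,
    schedClient schedclientset.Interface,
    priorityClassInformer schedulinginformers.PriorityClassInformer,
) PodGroupControl {
    // The lister is created unconditionally, because both gang
    // schedulers consume it for the minResources calculation.
    priorityClassLister := priorityClassInformer.Lister()
    if gangSchedulingName == options.GangSchedulerVolcano {
        return NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
    }
    if len(gangSchedulingName) != 0 {
        // Use scheduler-plugins as a default gang-scheduler.
        return NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
    }
    return nil
}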
64 changes: 41 additions & 23 deletions pkg/controller/podgroup.go
@@ -61,29 +61,31 @@ type PodGroupControl interface {
     decoratePodTemplateSpec(pts *corev1.PodTemplateSpec, mpiJobName string)
     // calculatePGMinResources will calculate minResources for podGroup.
     calculatePGMinResources(minMember *int32, mpiJob *kubeflow.MPIJob) *corev1.ResourceList
-    // podGroupSpecIsDeepEqual will return true if the spec fields of two podGroup are equals.
+    // pgSpecsAreEqual will return true if the spec fields of two podGroups are equal.
     pgSpecsAreEqual(a, b metav1.Object) bool
 }
 
 // VolcanoCtrl is the implementation of PodGroupControl with volcano.
 type VolcanoCtrl struct {
-    Client           volcanoclient.Interface
-    InformerFactory  volcanoinformers.SharedInformerFactory
-    PodGroupInformer volcanopodgroupinformer.PodGroupInformer
-    schedulerName    string
+    Client              volcanoclient.Interface
+    InformerFactory     volcanoinformers.SharedInformerFactory
+    PodGroupInformer    volcanopodgroupinformer.PodGroupInformer
+    PriorityClassLister schedulinglisters.PriorityClassLister
+    schedulerName       string
 }
 
-func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string) *VolcanoCtrl {
+func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
     var informerFactoryOpts []volcanoinformers.SharedInformerOption
     if watchNamespace != metav1.NamespaceAll {
         informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(watchNamespace))
     }
     informerFactory := volcanoinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
     return &VolcanoCtrl{
-        Client:           c,
-        InformerFactory:  informerFactory,
-        PodGroupInformer: informerFactory.Scheduling().V1beta1().PodGroups(),
-        schedulerName:    options.GangSchedulerVolcano,
+        Client:              c,
+        InformerFactory:     informerFactory,
+        PodGroupInformer:    informerFactory.Scheduling().V1beta1().PodGroups(),
+        PriorityClassLister: pcLister,
+        schedulerName:       options.GangSchedulerVolcano,
     }
 }

@@ -167,11 +169,21 @@ func (v *VolcanoCtrl) decoratePodTemplateSpec(pts *corev1.PodTemplateSpec, mpiJo
     pts.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = mpiJobName
 }
 
-func (v *VolcanoCtrl) calculatePGMinResources(_ *int32, mpiJob *kubeflow.MPIJob) *corev1.ResourceList {
-    if schedPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedPolicy != nil {
+// calculatePGMinResources calculates minResources for the volcano podGroup.
+// minMember is the sum of the tasks' minAvailable, or of their replicas when a task's minAvailable is not set in the vcJob.
+// The podGroup's minResources is left empty when it is not set, so we calculate minResources over the first minMember replicas, ordered by descending priority.
+// ref: https://github.com/volcano-sh/volcano/blob/1933d46bdc4434772518ebb74c4281671ddeffa1/pkg/webhooks/admission/jobs/mutate/mutate_job.go#L168
+// ref: https://github.com/volcano-sh/volcano/blob/1933d46bdc4434772518ebb74c4281671ddeffa1/pkg/controllers/job/job_controller_actions.go#L761
+func (v *VolcanoCtrl) calculatePGMinResources(minMember *int32, mpiJob *kubeflow.MPIJob) *corev1.ResourceList {
+    if schedPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedPolicy != nil && schedPolicy.MinResources != nil {
         return schedPolicy.MinResources
     }
-    return nil
+    if minMember != nil && *minMember == 0 {
+        return nil
+    }
+
+    // Sort tasks by their priorityClasses.
+    return calPGMinResource(minMember, mpiJob, v.PriorityClassLister)
 }
 
 func (v *VolcanoCtrl) pgSpecsAreEqual(a, b metav1.Object) bool {
@@ -311,16 +323,30 @@ func (s *SchedulerPluginsCtrl) calculatePGMinResources(minMember *int32, mpiJob
         return nil
     }
 
+    return calPGMinResource(minMember, mpiJob, s.PriorityClassLister)
+}
+
+func (s *SchedulerPluginsCtrl) pgSpecsAreEqual(a, b metav1.Object) bool {
+    PGa := a.(*schedv1alpha1.PodGroup)
+    PGb := b.(*schedv1alpha1.PodGroup)
+    return equality.Semantic.DeepEqual(PGa.Spec, PGb.Spec)
+}
+
+var _ PodGroupControl = &SchedulerPluginsCtrl{}
+
+// calPGMinResource returns the minimum resources for the mpiJob's first minMember replicas.
+func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedulinglisters.PriorityClassLister) *corev1.ResourceList {
     var order replicasOrder
     for rt, replica := range mpiJob.Spec.MPIReplicaSpecs {
         rp := replicaPriority{
             priority:    0,
             replicaType: rt,
             ReplicaSpec: *replica,
         }
 
         pcName := replica.Template.Spec.PriorityClassName
-        if len(pcName) != 0 {
-            if priorityClass, err := s.PriorityClassLister.Get(pcName); err != nil {
+        if len(pcName) != 0 && pcLister != nil {
+            if priorityClass, err := pcLister.Get(pcName); err != nil {
                 klog.Warningf("Ignore replica %q priority class %q: %v", rt, pcName, err)
             } else {
                 rp.priority = priorityClass.Value
@@ -359,14 +385,6 @@ func (s *SchedulerPluginsCtrl) calculatePGMinResources(minMember *int32, mpiJob
     return &minResources
 }
 
-func (s *SchedulerPluginsCtrl) pgSpecsAreEqual(a, b metav1.Object) bool {
-    PGa := a.(*schedv1alpha1.PodGroup)
-    PGb := b.(*schedv1alpha1.PodGroup)
-    return equality.Semantic.DeepEqual(PGa.Spec, PGb.Spec)
-}
-
-var _ PodGroupControl = &SchedulerPluginsCtrl{}
-
 // calculateMinAvailable calculates minAvailable for the PodGroup.
 // If the schedulingPolicy.minAvailable is nil, it returns `NUM(workers) + 1`; otherwise it returns `schedulingPolicy.minAvailable`.
 func calculateMinAvailable(mpiJob *kubeflow.MPIJob) *int32 {
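The heart of the new calPGMinResource path is: order the replica specs by PriorityClass value (highest first), walk pods one by one until minMember is consumed, and accumulate each counted pod's resource requests. A self-contained sketch of that calculation follows, with simplified types: the replica struct, addResources, and minResources below are illustrative stand-ins, not the repository's code, and it sorts with sort.Slice where the repository sorts via sort.Sort(sort.Reverse(order)) over replicasOrder.

package main

import (
    "fmt"
    "sort"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

// replica is an illustrative stand-in for the repository's replicaPriority.
type replica struct {
    priority int32               // resolved PriorityClass value
    count    int32               // pod count for this replica type
    requests corev1.ResourceList // per-pod requests
}

// addResources accumulates src into dst, one counted pod at a time.
func addResources(dst, src corev1.ResourceList) {
    for name, q := range src {
        if cur, ok := dst[name]; ok {
            cur.Add(q)
            dst[name] = cur
        } else {
            dst[name] = q.DeepCopy()
        }
    }
}

// minResources sums the requests of the first minMember pods, taking
// pods from higher-priority replicas first.
func minResources(minMember int32, replicas []replica) corev1.ResourceList {
    sort.Slice(replicas, func(i, j int) bool {
        return replicas[i].priority > replicas[j].priority // descending
    })
    total := corev1.ResourceList{}
    remaining := minMember
    for _, r := range replicas {
        for i := int32(0); i < r.count && remaining > 0; i++ {
            addResources(total, r.requests)
            remaining--
        }
    }
    return total
}

func main() {
    launcher := replica{priority: 0, count: 1, requests: corev1.ResourceList{
        corev1.ResourceCPU:    resource.MustParse("2"),
        corev1.ResourceMemory: resource.MustParse("1Gi"),
    }}
    worker := replica{priority: 0, count: 2, requests: corev1.ResourceList{
        corev1.ResourceCPU:    resource.MustParse("10"),
        corev1.ResourceMemory: resource.MustParse("32Gi"),
    }}
    total := minResources(3, []replica{launcher, worker})
    // Prints 22 and 65Gi, matching the "without priorityClass" test below.
    fmt.Println(total.Cpu(), total.Memory())
}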
115 changes: 109 additions & 6 deletions pkg/controller/podgroup_test.go
@@ -15,6 +15,8 @@
 package controller
 
 import (
+    "reflect"
+    "sort"
     "testing"
 
     "github.com/google/go-cmp/cmp"
@@ -183,6 +185,10 @@ func TestNewPodGroup(t *testing.T) {
                     MinMember:         3,
                     Queue:             "project-x",
                     PriorityClassName: "high",
+                    MinResources: &corev1.ResourceList{
+                        corev1.ResourceCPU:    resource.MustParse("21"),
+                        corev1.ResourceMemory: resource.MustParse("42Gi"),
+                    },
                 },
             },
             wantSchedPG: &schedv1alpha1.PodGroup{
@@ -207,8 +213,10 @@
     for name, tc := range testCases {
         t.Run(name, func(t *testing.T) {
             volcanoFixture := newFixture(t, "default-scheduler")
+            jobController, _, _ := volcanoFixture.newController(clock.RealClock{})
             volcanoPGCtrl := &VolcanoCtrl{
-                Client: volcanoFixture.volcanoClient,
+                Client:              volcanoFixture.volcanoClient,
+                PriorityClassLister: jobController.priorityClassLister,
             }
             volcanoPG := volcanoPGCtrl.newPodGroup(tc.mpiJob)
             if diff := cmp.Diff(tc.wantVolcanoPG, volcanoPG, ignoreReferences); len(diff) != 0 {
@@ -361,8 +369,10 @@ func TestDecoratePodTemplateSpec(t *testing.T) {
 
 func TestCalculatePGMinResources(t *testing.T) {
     volcanoTests := map[string]struct {
-        job  *kubeflow.MPIJob
-        want *corev1.ResourceList
+        job             *kubeflow.MPIJob
+        priorityClasses []*schedulingv1.PriorityClass
+        minMember       int32
+        want            *corev1.ResourceList
     }{
         "minResources is not empty": {
             job: &kubeflow.MPIJob{
@@ -388,12 +398,68 @@
             },
             want: nil,
         },
+        "without priorityClass": {
+            minMember: 3,
+            job: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "test",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
+                        kubeflow.MPIReplicaTypeLauncher: {
+                            Replicas: pointer.Int32(1),
+                            Template: corev1.PodTemplateSpec{
+                                Spec: corev1.PodSpec{
+                                    Containers: []corev1.Container{
+                                        {
+                                            Resources: corev1.ResourceRequirements{
+                                                Requests: corev1.ResourceList{
+                                                    corev1.ResourceCPU:    resource.MustParse("2"),
+                                                    corev1.ResourceMemory: resource.MustParse("1Gi"),
+                                                },
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                        kubeflow.MPIReplicaTypeWorker: {
+                            Replicas: pointer.Int32(2),
+                            Template: corev1.PodTemplateSpec{
+                                Spec: corev1.PodSpec{
+                                    Containers: []corev1.Container{
+                                        {
+                                            Resources: corev1.ResourceRequirements{
+                                                Requests: corev1.ResourceList{
+                                                    corev1.ResourceCPU:    resource.MustParse("10"),
+                                                    corev1.ResourceMemory: resource.MustParse("32Gi"),
+                                                },
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+            want: &corev1.ResourceList{
+                corev1.ResourceCPU:    resource.MustParse("22"),
+                corev1.ResourceMemory: resource.MustParse("65Gi"),
+            },
+        },
     }
     for name, tc := range volcanoTests {
         t.Run(name, func(t *testing.T) {
-            f := newFixture(t, "scheduler-plugins-scheduler")
-            pgCtrl := VolcanoCtrl{Client: f.volcanoClient}
-            got := pgCtrl.calculatePGMinResources(pointer.Int32(0), tc.job)
+            f := newFixture(t, "volcano-scheduler")
+            if tc.priorityClasses != nil {
+                for _, pc := range tc.priorityClasses {
+                    f.setUpPriorityClass(pc)
+                }
+            }
+            jobController, _, _ := f.newController(clock.RealClock{})
+            pgCtrl := VolcanoCtrl{Client: f.volcanoClient, PriorityClassLister: jobController.priorityClassLister}
+            got := pgCtrl.calculatePGMinResources(&tc.minMember, tc.job)
             if diff := cmp.Diff(tc.want, got); len(diff) != 0 {
                 t.Fatalf("Unexpected calculatePGMinResources for the volcano (-want,+got):\n%s", diff)
             }
@@ -752,3 +818,40 @@ func TestCalculateMinAvailable(t *testing.T) {
         })
     }
 }
+
+func TestReplicasOrder(t *testing.T) {
+    var launcherReplicas, workerReplicas int32 = 1, 2
+    tests := map[string]struct {
+        original replicasOrder
+        expected replicasOrder
+    }{
+        "1-launcher, 2-worker, launcher higher priority": {
+            original: replicasOrder{
+                {priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &launcherReplicas}},
+                {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &workerReplicas}},
+            },
+            expected: replicasOrder{
+                {priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &launcherReplicas}},
+                {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &workerReplicas}},
+            },
+        },
+        "1-launcher, 2-worker, equal priority": {
+            original: replicasOrder{
+                {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &workerReplicas}},
+                {priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &launcherReplicas}},
+            },
+            expected: replicasOrder{
+                {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &workerReplicas}},
+                {priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &launcherReplicas}},
+            },
+        },
+    }
+    for name, tc := range tests {
+        t.Run(name, func(t *testing.T) {
+            sort.Sort(sort.Reverse(tc.original))
+            if !reflect.DeepEqual(tc.original, tc.expected) {
+                t.Fatalf("Unexpected sort list (-want,+got):\n-want:%v\n+got:%v", tc.expected, tc.original)
+            }
+        })
+    }
+}
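For reference, sort.Sort(sort.Reverse(tc.original)) only requires replicasOrder to implement sort.Interface, with ascending priority as its natural order; reversed, it puts the highest priority first, which is what calPGMinResource needs. A plausible sketch of those methods follows, assuming this shape; the repository's actual definition in podgroup.go may differ in detail:

// Hypothetical: natural order is ascending priority, so sort.Reverse
// yields highest-priority-first.
func (p replicasOrder) Len() int           { return len(p) }
func (p replicasOrder) Less(i, j int) bool { return p[i].priority < p[j].priority }
func (p replicasOrder) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

Note that sort.Sort makes no stability guarantee in general; the equal-priority case above still passes because a two-element slice is insertion-sorted and equal elements are never swapped.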
