From e6fe8d917c3ffe9471814c064a7b59b272f9a51a Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Sat, 3 Aug 2019 15:13:46 +0800 Subject: [PATCH] remove shadowPodgroup in scheduler --- pkg/apis/helpers/helpers.go | 18 +++++ .../podgroup/pg_controller_handler.go | 20 +---- pkg/scheduler/cache/cache.go | 76 +++++++++---------- pkg/scheduler/cache/cache_test.go | 30 ++++---- pkg/scheduler/cache/event_handlers.go | 17 +---- pkg/scheduler/cache/util.go | 31 +------- test/e2e/predicates.go | 12 +-- 7 files changed, 83 insertions(+), 121 deletions(-) diff --git a/pkg/apis/helpers/helpers.go b/pkg/apis/helpers/helpers.go index 21476f3854c..c6c137d2701 100644 --- a/pkg/apis/helpers/helpers.go +++ b/pkg/apis/helpers/helpers.go @@ -130,3 +130,21 @@ func DeleteConfigmap(job *vkv1.Job, kubeClients kubernetes.Interface, cmName str return nil } + +// GeneratePodgroupName generate podgroup name of normal pod +func GeneratePodgroupName(pod *v1.Pod) string { + pgName := vkbatchv1.PodgroupNamePrefix + + if len(pod.OwnerReferences) != 0 { + for _, ownerReference := range pod.OwnerReferences { + if ownerReference.Controller != nil && *ownerReference.Controller == true { + pgName += string(ownerReference.UID) + return pgName + } + } + } + + pgName += string(pod.UID) + + return pgName +} diff --git a/pkg/controllers/podgroup/pg_controller_handler.go b/pkg/controllers/podgroup/pg_controller_handler.go index 7ab072e0879..f74c84f4fdd 100644 --- a/pkg/controllers/podgroup/pg_controller_handler.go +++ b/pkg/controllers/podgroup/pg_controller_handler.go @@ -23,7 +23,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) @@ -71,7 +70,7 @@ func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error { } func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error { - pgName := generatePodgroupName(pod) + pgName := helpers.GeneratePodgroupName(pod) if _, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil { if !apierrors.IsNotFound(err) { @@ -101,23 +100,6 @@ func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error { return cc.updatePodAnnotations(pod, pgName) } -func generatePodgroupName(pod *v1.Pod) string { - pgName := vkbatchv1.PodgroupNamePrefix - - if len(pod.OwnerReferences) != 0 { - for _, ownerReference := range pod.OwnerReferences { - if ownerReference.Controller != nil && *ownerReference.Controller == true { - pgName += string(ownerReference.UID) - return pgName - } - } - } - - pgName += string(pod.UID) - - return pgName -} - func newPGOwnerReferences(pod *v1.Pod) []metav1.OwnerReference { if len(pod.OwnerReferences) != 0 { for _, ownerReference := range pod.OwnerReferences { diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 20cf9c9e106..28a40119017 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -507,24 +507,22 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error { } }() - if !shadowPodGroup(job.PodGroup) { - if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { - pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) - if err != nil { - glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err) - return err - } - sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) - } else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { - pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) - if err != nil { - glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err) - return err - } - sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) - } else { - return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version) + if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { + pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) + if err != nil { + glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err) + return err + } + sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) + } else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { + pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) + if err != nil { + glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err) + return err } + sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) + } else { + return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version) } return nil @@ -778,32 +776,30 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { baseErrorMessage = kbapi.AllNodeUnavailableMsg } - if !shadowPodGroup(job.PodGroup) { - pgUnschedulable := job.PodGroup != nil && - (job.PodGroup.Status.Phase == api.PodGroupUnknown || - job.PodGroup.Status.Phase == api.PodGroupPending) - pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0 + pgUnschedulable := job.PodGroup != nil && + (job.PodGroup.Status.Phase == api.PodGroupUnknown || + job.PodGroup.Status.Phase == api.PodGroupPending) + pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0 - // If pending or unschedulable, record unschedulable event. - if pgUnschedulable || pdbUnschedulabe { - msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError()) - if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { - podGroup, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) - if err != nil { - glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err) - } - sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), msg) + // If pending or unschedulable, record unschedulable event. + if pgUnschedulable || pdbUnschedulabe { + msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError()) + if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { + podGroup, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) + if err != nil { + glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err) } + sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, + string(v1alpha1.PodGroupUnschedulableType), msg) + } - if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { - podGroup, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) - if err != nil { - glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err) - } - sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), msg) + if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { + podGroup, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) + if err != nil { + glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err) } + sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, + string(v1alpha1.PodGroupUnschedulableType), msg) } } @@ -825,7 +821,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { // UpdateJobStatus update the status of job and its tasks. func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) { - if updatePG && !shadowPodGroup(job.PodGroup) { + if updatePG { pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) if err != nil { return nil, err diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 93fc7a898a2..ea74125aaed 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -133,15 +133,15 @@ func TestAddPod(t *testing.T) { pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{owner}, make(map[string]string)) pi1 := api.NewTaskInfo(pod1) - pi1.Job = "j1" // The job name is set by cache. + pg1 := createShadowPodGroup(pod1) + pi1.Job = getJobID(pg1) // The job name is set by cache. pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), []metav1.OwnerReference{owner}, make(map[string]string)) pi2 := api.NewTaskInfo(pod2) - pi2.Job = "j1" // The job name is set by cache. + pg2 := createShadowPodGroup(pod2) + pi2.Job = getJobID(pg2) // The job name is set by cache. - j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2) - pg := createShadowPodGroup(pod1) - j1.SetPodGroup(pg) + j1 := api.NewJobInfo(api.JobID("c1/podgroup-j1"), pi1, pi2) node1 := buildNode("n1", buildResourceList("2000m", "10G")) ni1 := api.NewNodeInfo(node1) @@ -160,7 +160,7 @@ func TestAddPod(t *testing.T) { "n1": ni1, }, Jobs: map[api.JobID]*api.JobInfo{ - "j1": j1, + "c1/podgroup-j1": j1, }, }, }, @@ -196,23 +196,21 @@ func TestAddNode(t *testing.T) { pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{owner1}, make(map[string]string)) pi1 := api.NewTaskInfo(pod1) - pi1.Job = "j1" // The job name is set by cache. + pg1 := createShadowPodGroup(pod1) + pi1.Job = getJobID(pg1) // The job name is set by cache. pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), []metav1.OwnerReference{owner2}, make(map[string]string)) pi2 := api.NewTaskInfo(pod2) - pi2.Job = "j2" // The job name is set by cache. + pg2 := createShadowPodGroup(pod2) + pi2.Job = getJobID(pg2) // The job name is set by cache. ni1 := api.NewNodeInfo(node1) ni1.AddTask(pi2) - j1 := api.NewJobInfo("j1") - pg1 := createShadowPodGroup(pod1) - j1.SetPodGroup(pg1) + j1 := api.NewJobInfo("c1/podgroup-j1") - j2 := api.NewJobInfo("j2") - pg2 := createShadowPodGroup(pod2) - j2.SetPodGroup(pg2) + j2 := api.NewJobInfo("c1/podgroup-j2") j1.AddTaskInfo(pi1) j2.AddTaskInfo(pi2) @@ -230,8 +228,8 @@ func TestAddNode(t *testing.T) { "n1": ni1, }, Jobs: map[api.JobID]*api.JobInfo{ - "j1": j1, - "j2": j2, + "c1/podgroup-j1": j1, + "c1/podgroup-j2": j2, }, }, }, diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 0b7e0f98bb4..5af0f8534ef 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -49,20 +49,11 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo { return nil } pb := createShadowPodGroup(pi.Pod) - pi.Job = kbapi.JobID(pb.Name) - - if _, found := sc.Jobs[pi.Job]; !found { - job := kbapi.NewJobInfo(pi.Job) - job.SetPodGroup(pb) - // Set default queue for shadow podgroup. - job.Queue = kbapi.QueueID(sc.defaultQueue) + pi.Job = getJobID(pb) + } - sc.Jobs[pi.Job] = job - } - } else { - if _, found := sc.Jobs[pi.Job]; !found { - sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job) - } + if _, found := sc.Jobs[pi.Job]; !found { + sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job) } return sc.Jobs[pi.Job] diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go index 2e1340d74e2..5ac274590de 100644 --- a/pkg/scheduler/cache/util.go +++ b/pkg/scheduler/cache/util.go @@ -17,43 +17,20 @@ limitations under the License. package cache import ( - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "volcano.sh/volcano/pkg/apis/utils" + "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/scheduler/api" ) -const ( - shadowPodGroupKey = "volcano/shadow-pod-group" -) - -func shadowPodGroup(pg *api.PodGroup) bool { - if pg == nil { - return true - } - - _, found := pg.Annotations[shadowPodGroupKey] - - return found -} - func createShadowPodGroup(pod *v1.Pod) *api.PodGroup { - jobID := api.JobID(utils.GetController(pod)) - if len(jobID) == 0 { - jobID = api.JobID(pod.UID) - } + pgName := helpers.GeneratePodgroupName(pod) return &api.PodGroup{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, - Name: string(jobID), - Annotations: map[string]string{ - shadowPodGroupKey: string(jobID), - }, - }, - Spec: api.PodGroupSpec{ - MinMember: 1, + Name: pgName, }, } } diff --git a/test/e2e/predicates.go b/test/e2e/predicates.go index 64f12784de5..1a106587d89 100644 --- a/test/e2e/predicates.go +++ b/test/e2e/predicates.go @@ -84,7 +84,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "na-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: slot, min: 1, rep: rep, @@ -130,7 +130,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "pa-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: slot, min: rep, rep: rep, @@ -177,7 +177,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "pa-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: slot, min: 2, rep: 2, @@ -222,7 +222,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "tt-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: oneCPU, min: 1, rep: 1, @@ -270,7 +270,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "tt-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: oneCPU, min: 1, rep: 1, @@ -283,7 +283,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "tt-job-no-toleration", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: oneCPU, min: 1, rep: 1,