remove shadowPodgroup in scheduler #406

Merged 1 commit on Aug 5, 2019
4 changes: 2 additions & 2 deletions pkg/scheduler/api/pod_group_info.go
@@ -44,11 +44,11 @@ const (

//PodGroupVersionV1Alpha2 represents PodGroupVersion of V1Alpha2
PodGroupVersionV1Alpha2 string = "v1alpha2"
// PodPending means the pod group has been accepted by the system, but scheduler can not allocate
// PodGroupPending means the pod group has been accepted by the system, but scheduler can not allocate
// enough resources to it.
PodGroupPending PodGroupPhase = "Pending"

// PodRunning means `spec.minMember` pods of PodGroups has been in running phase.
// PodGroupRunning means `spec.minMember` pods of PodGroups has been in running phase.
PodGroupRunning PodGroupPhase = "Running"

// PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not
75 changes: 30 additions & 45 deletions pkg/scheduler/cache/cache.go
@@ -45,7 +45,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/volumebinder"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
@@ -507,24 +506,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
}
}()

if !shadowPodGroup(job.PodGroup) {
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err)
return err
}
sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason)
} else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err)
return err
}
sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason)
} else {
return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version)
}
if err := sc.convertPodGroupInfo(job); err != nil {
glog.Errorf("Error While converting api.PodGroup %v", err)
return err
}

return nil
@@ -778,32 +762,15 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
baseErrorMessage = kbapi.AllNodeUnavailableMsg
}

if !shadowPodGroup(job.PodGroup) {
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == api.PodGroupUnknown ||
job.PodGroup.Status.Phase == api.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == api.PodGroupUnknown ||
job.PodGroup.Status.Phase == api.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
podGroup, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err)
}
sc.Recorder.Eventf(podGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}

if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
podGroup, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err)
}
sc.Recorder.Eventf(podGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}
// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
if err := sc.convertPodGroupInfo(job); err != nil {
glog.Errorf("Error While converting api.PodGroup %v", err)
Member: We should record event here :)
(One way to do that is sketched after this file's diff.)
}
}

@@ -825,7 +792,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {

// UpdateJobStatus update the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) {
if updatePG && !shadowPodGroup(job.PodGroup) {
if updatePG {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
@@ -837,3 +804,21 @@ func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*k

return job, nil
}

func (sc *SchedulerCache) convertPodGroupInfo(job *kbapi.JobInfo) error {
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
_, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
return fmt.Errorf("to v1alpha.PodGroup with error: %v", err)
}
} else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
_, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
return fmt.Errorf("to v2alpha.PodGroup with error: %v", err)
}
} else {
return fmt.Errorf("invalid PodGroup Version: %s", job.PodGroup.Version)
}

return nil
}
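The review note above ("We should record event here") flags that the new convertPodGroupInfo helper discards the converted object, so the Unschedulable warning that the removed branches used to emit is no longer recorded. Below is a minimal sketch of one way to restore it, not the merged implementation: it assumes the conversion helpers, version constants, and event recorder already used in cache.go, and it keeps the v1alpha1 import (removed by this PR) for the condition type; the helper names are illustrative.

// Sketch only (not part of this PR): return the converted PodGroup so the
// caller can record events on it, addressing the "record event here" note.
package cache

import (
	"fmt"

	"github.com/golang/glog"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"

	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" // assumed kept for the condition type
	"volcano.sh/volcano/pkg/scheduler/api"
)

// convertPodGroup is a hypothetical variant of convertPodGroupInfo that hands
// back the version-specific object instead of discarding it.
func (sc *SchedulerCache) convertPodGroup(job *api.JobInfo) (runtime.Object, error) {
	switch job.PodGroup.Version {
	case api.PodGroupVersionV1Alpha1:
		pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
		if err != nil {
			return nil, fmt.Errorf("failed to convert PodGroup to v1alpha1: %v", err)
		}
		return pg, nil
	case api.PodGroupVersionV1Alpha2:
		pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
		if err != nil {
			return nil, fmt.Errorf("failed to convert PodGroup to v1alpha2: %v", err)
		}
		return pg, nil
	default:
		return nil, fmt.Errorf("invalid PodGroup version: %s", job.PodGroup.Version)
	}
}

// recordUnschedulableEvent shows the caller side in RecordJobStatusEvent:
// convert first, then emit the warning on the converted object, as the
// pre-refactor code did for both API versions.
func (sc *SchedulerCache) recordUnschedulableEvent(job *api.JobInfo, msg string) {
	pg, err := sc.convertPodGroup(job)
	if err != nil {
		glog.Errorf("Error while converting api.PodGroup: %v", err)
		return
	}
	sc.Recorder.Eventf(pg, v1.EventTypeWarning,
		string(v1alpha1.PodGroupUnschedulableType), msg)
}

Whether the event is emitted inside the helper or at the call site is a design choice; returning the converted object just keeps the version switch in one place.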
135 changes: 1 addition & 134 deletions pkg/scheduler/cache/cache_test.go
@@ -125,139 +125,6 @@ func buildOwnerReference(owner string) metav1.OwnerReference {
}
}

func TestAddPod(t *testing.T) {

owner := buildOwnerReference("j1")

// case 1:
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.
pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)
pi2.Job = "j1" // The job name is set by cache.

j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2)
pg := createShadowPodGroup(pod1)
j1.SetPodGroup(pg)

node1 := buildNode("n1", buildResourceList("2000m", "10G"))
ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

tests := []struct {
pods []*v1.Pod
nodes []*v1.Node
expected *SchedulerCache
}{
{
pods: []*v1.Pod{pod1, pod2},
nodes: []*v1.Node{node1},
expected: &SchedulerCache{
Nodes: map[string]*api.NodeInfo{
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"j1": j1,
},
},
},
}

for i, test := range tests {
cache := &SchedulerCache{
Jobs: make(map[api.JobID]*api.JobInfo),
Nodes: make(map[string]*api.NodeInfo),
}

for _, n := range test.nodes {
cache.AddNode(n)
}

for _, p := range test.pods {
cache.AddPod(p)
}

if !cacheEqual(cache, test.expected) {
t.Errorf("case %d: \n expected %v, \n got %v \n",
i, test.expected, cache)
}
}
}

func TestAddNode(t *testing.T) {
owner1 := buildOwnerReference("j1")
owner2 := buildOwnerReference("j2")

// case 1
node1 := buildNode("n1", buildResourceList("2000m", "10G"))
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner1}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.

pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner2}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)
pi2.Job = "j2" // The job name is set by cache.

ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

j1 := api.NewJobInfo("j1")
pg1 := createShadowPodGroup(pod1)
j1.SetPodGroup(pg1)

j2 := api.NewJobInfo("j2")
pg2 := createShadowPodGroup(pod2)
j2.SetPodGroup(pg2)

j1.AddTaskInfo(pi1)
j2.AddTaskInfo(pi2)

tests := []struct {
pods []*v1.Pod
nodes []*v1.Node
expected *SchedulerCache
}{
{
pods: []*v1.Pod{pod1, pod2},
nodes: []*v1.Node{node1},
expected: &SchedulerCache{
Nodes: map[string]*api.NodeInfo{
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"j1": j1,
"j2": j2,
},
},
},
}

for i, test := range tests {
cache := &SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
}

for _, p := range test.pods {
cache.AddPod(p)
}

for _, n := range test.nodes {
cache.AddNode(n)
}

if !cacheEqual(cache, test.expected) {
t.Errorf("case %d: \n expected %v, \n got %v \n",
i, test.expected, cache)
}
}
}

func TestGetOrCreateJob(t *testing.T) {
owner1 := buildOwnerReference("j1")
owner2 := buildOwnerReference("j2")
@@ -292,7 +159,7 @@ func TestGetOrCreateJob(t *testing.T) {
},
{
task: pi2,
gotJob: true,
gotJob: false,
},
{
task: pi3,
19 changes: 4 additions & 15 deletions pkg/scheduler/cache/event_handlers.go
@@ -46,23 +46,12 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
if pi.Pod.Spec.SchedulerName != sc.schedulerName {
glog.V(4).Infof("Pod %s/%s will not not scheduled by %s, skip creating PodGroup and Job for it",
pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName)
return nil
}
pb := createShadowPodGroup(pi.Pod)
pi.Job = kbapi.JobID(pb.Name)

if _, found := sc.Jobs[pi.Job]; !found {
job := kbapi.NewJobInfo(pi.Job)
job.SetPodGroup(pb)
// Set default queue for shadow podgroup.
job.Queue = kbapi.QueueID(sc.defaultQueue)
return nil
}

sc.Jobs[pi.Job] = job
}
} else {
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job)
}
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job)
}

return sc.Jobs[pi.Job]
40 changes: 1 addition & 39 deletions pkg/scheduler/cache/util.go
@@ -17,47 +17,9 @@ limitations under the License.
package cache

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/utils"
"volcano.sh/volcano/pkg/scheduler/api"
)

const (
shadowPodGroupKey = "volcano/shadow-pod-group"
"k8s.io/api/core/v1"
)

func shadowPodGroup(pg *api.PodGroup) bool {
if pg == nil {
return true
}

_, found := pg.Annotations[shadowPodGroupKey]

return found
}

func createShadowPodGroup(pod *v1.Pod) *api.PodGroup {
jobID := api.JobID(utils.GetController(pod))
if len(jobID) == 0 {
jobID = api.JobID(pod.UID)
}

return &api.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: string(jobID),
Annotations: map[string]string{
shadowPodGroupKey: string(jobID),
},
},
Spec: api.PodGroupSpec{
MinMember: 1,
},
}
}

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Spec.SchedulerName