Add podGroup completed phase #2667

Merged: 2 commits, Feb 13, 2023
installer/helm/chart/volcano/templates/controllers.yaml (4 additions, 1 deletion)
@@ -54,8 +54,11 @@ rules:
     resources: ["networkpolicies"]
     verbs: ["get", "create", "delete"]
   - apiGroups: ["apps"]
-    resources: ["daemonsets", "replicasets", "statefulsets"]
+    resources: ["daemonsets", "statefulsets"]
     verbs: ["get"]
+  - apiGroups: ["apps"]
+    resources: ["replicasets"]
+    verbs: ["get", "list", "watch"]
   - apiGroups: ["batch"]
     resources: ["jobs"]
     verbs: ["get"]
installer/volcano-development.yaml (4 additions, 1 deletion)
@@ -8564,8 +8564,11 @@ rules:
     resources: ["networkpolicies"]
     verbs: ["get", "create", "delete"]
   - apiGroups: ["apps"]
-    resources: ["daemonsets", "replicasets", "statefulsets"]
+    resources: ["daemonsets", "statefulsets"]
     verbs: ["get"]
+  - apiGroups: ["apps"]
+    resources: ["replicasets"]
+    verbs: ["get", "list", "watch"]
   - apiGroups: ["batch"]
     resources: ["jobs"]
     verbs: ["get"]
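
Both manifests make the same RBAC change: daemonsets and statefulsets keep plain get, while replicasets move into a rule of their own with get, list, and watch. The extra verbs are needed because the podgroup controller now runs a ReplicaSet informer (added in pg_controller.go below), and an informer's reflector lists and watches its resource. The sketch below shows the List/Watch pair the informer machinery builds internally; it uses a fake clientset so it runs without a cluster, and it is illustrative rather than code from this PR.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// replicaSetListWatch builds the ListWatch a reflector drives for replicasets;
// List and Watch are exactly the verbs the new RBAC rule has to allow.
func replicaSetListWatch(client kubernetes.Interface) *cache.ListWatch {
	return cache.NewListWatchFromClient(
		client.AppsV1().RESTClient(), // issues LIST and WATCH requests against apps/v1
		"replicasets",
		metav1.NamespaceAll,
		fields.Everything(),
	)
}

func main() {
	lw := replicaSetListWatch(fake.NewSimpleClientset())
	fmt.Printf("constructed %T for replicasets\n", lw)
}
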
pkg/controllers/podgroup/pg_controller.go (12 additions)
@@ -19,6 +19,7 @@ package podgroup
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
appinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -47,6 +48,7 @@ type pgcontroller struct {

podInformer coreinformers.PodInformer
pgInformer schedulinginformer.PodGroupInformer
rsInformer appinformers.ReplicaSetInformer

informerFactory informers.SharedInformerFactory
vcInformerFactory vcinformer.SharedInformerFactory
@@ -59,6 +61,9 @@ type pgcontroller struct {
pgLister schedulinglister.PodGroupLister
pgSynced func() bool

// rsSynced reports whether the ReplicaSet informer cache has synced
rsSynced func() bool

queue workqueue.RateLimitingInterface

schedulerNames []string
@@ -98,6 +103,13 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error {
pg.pgLister = pg.pgInformer.Lister()
pg.pgSynced = pg.pgInformer.Informer().HasSynced

pg.rsInformer = pg.informerFactory.Apps().V1().ReplicaSets()
pg.rsSynced = pg.rsInformer.Informer().HasSynced
pg.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pg.addReplicaSet,
UpdateFunc: pg.updateReplicaSet,
})

return nil
}

pkg/controllers/podgroup/pg_controller_handler.go (22 additions)
@@ -21,12 +21,14 @@ import (
"encoding/json"
"strings"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
batchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"

"volcano.sh/apis/pkg/apis/helpers"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
@@ -61,6 +63,26 @@ func (pg *pgcontroller) addPod(obj interface{}) {
pg.queue.Add(req)
}

func (pg *pgcontroller) addReplicaSet(obj interface{}) {

Contributor: @waiterQ As far as I know, a ReplicaSet is always created with 0 replicas and only scales up to its defined replica count afterwards. So why delete the PodGroup in both addReplicaSet and updateReplicaSet, rather than only in updateReplicaSet?

Contributor (author): Yes, you're right. Within a single version, Volcano only needs updateReplicaSet. But consider an upgrade from one version to another: without addReplicaSet, nothing would clean up the stale PodGroups already in the cluster. addReplicaSet handles the PodGroups that already exist; updateReplicaSet handles the upcoming ones.

rs, ok := obj.(*appsv1.ReplicaSet)
if !ok {
klog.Errorf("Failed to convert %v to appsv1.ReplicaSet", obj)
return
}

if *rs.Spec.Replicas == 0 {

Reviewer: During a rolling upgrade there can be two ReplicaSets with non-zero replicas at the same time, which means two PodGroups exist. Does this matter?

Contributor (author): I think that comes down to the Deployment's rollingUpdate strategy: while pods are being rolled, two kinds of pods necessarily coexist. That seems normal to me, not a problem.

pgName := batchv1alpha1.PodgroupNamePrefix + string(rs.UID)
err := pg.vcClient.SchedulingV1beta1().PodGroups(rs.Namespace).Delete(context.TODO(), pgName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete PodGroup <%s/%s>: %v", rs.Namespace, pgName, err)
}
}
}

func (pg *pgcontroller) updateReplicaSet(oldObj, newObj interface{}) {
pg.addReplicaSet(newObj)
}

func (pg *pgcontroller) updatePodAnnotations(pod *v1.Pod, pgName string) error {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
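
The upgrade argument above leans on a client-go detail worth making explicit: when an informer starts, every ReplicaSet that already exists in the cluster is delivered to AddFunc as an initial Add event, so addReplicaSet also sweeps up PodGroups left behind by an older controller version. Below is a minimal sketch of that behavior; it uses a fake clientset so it runs without a cluster, and it is illustrative, not code from this PR.

package main

import (
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// A ReplicaSet that "already exists" before the informer starts.
	pre := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "old-rs", Namespace: "default"}}
	client := fake.NewSimpleClientset(pre)

	factory := informers.NewSharedInformerFactory(client, 30*time.Minute)
	rsInformer := factory.Apps().V1().ReplicaSets()

	// AddFunc fires once for every pre-existing object during the initial
	// list; this is what lets addReplicaSet clean up stale PodGroups after
	// an upgrade instead of only reacting to newly created ReplicaSets.
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rs := obj.(*appsv1.ReplicaSet)
			fmt.Printf("Add event replayed for existing ReplicaSet %s/%s\n", rs.Namespace, rs.Name)
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, rsInformer.Informer().HasSynced)
	time.Sleep(100 * time.Millisecond) // let the handler drain before exiting
}
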
pkg/scheduler/framework/session.go (4 additions)
@@ -272,6 +272,10 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
// If there're enough allocated resource, it's running
if int32(allocated) >= jobInfo.PodGroup.Spec.MinMember {
status.Phase = scheduling.PodGroupRunning
// If all allocated tasks have succeeded, it's completed
if len(jobInfo.TaskStatusIndex[api.Succeeded]) == allocated {

@zhoushuke (Dec 5, 2023): For a native batch/v1 Job that uses .spec.completions and .spec.parallelism: suppose 10 pods have succeeded and, because the queue is full, the other 10 pods are pending. len(jobInfo.TaskStatusIndex[api.Succeeded]) == allocated will then be true, so the PodGroup phase becomes Completed even though the Job is not finished. Could that happen?

status.Phase = scheduling.PodGroupCompleted
}
} else if jobInfo.PodGroup.Status.Phase != scheduling.PodGroupInqueue {
status.Phase = scheduling.PodGroupPending
}
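
If the situation zhoushuke describes can occur (succeeded pods equal to the allocated count while further pods of the same job are still pending), one conceivable tightening inside jobStatus would be to require that every task of the job, not only every allocated one, has succeeded. The fragment below is a sketch under that assumption, not what this PR merged:

	// Hypothetical stricter guard: pods left pending by a full queue keep the
	// PodGroup in Running instead of flipping it to Completed early.
	succeeded := len(jobInfo.TaskStatusIndex[api.Succeeded])
	if succeeded == allocated && succeeded == len(jobInfo.Tasks) {
		status.Phase = scheduling.PodGroupCompleted
	}
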
test/e2e/schedulingbase/job_scheduling.go (61 additions, 3 deletions)
@@ -27,14 +27,14 @@ import (

batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"

vcscheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"

e2eutil "volcano.sh/volcano/test/e2e/util"
)

@@ -329,7 +329,7 @@ var _ = Describe("Job E2E Test", func() {
Tasks: []e2eutil.TaskSpec{
{
Img: e2eutil.DefaultNginxImage,
- Req: slot,
+ Req: e2eutil.HalfCPU,
Min: count,
Rep: count,
},
@@ -650,4 +650,62 @@
Expect(q2ScheduledPod).Should(BeNumerically("<=", expectPod/2+1),
fmt.Sprintf("expectPod %d, q1ScheduledPod %d, q2ScheduledPod %d", expectPod, q1ScheduledPod, q2ScheduledPod))
})

It("changeable Deployment's PodGroup", func() {

Member: Please give a formal and complete description for the func.

ctx := e2eutil.InitTestContext(e2eutil.Options{})
defer e2eutil.CleanupTestContext(ctx)
rep := e2eutil.ClusterSize(ctx, e2eutil.OneCPU)/2 + 1

d := e2eutil.CreateDeployment(ctx, "d-1", rep, e2eutil.DefaultNginxImage, e2eutil.OneCPU)
err := e2eutil.WaitDeploymentReady(ctx, d.Name)
Expect(err).NotTo(HaveOccurred())

pgs, err := ctx.Vcclient.SchedulingV1beta1().PodGroups(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list podGroups in namespace %s", ctx.Namespace)
Expect(len(pgs.Items)).To(Equal(1), "this test needs a clean cluster")
oldOne := &pgs.Items[0]

d.ResourceVersion = ""
d.Spec.Template.Spec.Containers[0].Resources.Requests = e2eutil.HalfCPU
d, err = ctx.Kubeclient.AppsV1().Deployments(ctx.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to update deployment(%s) in namespace %s", d.Name, ctx.Namespace)
err = e2eutil.WaitDeploymentReady(ctx, d.Name)
Expect(err).NotTo(HaveOccurred())

wait.Poll(time.Second, time.Minute, func() (bool, error) {
oldOne, err = ctx.Vcclient.SchedulingV1beta1().PodGroups(ctx.Namespace).Get(context.TODO(), oldOne.Name, metav1.GetOptions{})
if err != nil {
return true, nil
}
return false, nil
})
Expect(errors.IsNotFound(err)).To(BeTrue(), "old pg(%s) should not be found", oldOne.Name)

pgs, err = ctx.Vcclient.SchedulingV1beta1().PodGroups(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list podGroups in namespace %s", ctx.Namespace)
Expect(len(pgs.Items)).To(Equal(1), "only one podGroup should exist")
})

It("k8s Job", func() {

Member: Please use a formal and complete description for the func.

ctx := e2eutil.InitTestContext(e2eutil.Options{})
defer e2eutil.CleanupTestContext(ctx)

jb := e2eutil.CreateSampleK8sJob(ctx, "job1", e2eutil.DefaultNginxImage, e2eutil.OneCPU)
err := e2eutil.Waitk8sJobCompleted(ctx, jb.Name)
Expect(err).NotTo(HaveOccurred())

var pgPhase vcscheduling.PodGroupPhase
wait.Poll(time.Second, time.Second*30, func() (bool, error) {
pgs, err := ctx.Vcclient.SchedulingV1beta1().PodGroups(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list podGroups in namespace %s", ctx.Namespace)
Expect(len(pgs.Items)).To(Equal(1), "this test needs a clean cluster")
pgPhase = pgs.Items[0].Status.Phase
if pgPhase != vcscheduling.PodGroupRunning {
return true, nil
}
return false, nil
})
Expect(pgPhase).To(Equal(vcscheduling.PodGroupCompleted), "podGroup phase is %s, should be %s",
pgPhase, vcscheduling.PodGroupCompleted)
})
})
test/e2e/util/deployment.go (new file, 189 additions)
@@ -0,0 +1,189 @@
/*

Member: It is a util package; why name the file deployment.go?

Copyright 2021 The Volcano Authors.

Member: The Copyright is not correct.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"context"
"time"

. "github.com/onsi/gomega"
appv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
)

// CreateDeployment creates a new deployment
func CreateDeployment(ctx *TestContext, name string, rep int32, img string, req v1.ResourceList) *appv1.Deployment {
deploymentName := "deployment.k8s.io"
d := &appv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ctx.Namespace,
},
Spec: appv1.DeploymentSpec{
Replicas: &rep,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
deploymentName: name,
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{deploymentName: name},
},
Spec: v1.PodSpec{
SchedulerName: "volcano",
RestartPolicy: v1.RestartPolicyAlways,
Containers: []v1.Container{
{
Image: img,
Name: name,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Requests: req,
},
},
},
},
},
},
}

deployment, err := ctx.Kubeclient.AppsV1().Deployments(ctx.Namespace).Create(context.TODO(), d, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to create deployment %s", name)

return deployment
}

// deploymentReady returns a wait.ConditionFunc reporting whether the number of
// running or succeeded pods matching the deployment's selector equals its
// desired replica count.
func deploymentReady(ctx *TestContext, name string) wait.ConditionFunc {
return func() (bool, error) {
deployment, err := ctx.Kubeclient.AppsV1().Deployments(ctx.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to get deployment %s in namespace %s", name, ctx.Namespace)

pods, err := ctx.Kubeclient.CoreV1().Pods(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list pods in namespace %s", ctx.Namespace)

labelSelector := labels.SelectorFromSet(deployment.Spec.Selector.MatchLabels)

readyTaskNum := 0
for _, pod := range pods.Items {
if !labelSelector.Matches(labels.Set(pod.Labels)) {
continue
}
if pod.DeletionTimestamp != nil {
return false, nil
}
if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
readyTaskNum++
}
}

return *(deployment.Spec.Replicas) == int32(readyTaskNum), nil
}
}

// WaitDeploymentReady waits until all of the deployment's pods are running or succeeded.
func WaitDeploymentReady(ctx *TestContext, name string) error {
return wait.Poll(100*time.Millisecond, FiveMinute, deploymentReady(ctx, name))
}

// DeleteDeployment deletes the deployment with foreground cascading deletion.
func DeleteDeployment(ctx *TestContext, name string) error {
foreground := metav1.DeletePropagationForeground
return ctx.Kubeclient.AppsV1().Deployments(ctx.Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{
PropagationPolicy: &foreground,
})
}

// CreateSampleK8sJob creates a new k8s job
func CreateSampleK8sJob(ctx *TestContext, name string, img string, req v1.ResourceList) *batchv1.Job {
k8sjobname := "job.k8s.io"
defaultTrue := true
j := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
k8sjobname: name,
},
},
ManualSelector: &defaultTrue,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{k8sjobname: name},
},
Spec: v1.PodSpec{
SchedulerName: "volcano",
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
{
Image: img,
Name: name,
Command: []string{"/bin/sh", "-c", "sleep 10"},
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Requests: req,
},
},
},
},
},
},
}

jb, err := ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Create(context.TODO(), j, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to create k8sjob %s", name)

return jb
}

// k8sjobCompleted returns a wait.ConditionFunc reporting whether at least one
// pod matching the job's selector has succeeded.
func k8sjobCompleted(ctx *TestContext, name string) wait.ConditionFunc {
return func() (bool, error) {
jb, err := ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to get k8sjob %s in namespace %s", name, ctx.Namespace)

pods, err := ctx.Kubeclient.CoreV1().Pods(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list pods in namespace %s", ctx.Namespace)

labelSelector := labels.SelectorFromSet(jb.Spec.Selector.MatchLabels)

for _, pod := range pods.Items {
if !labelSelector.Matches(labels.Set(pod.Labels)) {
continue
}
if pod.Status.Phase == v1.PodSucceeded {
return true, nil
}
}

return false, nil
}
}

// Waitk8sJobCompleted waits until the k8s Job has a succeeded pod.
func Waitk8sJobCompleted(ctx *TestContext, name string) error {
return wait.Poll(100*time.Millisecond, FiveMinute, k8sjobCompleted(ctx, name))
}

// DeleteK8sJob deletes the k8s Job with foreground cascading deletion.
func DeleteK8sJob(ctx *TestContext, name string) error {
foreground := metav1.DeletePropagationForeground
return ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{
PropagationPolicy: &foreground,
})
}