
Commit

Added e2e test.
k82cn committed Jul 5, 2018
1 parent 8cb6036 commit 07ac230
Showing 2 changed files with 224 additions and 34 deletions.
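In summary: the commit adds a TaskPriority e2e case to test/e2e.go and, in test/util.go, the multi-task plumbing it needs — a taskSpec descriptor, createQueueJobEx for building a QueueJob from several named tasks, taskReadyEx/waitTasksReadyEx for per-task readiness checks, and PriorityClass setup/teardown in the shared test context.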
35 changes: 35 additions & 0 deletions test/e2e.go
@@ -69,4 +69,39 @@ var _ = Describe("E2E Test", func() {
         err = waitTasksReady(context, qj1.Name, int(rep)/2)
         Expect(err).NotTo(HaveOccurred())
     })
+
+    It("TaskPriority", func() {
+        context := initTestContext()
+        defer cleanupTestContext(context)
+
+        slot := oneCPU
+        rep := clusterSize(context, slot)
+
+        replicaset := createReplicaSet(context, "rs-1", rep/2, "nginx", slot)
+        err := waitReplicaSetReady(context, replicaset.Name)
+        Expect(err).NotTo(HaveOccurred())
+
+        ts := []taskSpec{
+            {
+                name: "worker",
+                img:  "nginx",
+                pri:  workerPriority,
+                rep:  rep,
+                req:  slot,
+            },
+            {
+                name: "master",
+                img:  "nginx",
+                pri:  masterPriority,
+                rep:  1,
+                req:  slot,
+            },
+        }
+
+        qj := createQueueJobEx(context, "multi-pod-qj", rep/2, ts)
+
+        expectedTasks := map[string]int32{"master": 1, "worker": rep/2 - 1}
+        err = waitTasksReadyEx(context, qj.Name, expectedTasks)
+        Expect(err).NotTo(HaveOccurred())
+    })
 })
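A note on the arithmetic in TaskPriority: the cluster provides `rep` one-CPU slots and the ReplicaSet occupies `rep/2` of them, so only `rep/2` slots remain for the QueueJob, whose MinAvailable is also `rep/2`. Because master-pri (value 100) outranks worker-pri (value 1), the single master pod should be placed first, leaving `rep/2 - 1` slots for workers — exactly the `expectedTasks` map asserted above. The helpers clusterSize, createReplicaSet, and waitReplicaSetReady are defined elsewhere in test/util.go and are not part of this diff; the following is only a hypothetical sketch of the kind of computation clusterSize performs, not its actual implementation:

    // Hypothetical sketch -- NOT the real clusterSize from test/util.go.
    // Estimates how many pods requesting req fit in the cluster by dividing
    // each node's allocatable CPU into req-sized slots.
    func clusterSizeSketch(nodes []v1.Node, req v1.ResourceList) int32 {
        slots := int32(0)
        cpuPerPod := req[v1.ResourceCPU]
        for _, node := range nodes {
            alloc := node.Status.Allocatable[v1.ResourceCPU]
            slots += int32(alloc.MilliValue() / cpuPerPod.MilliValue())
        }
        return slots
    }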
223 changes: 189 additions & 34 deletions test/util.go
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/api/core/v1"
appv1 "k8s.io/api/extensions/v1beta1"
schedv1 "k8s.io/api/scheduling/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -44,6 +45,11 @@ var oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")}
 var twoCPU = v1.ResourceList{"cpu": resource.MustParse("2000m")}
 var threeCPU = v1.ResourceList{"cpu": resource.MustParse("3000m")}
 
+const (
+    workerPriority = "worker-pri"
+    masterPriority = "master-pri"
+)
+
 func homeDir() string {
     if h := os.Getenv("HOME"); h != "" {
         return h
@@ -78,7 +84,24 @@ func initTestContext() *context {
             Namespace: cxt.namespace,
         },
     })
     Expect(err).NotTo(HaveOccurred())
 
+    _, err = cxt.kubeclient.SchedulingV1alpha1().PriorityClasses().Create(&schedv1.PriorityClass{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: masterPriority,
+        },
+        Value:         100,
+        GlobalDefault: false,
+    })
+    Expect(err).NotTo(HaveOccurred())
+
+    _, err = cxt.kubeclient.SchedulingV1alpha1().PriorityClasses().Create(&schedv1.PriorityClass{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: workerPriority,
+        },
+        Value:         1,
+        GlobalDefault: false,
+    })
+    Expect(err).NotTo(HaveOccurred())
+
     return cxt
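Note that the PriorityClass objects above use the scheduling.k8s.io/v1alpha1 API, which at the time (mid-2018, Kubernetes 1.10/1.11) was alpha and typically required the PodPriority feature gate on the cluster. The Value field makes master-pri (100) strictly higher priority than worker-pri (1), and GlobalDefault: false keeps either class from applying to pods that don't name it explicitly.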
@@ -98,43 +121,123 @@ func cleanupTestContext(cxt *context) {
     err := cxt.kubeclient.CoreV1().Namespaces().Delete(cxt.namespace, &metav1.DeleteOptions{})
     Expect(err).NotTo(HaveOccurred())
 
+    foreground := metav1.DeletePropagationForeground
+    err = cxt.kubeclient.SchedulingV1alpha1().PriorityClasses().Delete(masterPriority, &metav1.DeleteOptions{
+        PropagationPolicy: &foreground,
+    })
+    Expect(err).NotTo(HaveOccurred())
+
+    err = cxt.kubeclient.SchedulingV1alpha1().PriorityClasses().Delete(workerPriority, &metav1.DeleteOptions{
+        PropagationPolicy: &foreground,
+    })
+    Expect(err).NotTo(HaveOccurred())
+
     // Wait for namespace deleted.
     err = wait.Poll(100*time.Millisecond, oneMinute, namespaceNotExist(cxt))
     Expect(err).NotTo(HaveOccurred())
 }
 
-func createQueueJob(context *context, name string, min, rep int32, img string, req v1.ResourceList) *arbv1.QueueJob {
-    queueJobName := "queuejob.k8s.io"
+type taskSpec struct {
+    name string
+    pri  string
+    rep  int32
+    img  string
+    req  v1.ResourceList
+}
 
-    queueJob := &arbv1.QueueJob{
-        ObjectMeta: metav1.ObjectMeta{
-            Name:      name,
-            Namespace: context.namespace,
-        },
-        Spec: arbv1.QueueJobSpec{
+func createQueueJobEx(context *context, name string, min int32, tss []taskSpec) *arbv1.QueueJob {
+    taskName := "task.queuejob.k8s.io"
+
+    var taskSpecs []arbv1.TaskSpec
+
+    for _, ts := range tss {
+        taskSpecs = append(taskSpecs, arbv1.TaskSpec{
             Selector: &metav1.LabelSelector{
                 MatchLabels: map[string]string{
-                    queueJobName: name,
+                    taskName: ts.name,
                 },
             },
-            Replicas: rep,
-            SchedSpec: arbv1.SchedulingSpecTemplate{
-                MinAvailable: int(min),
-            },
+            Replicas: ts.rep,
             Template: v1.PodTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
-                    Labels: map[string]string{queueJobName: name},
+                    Name:   ts.name,
+                    Labels: map[string]string{taskName: ts.name},
                 },
                 Spec: v1.PodSpec{
-                    SchedulerName: "kar-scheduler",
-                    RestartPolicy: v1.RestartPolicyNever,
+                    SchedulerName:     "kar-scheduler",
+                    PriorityClassName: ts.pri,
+                    RestartPolicy:     v1.RestartPolicyNever,
                     Containers: []v1.Container{
                         {
-                            Image: img,
-                            Name:  name,
+                            Image:           ts.img,
+                            Name:            ts.name,
                             ImagePullPolicy: v1.PullIfNotPresent,
                             Resources: v1.ResourceRequirements{
-                                Requests: req,
+                                Requests: ts.req,
                             },
                         },
                     },
                 },
             },
+        })
+    }
+
+    queueJob := &arbv1.QueueJob{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      name,
+            Namespace: context.namespace,
+        },
+        Spec: arbv1.QueueJobSpec{
+            SchedSpec: arbv1.SchedulingSpecTemplate{
+                MinAvailable: int(min),
+            },
+            TaskSpecs: taskSpecs,
+        },
     }
 
     queueJob, err := context.karclient.ArbV1().QueueJobs(context.namespace).Create(queueJob)
     Expect(err).NotTo(HaveOccurred())
 
     return queueJob
 }
 
+func createQueueJob(context *context, name string, min, rep int32, img string, req v1.ResourceList) *arbv1.QueueJob {
+    queueJobName := "queuejob.k8s.io"
+
+    queueJob := &arbv1.QueueJob{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      name,
+            Namespace: context.namespace,
+        },
+        Spec: arbv1.QueueJobSpec{
+            SchedSpec: arbv1.SchedulingSpecTemplate{
+                MinAvailable: int(min),
+            },
+            TaskSpecs: []arbv1.TaskSpec{
+                {
+
+                    Selector: &metav1.LabelSelector{
+                        MatchLabels: map[string]string{
+                            queueJobName: name,
+                        },
+                    },
+                    Replicas: rep,
+                    Template: v1.PodTemplateSpec{
+                        ObjectMeta: metav1.ObjectMeta{
+                            Labels: map[string]string{queueJobName: name},
+                        },
+                        Spec: v1.PodSpec{
+                            SchedulerName: "kar-scheduler",
+                            RestartPolicy: v1.RestartPolicyNever,
+                            Containers: []v1.Container{
+                                {
+                                    Image:           img,
+                                    Name:            name,
+                                    ImagePullPolicy: v1.PullIfNotPresent,
+                                    Resources: v1.ResourceRequirements{
+                                        Requests: req,
+                                    },
+                                },
+                            },
+                        },
+                    },
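With taskSpec and createQueueJobEx in place, a test can declare a heterogeneous gang in a few lines; createQueueJob is kept (rebuilt on TaskSpecs with a single unnamed task) for the existing single-task tests. An illustrative sketch, not part of this commit — the test name and replica counts are made up, everything else uses helpers defined in this diff:

    It("TwoTaskJob", func() {
        context := initTestContext()
        defer cleanupTestContext(context)

        ts := []taskSpec{
            {name: "master", img: "nginx", pri: masterPriority, rep: 1, req: oneCPU},
            {name: "worker", img: "nginx", pri: workerPriority, rep: 3, req: oneCPU},
        }

        // Gang size 4: the job should not start until all four pods can run.
        qj := createQueueJobEx(context, "demo-qj", 4, ts)

        // waitTasksReadyEx (added below) polls until each named task has at
        // least the requested number of Running/Succeeded pods.
        err := waitTasksReadyEx(context, qj.Name, map[string]int32{"master": 1, "worker": 3})
        Expect(err).NotTo(HaveOccurred())
    })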
Expand Down Expand Up @@ -205,15 +308,17 @@ func taskReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc {
pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)

readyTaskNum := 0
for _, pod := range pods.Items {
if !labelSelector.Matches(labels.Set(pod.Labels)) {
continue
}
if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
readyTaskNum++
for _, ts := range queueJob.Spec.TaskSpecs {
labelSelector := labels.SelectorFromSet(ts.Selector.MatchLabels)
if !labelSelector.Matches(labels.Set(pod.Labels)) ||
!metav1.IsControlledBy(&pod, queueJob) {
continue
}
if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
readyTaskNum++
}
}
}

@@ -225,6 +330,45 @@ func taskReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc {
     }
 }
 
+func taskReadyEx(ctx *context, jobName string, tss map[string]int32) wait.ConditionFunc {
+    return func() (bool, error) {
+        queueJob, err := ctx.karclient.ArbV1().QueueJobs(ctx.namespace).Get(jobName, metav1.GetOptions{})
+        Expect(err).NotTo(HaveOccurred())
+
+        pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(metav1.ListOptions{})
+        Expect(err).NotTo(HaveOccurred())
+
+        var taskSpecs []arbv1.TaskSpec
+        for _, ts := range queueJob.Spec.TaskSpecs {
+            if _, found := tss[ts.Template.Name]; found {
+                taskSpecs = append(taskSpecs, ts)
+            }
+        }
+
+        readyTaskNum := map[string]int32{}
+        for _, pod := range pods.Items {
+            for _, ts := range taskSpecs {
+                labelSelector := labels.SelectorFromSet(ts.Selector.MatchLabels)
+                if !labelSelector.Matches(labels.Set(pod.Labels)) ||
+                    !metav1.IsControlledBy(&pod, queueJob) {
+                    continue
+                }
+
+                if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
+                    readyTaskNum[ts.Template.Name]++
+                }
+            }
+        }
+
+        for name, expected := range tss {
+            if readyTaskNum[name] < expected {
+                return false, nil
+            }
+        }
+        return true, nil
+    }
+}
+
 func waitJobReady(ctx *context, name string) error {
     return wait.Poll(100*time.Millisecond, oneMinute, taskReady(ctx, name, -1))
 }
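taskReadyEx counts only pods that both match a task's label selector and are controlled by the QueueJob (metav1.IsControlledBy), so pods from other jobs in the same namespace are ignored. Readiness is tracked per task name, and the readyTaskNum[name] < expected comparison means a task may exceed its expected count without failing the condition.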
@@ -233,6 +377,10 @@ func waitTasksReady(ctx *context, name string, taskNum int) error {
     return wait.Poll(100*time.Millisecond, oneMinute, taskReady(ctx, name, taskNum))
 }
 
+func waitTasksReadyEx(ctx *context, name string, ts map[string]int32) error {
+    return wait.Poll(100*time.Millisecond, oneMinute, taskReadyEx(ctx, name, ts))
+}
+
 func jobNotReady(ctx *context, jobName string) wait.ConditionFunc {
     return func() (bool, error) {
         queueJob, err := ctx.karclient.ArbV1().QueueJobs(ctx.namespace).Get(jobName, metav1.GetOptions{})
@@ -241,19 +389,26 @@ func jobNotReady(ctx *context, jobName string) wait.ConditionFunc {
         pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(metav1.ListOptions{})
         Expect(err).NotTo(HaveOccurred())
 
-        labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)
-
         pendingTaskNum := int32(0)
         for _, pod := range pods.Items {
-            if !labelSelector.Matches(labels.Set(pod.Labels)) {
-                continue
-            }
-            if pod.Status.Phase == v1.PodPending && len(pod.Spec.NodeName) == 0 {
-                pendingTaskNum++
+            for _, ts := range queueJob.Spec.TaskSpecs {
+                labelSelector := labels.SelectorFromSet(ts.Selector.MatchLabels)
+                if !labelSelector.Matches(labels.Set(pod.Labels)) ||
+                    !metav1.IsControlledBy(&pod, queueJob) {
+                    continue
+                }
+                if pod.Status.Phase == v1.PodPending && len(pod.Spec.NodeName) == 0 {
+                    pendingTaskNum++
+                }
             }
         }
 
-        return pendingTaskNum == queueJob.Spec.Replicas, nil
+        replicas := int32(0)
+        for _, ts := range queueJob.Spec.TaskSpecs {
+            replicas += ts.Replicas
+        }
+
+        return pendingTaskNum == replicas, nil
     }
 }
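jobNotReady now sums Replicas across all TaskSpecs instead of reading the removed job-level Spec.Replicas, so a job counts as "not ready" only when every requested pod is still Pending and unbound (len(pod.Spec.NodeName) == 0).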
