From 7dc53cd8b1ed30c06789193d6e9d37efce34784b Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Wed, 29 May 2019 09:42:41 +0800 Subject: [PATCH 1/7] debug try to delete bug --- pkg/controllers/cache/cache.go | 12 +++++++++++- pkg/controllers/job/job_controller_handler.go | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/cache/cache.go b/pkg/controllers/cache/cache.go index 9684330684..cb76ce88fc 100644 --- a/pkg/controllers/cache/cache.go +++ b/pkg/controllers/cache/cache.go @@ -19,8 +19,12 @@ package cache import ( "fmt" "sync" + "time" "github.com/golang/glog" + + "golang.org/x/time/rate" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" @@ -71,9 +75,15 @@ func jobKeyOfPod(pod *v1.Pod) (string, error) { //New gets the job Cache func New() Cache { + queue := workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 180*time.Second), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ) + return &jobCache{ jobs: map[string]*apis.JobInfo{}, - deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + deletedJobs: workqueue.NewRateLimitingQueue(queue), } } diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 0f521ee9aa..a84e3b1e1c 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -153,6 +153,11 @@ func (cc *Controller) addPod(obj interface{}) { return } + if pod.DeletionTimestamp != nil { + cc.deletePod(pod) + return + } + req := apis.Request{ Namespace: pod.Namespace, JobName: jobName, @@ -209,6 +214,15 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { return } + if newPod.ResourceVersion == oldPod.ResourceVersion { + return + } + + if newPod.DeletionTimestamp != nil { + cc.deletePod(newObj) + return + } + if err := cc.cache.UpdatePod(newPod); err != nil { glog.Errorf("Failed to update Pod <%s/%s>: %v in cache", newPod.Namespace, newPod.Name, err) From f4517bff4fb1435d39228d5b579bff7fb7d59a52 Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Mon, 3 Jun 2019 14:00:36 +0800 Subject: [PATCH 2/7] complete pods in job event --- pkg/controllers/job/job_controller_actions.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index d28c7fa135..e8ad24aba7 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -26,6 +26,7 @@ import ( "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8scontroller "k8s.io/kubernetes/pkg/controller" kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" @@ -90,6 +91,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM if len(errs) != 0 { glog.Errorf("failed to kill pods for job %s/%s, with err %+v", job.Namespace, job.Name, errs) + cc.recorder.Event(job, v1.EventTypeWarning, k8scontroller.FailedDeletePodReason, + fmt.Sprintf("Error deleting pods: %+v", errs)) return fmt.Errorf("failed to kill %d pods of %d", len(errs), total) } @@ -276,7 +279,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, 
updateStatus state.UpdateSt // So gang-scheduling could schedule the Job successfully glog.Errorf("Failed to create pod %s for Job %s, err %#v", pod.Name, job.Name, err) - creationErrs = append(creationErrs, err) + creationErrs = append(creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err)) } else { if err != nil && apierrors.IsAlreadyExists(err) { cc.resyncTask(pod) @@ -292,6 +295,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt waitCreationGroup.Wait() if len(creationErrs) != 0 { + cc.recorder.Event(job, v1.EventTypeWarning, k8scontroller.FailedCreatePodReason, + fmt.Sprintf("Error creating pods: %+v", creationErrs)) return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate)) } @@ -321,6 +326,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt waitDeletionGroup.Wait() if len(deletionErrs) != 0 { + cc.recorder.Event(job, v1.EventTypeWarning, k8scontroller.FailedDeletePodReason, + fmt.Sprintf("Error deleting pods: %+v", deletionErrs)) return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete)) } @@ -490,7 +497,7 @@ func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error { glog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v", pod.Namespace, pod.Name, jobName, err) - return err + return fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err) } return nil From 42fcc3c3fa1827b9d043eda6d75d97139022951d Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Mon, 3 Jun 2019 17:06:47 +0800 Subject: [PATCH 3/7] fix admit job minA >= 0 --- pkg/admission/admit_job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 6a011b72a3..b3660c72e6 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -79,9 +79,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st taskNames := map[string]string{} var totalReplicas int32 - if job.Spec.MinAvailable < 0 { + if job.Spec.MinAvailable <= 0 { reviewResponse.Allowed = false - return fmt.Sprintf("'minAvailable' cannot be less than zero.") + return fmt.Sprintf("'minAvailable' must be greater than zero.") } if job.Spec.MaxRetry < 0 { From c13ed6b7794e9dd09747abfdb2966c84e0da7501 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Fri, 28 Jun 2019 16:23:59 +0800 Subject: [PATCH 4/7] fix ut --- pkg/admission/admit_job_test.go | 2 +- pkg/controllers/job/job_controller_handler_test.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/admission/admit_job_test.go b/pkg/admission/admit_job_test.go index ee615edeee..5f019b4485 100644 --- a/pkg/admission/admit_job_test.go +++ b/pkg/admission/admit_job_test.go @@ -322,7 +322,7 @@ func TestValidateExecution(t *testing.T) { }, }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: "'minAvailable' cannot be less than zero.", + ret: "'minAvailable' must be greater than zero.", ExpectErr: true, }, // maxretry less than zero diff --git a/pkg/controllers/job/job_controller_handler_test.go b/pkg/controllers/job/job_controller_handler_test.go index 12b5802a7c..6f262b2202 100644 --- a/pkg/controllers/job/job_controller_handler_test.go +++ b/pkg/controllers/job/job_controller_handler_test.go @@ -26,6 +26,7 @@ import ( kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/apimachinery/pkg/util/uuid" kubeclientset "k8s.io/client-go/kubernetes" vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" @@ -67,10 +68,11 @@ func newController() *Controller { func buildPod(namespace, name string, p v1.PodPhase, labels map[string]string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), - Name: name, - Namespace: namespace, - Labels: labels, + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + Name: name, + Namespace: namespace, + Labels: labels, + ResourceVersion: string(uuid.NewUUID()), }, Status: v1.PodStatus{ Phase: p, From 12ad858fd6d659995495600c17830729af9a3087 Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Mon, 10 Jun 2019 19:31:33 +0800 Subject: [PATCH 5/7] fix ut --- pkg/controllers/job/job_controller_actions.go | 14 ++++++++------ test/e2e/command.go | 9 ++++----- test/e2e/job_controlled_resource.go | 6 +----- test/e2e/job_scheduling.go | 2 +- test/e2e/util.go | 17 +++++++++++++++++ 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index e8ad24aba7..243b8a0bd8 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -157,10 +157,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - if job, err := cc.initJobStatus(job); err != nil { + if update, job, err := cc.initJobStatus(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.JobStatusError), fmt.Sprintf("Failed to initialize job status, err: %v", err)) return err + } else if update { + return nil } if err := cc.pluginOnJobAdd(job); err != nil { @@ -537,9 +539,9 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { return &minAvailableTasksRes } -func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) { +func (cc *Controller) initJobStatus(job *vkv1.Job) (bool, *vkv1.Job, error) { if job.Status.State.Phase != "" { - return job, nil + return false, job, nil } job.Status.State.Phase = vkv1.Pending @@ -548,13 +550,13 @@ func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) { if err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) - return nil, err + return false, nil, err } if err := cc.cache.Update(newJob); err != nil { glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", newJob.Namespace, newJob.Name, err) - return nil, err + return false, nil, err } - return newJob, nil + return true, newJob, nil } diff --git a/test/e2e/command.go b/test/e2e/command.go index 8040a9691f..cd23d4cd2a 100644 --- a/test/e2e/command.go +++ b/test/e2e/command.go @@ -19,11 +19,11 @@ package e2e import ( "bytes" "fmt" - v1 "k8s.io/api/core/v1" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -98,9 +98,8 @@ var _ = Describe("Job E2E Test: Test Job Command", func() { //Pod is gone podName := jobUtil.MakePodName(jobName, taskName, 0) - _, err = context.kubeclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{}) - Expect(apierrors.IsNotFound(err)).To(BeTrue(), - "Job related pod should be deleted when aborting job.") + err = waitPodGone(context, podName, job.Namespace) + Expect(err).NotTo(HaveOccurred()) //Resume job ResumeJob(jobName, namespace) @@ -116,7 +115,7 @@ var _ = Describe("Job E2E Test: Test Job Command", func() { It("Suspend pending job", func() { context := initTestContext() defer cleanupTestContext(context) - rep := clusterSize(context, oneCPU) * 2 + rep := clusterSize(context, oneCPU) jobName := "test-suspend-pending-job" namespace := "test" diff --git a/test/e2e/job_controlled_resource.go b/test/e2e/job_controlled_resource.go index b1f3ffb0fe..0ac234b937 100644 --- a/test/e2e/job_controlled_resource.go +++ b/test/e2e/job_controlled_resource.go @@ -47,10 +47,6 @@ var _ = Describe("Job E2E Test: Test Job PVCs", func() { }, }, volumes: []v1alpha1.VolumeSpec{ - { - MountPath: "/mountone", - VolumeClaimName: pvcName, - }, { MountPath: "/mounttwo", }, @@ -63,7 +59,7 @@ var _ = Describe("Job E2E Test: Test Job PVCs", func() { job, err = context.vkclient.BatchV1alpha1().Jobs(namespace).Get(jobName, v1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) - Expect(len(job.Spec.Volumes)).To(Equal(2), + Expect(len(job.Spec.Volumes)).To(Equal(1), "Two volumes should be created") for _, volume := range job.Spec.Volumes { Expect(volume.VolumeClaimName).Should(Or(ContainSubstring(jobName), Equal(pvcName)), diff --git a/test/e2e/job_scheduling.go b/test/e2e/job_scheduling.go index 23057ab392..81fabdcbba 100644 --- a/test/e2e/job_scheduling.go +++ b/test/e2e/job_scheduling.go @@ -140,7 +140,7 @@ var _ = Describe("Job E2E Test", func() { job.name = "gang-fq-qj2" job2 := createJob(context, job) - err = waitJobPending(context, job2) + err = waitJobStatePending(context, job2) Expect(err).NotTo(HaveOccurred()) err = waitJobReady(context, job1) diff --git a/test/e2e/util.go b/test/e2e/util.go index 1eb1c939c0..7fc26e2a55 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1005,3 +1005,20 @@ func readyNodeAmount(ctx *context) int { } return amount } + +func waitPodGone(ctx *context, podName, namespace string) error { + var additionalError error + err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + _, err := ctx.kubeclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{}) + expected := errors.IsNotFound(err) + if !expected { + additionalError = fmt.Errorf("Job related pod should be deleted when aborting job.") + } + + return expected, nil + }) + if err != nil && strings.Contains(err.Error(), timeOutMessage) { + return fmt.Errorf("[Wait time out]: %s", additionalError) + } + return err +} From ec01987321bd73206f8622145b87c6a34aacd853 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Tue, 11 Jun 2019 21:45:56 +0800 Subject: [PATCH 6/7] modify state jump logic. 
modify UT --- pkg/controllers/job/state/aborting.go | 4 +- pkg/controllers/job/state/inqueue.go | 25 ++++------ pkg/controllers/job/state/pending.go | 25 ++++------ pkg/controllers/job/state/running.go | 32 +++--------- test/e2e/job_error_handling.go | 4 +- test/e2e/mpi.go | 2 +- test/e2e/util.go | 72 +++++++++++++++++++++++++-- 7 files changed, 99 insertions(+), 65 deletions(-) diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index fbf2b5e764..8c8fdeedd0 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -30,10 +30,10 @@ type abortingState struct { func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - // Already in Restarting phase, just sync it return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { + status.State.Phase = vkv1.Restarting status.RetryCount++ - return false + return true }) default: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { diff --git a/pkg/controllers/job/state/inqueue.go b/pkg/controllers/job/state/inqueue.go index 1cd55fcba1..b8d6d5805c 100644 --- a/pkg/controllers/job/state/inqueue.go +++ b/pkg/controllers/job/state/inqueue.go @@ -29,31 +29,24 @@ func (ps *inqueueState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Restarting - status.RetryCount++ - } - status.State.Phase = phase + status.State.Phase = vkv1.Restarting + status.RetryCount++ return true }) case vkv1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Aborting - } - status.State.Phase = phase + status.State.Phase = vkv1.Aborting return true }) case vkv1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Completed - if status.Terminating != 0 { - phase = vkv1.Completing - } - status.State.Phase = phase + status.State.Phase = vkv1.Completing + return true + }) + case vkv1.TerminateJobAction: + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { + status.State.Phase = vkv1.Terminating return true }) default: diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 38fa2e08a4..b3c63396fd 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -29,31 +29,24 @@ func (ps *pendingState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Restarting - status.RetryCount++ - } - status.State.Phase = phase + status.RetryCount++ + status.State.Phase = vkv1.Restarting return true }) case vkv1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Aborting - } - status.State.Phase = phase + status.State.Phase = vkv1.Aborting return true }) case vkv1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Completed - if status.Terminating != 0 { - phase = vkv1.Completing - } - status.State.Phase = phase + status.State.Phase = 
vkv1.Completing + return true + }) + case vkv1.TerminateJobAction: + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { + status.State.Phase = vkv1.Terminating return true }) case vkv1.EnqueueAction: diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index e25b3a77af..bdcf18090f 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -29,39 +29,23 @@ func (ps *runningState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - if status.Terminating != 0 { - status.State.Phase = vkv1.Restarting - status.RetryCount++ - return true - } - return false + status.State.Phase = vkv1.Restarting + status.RetryCount++ + return true }) case vkv1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - if status.Terminating != 0 { - status.State.Phase = vkv1.Aborting - return true - } - - return false + status.State.Phase = vkv1.Aborting + return true }) case vkv1.TerminateJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - if status.Terminating != 0 { - status.State.Phase = vkv1.Terminating - return true - } - - return false + status.State.Phase = vkv1.Terminating + return true }) case vkv1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Completed - if status.Terminating != 0 { - phase = vkv1.Completing - } - - status.State.Phase = phase + status.State.Phase = vkv1.Completing return true }) default: diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index 1a875cb851..73287a9e71 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -436,7 +436,7 @@ var _ = Describe("Job Error Handling", func() { By("create job") job := createJob(context, &jobSpec{ - name: "any-restart-job", + name: "any-complete-job", policies: []vkv1.LifecyclePolicy{ { Action: vkv1.CompleteJobAction, @@ -463,7 +463,7 @@ var _ = Describe("Job Error Handling", func() { By("job scheduled, then task 'completed_task' finished and job finally complete") // job phase: pending -> running -> completing -> completed - err := waitJobStates(context, job, []vkv1.JobPhase{ + err := waitJobPhases(context, job, []vkv1.JobPhase{ vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/mpi.go b/test/e2e/mpi.go index 0fc57b9953..0c4be548ef 100644 --- a/test/e2e/mpi.go +++ b/test/e2e/mpi.go @@ -70,7 +70,7 @@ mpiexec --allow-run-as-root --hostfile /etc/volcano/mpiworker.host -np 2 mpi_hel job := createJob(context, spec) - err := waitJobStates(context, job, []vkv1.JobPhase{ + err := waitJobPhases(context, job, []vkv1.JobPhase{ vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 7fc26e2a55..5a55cfe02d 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -462,12 +462,76 @@ func jobEvicted(ctx *context, job *vkv1.Job, time time.Time) wait.ConditionFunc } func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { - for _, phase := range phases { - err := waitJobPhase(ctx, job, phase) - if err != nil { - return err + w, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Watch(metav1.ListOptions{}) + if err != nil { + return err + } + defer w.Stop() + + 
var additionalError error + total := int32(0) + for _, task := range job.Spec.Tasks { + total += task.Replicas + } + + ch := w.ResultChan() + index := 0 + timeout := time.After(oneMinute) + + for index < len(phases) { + select { + case event, open := <-ch: + if !open { + return fmt.Errorf("watch channel should be always open") + } + + newJob := event.Object.(*vkv1.Job) + phase := phases[index] + if newJob.Name != job.Name || newJob.Namespace != job.Namespace { + continue + } + + if newJob.Status.State.Phase != phase { + additionalError = fmt.Errorf( + "expected job '%s' to be in status %s, actual get %s", + job.Name, phase, newJob.Status.State.Phase) + continue + } + var flag = false + switch phase { + case vkv1.Pending: + flag = (newJob.Status.Pending+newJob.Status.Succeeded+ + newJob.Status.Failed+newJob.Status.Running) == 0 || + (total-newJob.Status.Terminating >= newJob.Status.MinAvailable) + case vkv1.Terminating, vkv1.Aborting, vkv1.Restarting, vkv1.Completing: + flag = newJob.Status.Terminating > 0 + case vkv1.Terminated, vkv1.Aborted, vkv1.Completed: + flag = newJob.Status.Pending == 0 && + newJob.Status.Running == 0 && + newJob.Status.Terminating == 0 + case vkv1.Running: + flag = newJob.Status.Running >= newJob.Spec.MinAvailable + case vkv1.Inqueue: + flag = newJob.Status.Pending > 0 + default: + return fmt.Errorf("unknown phase %s", phase) + } + + if !flag { + additionalError = fmt.Errorf( + "expected job '%s' to be in status %s, actual detail status %s", + job.Name, phase, getJobStatusDetail(newJob)) + continue + } + + index++ + timeout = time.After(oneMinute) + + case <-timeout: + return fmt.Errorf("[Wait time out]: %s", additionalError) } } + return nil } From 2f9e1197d02db65f1ee3eff0c0a9b4f4a17cd5dc Mon Sep 17 00:00:00 2001 From: lminzhw Date: Wed, 3 Jul 2019 11:45:17 +0800 Subject: [PATCH 7/7] fix ut --- .../job/job_controller_actions_test.go | 5 + pkg/controllers/job/job_state_test.go | 174 ++++++++++-------- test/e2e/util.go | 1 + 3 files changed, 100 insertions(+), 80 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index 34fff91d48..f18d3bd7ee 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -179,6 +179,11 @@ func TestCreateJobFunc(t *testing.T) { Name: "job1", Namespace: namespace, }, + Status: v1alpha1.JobStatus{ + State: v1alpha1.JobState{ + Phase: v1alpha1.Pending, + }, + }, }, PodGroup: &kbv1aplha1.PodGroup{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/job/job_state_test.go b/pkg/controllers/job/job_state_test.go index a45c5432e0..bb47f2fb28 100644 --- a/pkg/controllers/job/job_state_test.go +++ b/pkg/controllers/job/job_state_test.go @@ -494,6 +494,33 @@ func TestInqueueState_Execute(t *testing.T) { Action: v1alpha1.AbortJobAction, ExpectedVal: nil, }, + { + Name: "InqueueState- TerminateJobAction case With terminating pod count not equal to zero", + JobInfo: &apis.JobInfo{ + Namespace: namespace, + Name: "jobinfo1", + Job: &v1alpha1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Job1", + Namespace: namespace, + }, + Status: v1alpha1.JobStatus{ + Terminating: 2, + State: v1alpha1.JobState{ + Phase: v1alpha1.Inqueue, + }, + }, + }, + Pods: map[string]map[string]*v1.Pod{ + "task1": { + "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), + "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), + }, + }, + }, + Action: v1alpha1.TerminateJobAction, + ExpectedVal: nil, + }, { Name: 
"InqueueState- CompleteJobAction case With terminating pod count equal to zero", JobInfo: &apis.JobInfo{ @@ -631,34 +658,24 @@ func TestInqueueState_Execute(t *testing.T) { } if testcase.Action == v1alpha1.RestartJobAction { - if jobInfo.Job.Status.Terminating == 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Pending { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Pending, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } + // always jump to restarting firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.AbortJobAction { - if jobInfo.Job.Status.Terminating == 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Pending { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Pending, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } + // always jump to aborting firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Aborting, jobInfo.Job.Status.State.Phase, i) + } + } else if testcase.Action == v1alpha1.TerminateJobAction { + // always jump to terminating firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Terminating { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Terminating, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.CompleteJobAction { - if jobInfo.Job.Status.Terminating == 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Completed { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completed, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } + // always jump to completing firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completing, jobInfo.Job.Status.State.Phase, i) } } else { if jobInfo.Job.Spec.MinAvailable <= jobInfo.Job.Status.Running+jobInfo.Job.Status.Succeeded+jobInfo.Job.Status.Failed { @@ -777,6 +794,33 @@ func TestPendingState_Execute(t *testing.T) { Action: v1alpha1.AbortJobAction, ExpectedVal: nil, }, + { + Name: "PendingState- TerminateJobAction case With terminating pod count not equal to zero", + JobInfo: &apis.JobInfo{ + Namespace: namespace, + Name: "jobinfo1", + Job: &v1alpha1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Job1", + Namespace: namespace, + }, + Status: v1alpha1.JobStatus{ + Terminating: 2, + State: v1alpha1.JobState{ + Phase: v1alpha1.Pending, + }, + }, + }, + Pods: map[string]map[string]*v1.Pod{ + "task1": { + "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), + "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), + }, + }, + }, + Action: v1alpha1.TerminateJobAction, + ExpectedVal: nil, + }, { Name: "PendingState- CompleteJobAction case With terminating pod count equal to zero", JobInfo: &apis.JobInfo{ @@ -944,34 +988,24 @@ func 
TestPendingState_Execute(t *testing.T) { } if testcase.Action == v1alpha1.RestartJobAction { - if jobInfo.Job.Status.Terminating == 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Pending { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Pending, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } + // always jump to restarting firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.AbortJobAction { - if jobInfo.Job.Status.Terminating == 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Pending { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Pending, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } + // always jump to aborting firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Aborting, jobInfo.Job.Status.State.Phase, i) + } + } else if testcase.Action == v1alpha1.TerminateJobAction { + // always jump to completing firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Terminating { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Terminating, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.CompleteJobAction { - if jobInfo.Job.Status.Terminating == 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Completed { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completed, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } + // always jump to completing firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completing, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.EnqueueAction { if jobInfo.Job.Spec.MinAvailable <= jobInfo.Job.Status.Running+jobInfo.Job.Status.Succeeded+jobInfo.Job.Status.Failed { @@ -1415,44 +1449,24 @@ func TestRunningState_Execute(t *testing.T) { } if testcase.Action == v1alpha1.RestartJobAction { - if testcase.JobInfo.Job.Status.Terminating != 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Running { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Running, jobInfo.Job.Status.State.Phase, i) - } + // always jump to restarting firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.AbortJobAction { - if testcase.JobInfo.Job.Status.Terminating != 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { - t.Errorf("Expected Job phase to %s, but got %s in case 
%d", v1alpha1.Aborting, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Running { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Running, jobInfo.Job.Status.State.Phase, i) - } + // always jump to aborting firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.TerminateJobAction { - if testcase.JobInfo.Job.Status.Terminating != 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Terminating { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Terminating, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Running { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Running, jobInfo.Job.Status.State.Phase, i) - } + // always jump to terminating firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Terminating { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Terminating, jobInfo.Job.Status.State.Phase, i) } } else if testcase.Action == v1alpha1.CompleteJobAction { - if testcase.JobInfo.Job.Status.Terminating != 0 { - if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completing, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Completed { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completed, jobInfo.Job.Status.State.Phase, i) - } + // always jump to completing firstly + if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { + t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) } } else { total := state.TotalTasks(testcase.JobInfo.Job) diff --git a/test/e2e/util.go b/test/e2e/util.go index 5a55cfe02d..3c81a27264 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -497,6 +497,7 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { job.Name, phase, newJob.Status.State.Phase) continue } + var flag = false switch phase { case vkv1.Pending: