Skip to content

Commit

Permalink
[YUNIKORN-2067] Test_With_Spark_Jobs e2e test wait for app state Runn…
Browse files Browse the repository at this point in the history
…ing after Spark job completed (#696)

Closes: #696

Signed-off-by: Craig Condit <ccondit@apache.org>
  • Loading branch information
chenyulin0719 authored and craigcondit committed Oct 26, 2023
1 parent 53f6f3e commit 44d8c38
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
19 changes: 19 additions & 0 deletions test/e2e/framework/helpers/k8s/k8s_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,25 @@ func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string,
return nil
}

// Wait for all pods in 'namespace' with given 'selector' to enter succeeded state.
// Returns an error if no pods are found or not all discovered pods enter succeeded state.
func (k *KubeCtl) WaitForPodBySelectorSucceeded(namespace string, selector string, timeout time.Duration) error {
podList, err := k.ListPods(namespace, selector)
if err != nil {
return err
}
if len(podList.Items) == 0 {
return fmt.Errorf("no pods in %s with selector %s", namespace, selector)
}

for _, pod := range podList.Items {
if err := k.WaitForPodSucceeded(namespace, pod.Name, timeout); err != nil {
return err
}
}
return nil
}

// Wait up to timeout seconds for a pod in 'namespace' with given 'selector' to exist
func (k *KubeCtl) WaitForPodBySelector(namespace string, selector string, timeout time.Duration) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodSelectorInNs(selector, namespace).WithContext())
Expand Down
8 changes: 6 additions & 2 deletions test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ var _ = Describe("", func() {

// Verify that all the spark jobs are scheduled and are in running state.
for _, id := range appIds {
By(fmt.Sprintf("Verify if app: %s is in running state", id))
err = restClient.WaitForAppStateTransition("default", "root."+sparkNS, id, yunikorn.States().Application.Running, 360)
By(fmt.Sprintf("Verify driver pod for application %s has been created.", id))
err = kClient.WaitForPodBySelector(sparkNS, fmt.Sprintf("spark-app-selector=%s, spark-role=driver", id), 180*time.Second)
Ω(err).ShouldNot(HaveOccurred())

By(fmt.Sprintf("Verify driver pod for application %s was completed.", id))
err = kClient.WaitForPodBySelectorSucceeded(sparkNS, fmt.Sprintf("spark-app-selector=%s, spark-role=driver", id), 360*time.Second)
Ω(err).NotTo(HaveOccurred())
}
})
Expand Down

0 comments on commit 44d8c38

Please sign in to comment.