Skip to content

Commit

Permalink
Merge pull request #122 from danielvegamyhre/automated-cherry-pick-of…
Browse files Browse the repository at this point in the history
…-#119-upstream-release-0.1

Automated cherry pick of #119: fix default success policy
  • Loading branch information
k8s-ci-robot authored May 7, 2023
2 parents 033ff26 + c176aa7 commit 64db1a6
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 43 deletions.
11 changes: 10 additions & 1 deletion pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

// If all jobs have succeeded, JobSet has succeeded.
if len(ownedJobs.successful) == len(js.Spec.ReplicatedJobs) {
// TODO(#120): ensure that the successful jobs counted are actually the ones that we expect JobSet to have
if len(ownedJobs.successful) == totalJobs(&js) {
if err := r.ensureCondition(ctx, &js, corev1.EventTypeNormal, metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Expand Down Expand Up @@ -590,6 +591,14 @@ func dnsHostnamesEnabled(rjob *jobset.ReplicatedJob) bool {
return rjob.Network.EnableDNSHostnames != nil && *rjob.Network.EnableDNSHostnames
}

func totalJobs(js *jobset.JobSet) int {
totalJobs := 0
for _, rjob := range js.Spec.ReplicatedJobs {
totalJobs += rjob.Replicas
}
return totalJobs
}

func concat[T any](slices ...[]T) []T {
var result []T
for _, slice := range slices {
Expand Down
11 changes: 8 additions & 3 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ var _ = ginkgo.Describe("JobSet", func() {
gomega.Eventually(k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jobset.JobSet{}), timeout, interval).Should(gomega.Succeed())

ginkgo.By("checking all jobs were created successfully")
gomega.Eventually(util.CheckNumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))
gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))

// Check jobset status if specified.
ginkgo.By("checking jobset condition")
gomega.Eventually(util.CheckJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, jobset.JobSetCompleted).Should(gomega.Equal(true))
util.JobSetCompleted(ctx, k8sClient, js, timeout)
})
})
}) // end of Describe
Expand All @@ -96,6 +96,10 @@ func pingTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
podHostnames = append(podHostnames, fmt.Sprintf("%s-%s-%d-0.%s-%s", jsName, rjobName, jobIdx, jsName, rjobName))
}

// This bash script loops infinitely until it successfully pings all pods by hostname.
// Once successful, it sleeps for 5 seconds to reduce flakiness, since occasionally
// all pods but one will successfully ping eachother and complete before the last one
// successfully pings them all, resulting in a failed test run.
cmd := fmt.Sprintf(`for pod in {"%s","%s","%s","%s"}
do
gotStatus="-1"
Expand All @@ -110,7 +114,8 @@ do
fi
done
echo "Successfully pinged pod: $pod"
done`, podHostnames[0], podHostnames[1], podHostnames[2], podHostnames[3])
done
sleep 5`, podHostnames[0], podHostnames[1], podHostnames[2], podHostnames[3])

return testing.MakeJobSet(jsName, ns.Name).
ReplicatedJob(testing.MakeReplicatedJob(rjobName).
Expand Down
66 changes: 40 additions & 26 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
// Create test namespace before each test.
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-ns-",
GenerateName: "jobset-ns-",
},
}
gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
Expand All @@ -199,11 +199,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
// update contains the mutations to perform on the jobs/jobset and the
// checks to perform afterwards.
type update struct {
jobSetUpdateFn func(*jobset.JobSet)
jobUpdateFn func(*batchv1.JobList)
checkJobSetState func(*jobset.JobSet)
checkJobState func(*jobset.JobSet)
expectedJobSetCondition jobset.JobSetConditionType
jobSetUpdateFn func(*jobset.JobSet)
jobUpdateFn func(*batchv1.JobList)
checkJobSetState func(*jobset.JobSet)
checkJobState func(*jobset.JobSet)
checkJobSetCondition func(context.Context, client.Client, *jobset.JobSet, time.Duration)
}

type testCase struct {
Expand All @@ -228,7 +228,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
gomega.Eventually(k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jobset.JobSet{}), timeout, interval).Should(gomega.Succeed())

ginkgo.By("checking all jobs were created successfully")
gomega.Eventually(util.CheckNumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))
gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))

// Perform a series of updates to jobset resources and check resulting jobset state after each update.
for _, up := range tc.updates {
Expand All @@ -252,18 +252,31 @@ var _ = ginkgo.Describe("JobSet controller", func() {
}

// Check jobset status if specified.
if up.expectedJobSetCondition != "" {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", up.expectedJobSetCondition))
gomega.Eventually(util.CheckJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, up.expectedJobSetCondition).Should(gomega.Equal(true))
if up.checkJobSetCondition != nil {
up.checkJobSetCondition(ctx, k8sClient, &jobSet, timeout)
}
}
},
ginkgo.Entry("jobset should succeed after all jobs succeed", &testCase{
makeJobSet: testJobSet,
updates: []*update{
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
ginkgo.Entry("jobset should not succeed if any job is not completed", &testCase{
makeJobSet: testJobSet,
updates: []*update{
{
jobUpdateFn: func(jobList *batchv1.JobList) {
ginkgo.By("completing all but 1 job")
for i := 0; i < len(jobList.Items)-1; i++ {
completeJob(&jobList.Items[i])
}
},
checkJobSetCondition: util.JobSetActive,
},
},
}),
Expand All @@ -274,7 +287,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobUpdateFn: func(jobList *batchv1.JobList) {
failJob(&jobList.Items[0])
},
expectedJobSetCondition: jobset.JobSetFailed,
checkJobSetCondition: util.JobSetFailed,
},
},
}),
Expand All @@ -285,8 +298,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
checkJobSetState: checkExpectedServices,
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -297,8 +310,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
checkJobSetState: checkExpectedServices,
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -312,7 +325,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobUpdateFn: func(jobList *batchv1.JobList) {
failJob(&jobList.Items[0])
},
expectedJobSetCondition: jobset.JobSetFailed,
checkJobSetCondition: util.JobSetFailed,
},
},
}),
Expand All @@ -337,7 +350,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobUpdateFn: func(jobList *batchv1.JobList) {
failJob(&jobList.Items[1])
},
expectedJobSetCondition: jobset.JobSetFailed,
checkJobSetCondition: util.JobSetFailed,
},
},
}),
Expand All @@ -360,8 +373,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -372,11 +385,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
updates: []*update{
{
expectedJobSetCondition: jobset.JobSetSuspended,
checkJobSetState: func(js *jobset.JobSet) {
ginkgo.By("checking all jobs are suspended")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, true).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetSuspended,
},
},
}),
Expand All @@ -386,11 +399,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
updates: []*update{
{
expectedJobSetCondition: jobset.JobSetSuspended,
checkJobSetState: func(js *jobset.JobSet) {
ginkgo.By("checking all jobs are suspended")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, true).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetSuspended,
},
{
jobSetUpdateFn: func(js *jobset.JobSet) {
Expand All @@ -400,10 +413,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
ginkgo.By("checking all jobs are resumed")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, false).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetResumed,
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -422,11 +436,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobSetUpdateFn: func(js *jobset.JobSet) {
suspendJobSet(js, true)
},
expectedJobSetCondition: jobset.JobSetSuspended,
checkJobSetState: func(js *jobset.JobSet) {
ginkgo.By("checking all jobs are suspended")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, true).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetSuspended,
},
},
}),
Expand Down
85 changes: 72 additions & 13 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,20 @@ package util

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/v1alpha1"
)

func CheckJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, condition jobset.JobSetConditionType) (bool, error) {
var fetchedJS jobset.JobSet
if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
return false, err
}
for _, c := range fetchedJS.Status.Conditions {
if c.Type == string(condition) {
return true, nil
}
}
return false, nil
}
const interval = time.Millisecond * 250

func NumExpectedJobs(js *jobset.JobSet) int {
expectedJobs := 0
Expand All @@ -44,10 +38,75 @@ func NumExpectedJobs(js *jobset.JobSet) int {
return expectedJobs
}

func CheckNumJobs(ctx context.Context, k8sClient client.Client, js *jobset.JobSet) (int, error) {
func NumJobs(ctx context.Context, k8sClient client.Client, js *jobset.JobSet) (int, error) {
var jobList batchv1.JobList
if err := k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace)); err != nil {
return -1, err
}
return len(jobList.Items), nil
}

func JobSetCompleted(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetCompleted))
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionTrue,
},
}
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
}

func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetFailed))
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionTrue,
},
}
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
}

func JobSetSuspended(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetSuspended))
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetSuspended),
Status: metav1.ConditionTrue,
},
}
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
}

func JobSetResumed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By("checking jobset status is resumed")
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetSuspended),
Status: metav1.ConditionFalse,
},
}
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
}

func JobSetActive(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By("checking jobset status is active")
gomega.Consistently(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, []metav1.Condition{}).Should(gomega.Equal(true))
}

func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, conditions []metav1.Condition) (bool, error) {
var fetchedJS jobset.JobSet
if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
return false, err
}
found := 0
for _, want := range conditions {
for _, c := range fetchedJS.Status.Conditions {
if c.Type == want.Type && c.Status == want.Status {
found += 1
}
}
}
return found == len(conditions), nil
}

0 comments on commit 64db1a6

Please sign in to comment.