Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #119: fix default success policy #122

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

// If all jobs have succeeded, JobSet has succeeded.
if len(ownedJobs.successful) == len(js.Spec.ReplicatedJobs) {
// TODO(#120): ensure that the successful jobs counted are actually the ones that we expect JobSet to have
if len(ownedJobs.successful) == totalJobs(&js) {
if err := r.ensureCondition(ctx, &js, corev1.EventTypeNormal, metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Expand Down Expand Up @@ -589,6 +590,14 @@ func dnsHostnamesEnabled(rjob *jobset.ReplicatedJob) bool {
return rjob.Network.EnableDNSHostnames != nil && *rjob.Network.EnableDNSHostnames
}

// totalJobs returns the number of child Jobs the JobSet spec asks for,
// i.e. the sum of Replicas over every entry in Spec.ReplicatedJobs.
func totalJobs(js *jobset.JobSet) int {
	sum := 0
	replicated := js.Spec.ReplicatedJobs
	// Index instead of ranging by value so each ReplicatedJob is not copied.
	for i := range replicated {
		sum += replicated[i].Replicas
	}
	return sum
}

func concat[T any](slices ...[]T) []T {
var result []T
for _, slice := range slices {
Expand Down
11 changes: 8 additions & 3 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ var _ = ginkgo.Describe("JobSet", func() {
gomega.Eventually(k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jobset.JobSet{}), timeout, interval).Should(gomega.Succeed())

ginkgo.By("checking all jobs were created successfully")
gomega.Eventually(util.CheckNumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))
gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))

// Check jobset status if specified.
ginkgo.By("checking jobset condition")
gomega.Eventually(util.CheckJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, jobset.JobSetCompleted).Should(gomega.Equal(true))
util.JobSetCompleted(ctx, k8sClient, js, timeout)
})
})
}) // end of Describe
Expand All @@ -96,6 +96,10 @@ func pingTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
podHostnames = append(podHostnames, fmt.Sprintf("%s-%s-%d-0.%s-%s", jsName, rjobName, jobIdx, jsName, rjobName))
}

// This bash script loops infinitely until it successfully pings all pods by hostname.
// Once successful, it sleeps for 5 seconds to reduce flakiness, since occasionally
// all pods but one will successfully ping each other and complete before the last one
// successfully pings them all, resulting in a failed test run.
cmd := fmt.Sprintf(`for pod in {"%s","%s","%s","%s"}
do
gotStatus="-1"
Expand All @@ -110,7 +114,8 @@ do
fi
done
echo "Successfully pinged pod: $pod"
done`, podHostnames[0], podHostnames[1], podHostnames[2], podHostnames[3])
done
sleep 5`, podHostnames[0], podHostnames[1], podHostnames[2], podHostnames[3])

return testing.MakeJobSet(jsName, ns.Name).
ReplicatedJob(testing.MakeReplicatedJob(rjobName).
Expand Down
66 changes: 40 additions & 26 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
// Create test namespace before each test.
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-ns-",
GenerateName: "jobset-ns-",
},
}
gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
Expand All @@ -199,11 +199,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
// update contains the mutations to perform on the jobs/jobset and the
// checks to perform afterwards.
type update struct {
jobSetUpdateFn func(*jobset.JobSet)
jobUpdateFn func(*batchv1.JobList)
checkJobSetState func(*jobset.JobSet)
checkJobState func(*jobset.JobSet)
expectedJobSetCondition jobset.JobSetConditionType
jobSetUpdateFn func(*jobset.JobSet)
jobUpdateFn func(*batchv1.JobList)
checkJobSetState func(*jobset.JobSet)
checkJobState func(*jobset.JobSet)
checkJobSetCondition func(context.Context, client.Client, *jobset.JobSet, time.Duration)
}

type testCase struct {
Expand All @@ -228,7 +228,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
gomega.Eventually(k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jobset.JobSet{}), timeout, interval).Should(gomega.Succeed())

ginkgo.By("checking all jobs were created successfully")
gomega.Eventually(util.CheckNumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))
gomega.Eventually(util.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(util.NumExpectedJobs(js)))

// Perform a series of updates to jobset resources and check resulting jobset state after each update.
for _, up := range tc.updates {
Expand All @@ -252,18 +252,31 @@ var _ = ginkgo.Describe("JobSet controller", func() {
}

// Check jobset status if specified.
if up.expectedJobSetCondition != "" {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", up.expectedJobSetCondition))
gomega.Eventually(util.CheckJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, up.expectedJobSetCondition).Should(gomega.Equal(true))
if up.checkJobSetCondition != nil {
up.checkJobSetCondition(ctx, k8sClient, &jobSet, timeout)
}
}
},
ginkgo.Entry("jobset should succeed after all jobs succeed", &testCase{
makeJobSet: testJobSet,
updates: []*update{
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
ginkgo.Entry("jobset should not succeed if any job is not completed", &testCase{
makeJobSet: testJobSet,
updates: []*update{
{
jobUpdateFn: func(jobList *batchv1.JobList) {
ginkgo.By("completing all but 1 job")
for i := 0; i < len(jobList.Items)-1; i++ {
completeJob(&jobList.Items[i])
}
},
checkJobSetCondition: util.JobSetActive,
},
},
}),
Expand All @@ -274,7 +287,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobUpdateFn: func(jobList *batchv1.JobList) {
failJob(&jobList.Items[0])
},
expectedJobSetCondition: jobset.JobSetFailed,
checkJobSetCondition: util.JobSetFailed,
},
},
}),
Expand All @@ -285,8 +298,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
checkJobSetState: checkExpectedServices,
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -297,8 +310,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
checkJobSetState: checkExpectedServices,
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -312,7 +325,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobUpdateFn: func(jobList *batchv1.JobList) {
failJob(&jobList.Items[0])
},
expectedJobSetCondition: jobset.JobSetFailed,
checkJobSetCondition: util.JobSetFailed,
},
},
}),
Expand All @@ -337,7 +350,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobUpdateFn: func(jobList *batchv1.JobList) {
failJob(&jobList.Items[1])
},
expectedJobSetCondition: jobset.JobSetFailed,
checkJobSetCondition: util.JobSetFailed,
},
},
}),
Expand All @@ -360,8 +373,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -372,11 +385,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
updates: []*update{
{
expectedJobSetCondition: jobset.JobSetSuspended,
checkJobSetState: func(js *jobset.JobSet) {
ginkgo.By("checking all jobs are suspended")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, true).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetSuspended,
},
},
}),
Expand All @@ -386,11 +399,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
updates: []*update{
{
expectedJobSetCondition: jobset.JobSetSuspended,
checkJobSetState: func(js *jobset.JobSet) {
ginkgo.By("checking all jobs are suspended")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, true).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetSuspended,
},
{
jobSetUpdateFn: func(js *jobset.JobSet) {
Expand All @@ -400,10 +413,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
ginkgo.By("checking all jobs are resumed")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, false).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetResumed,
},
{
jobUpdateFn: completeAllJobs,
expectedJobSetCondition: jobset.JobSetCompleted,
jobUpdateFn: completeAllJobs,
checkJobSetCondition: util.JobSetCompleted,
},
},
}),
Expand All @@ -422,11 +436,11 @@ var _ = ginkgo.Describe("JobSet controller", func() {
jobSetUpdateFn: func(js *jobset.JobSet) {
suspendJobSet(js, true)
},
expectedJobSetCondition: jobset.JobSetSuspended,
checkJobSetState: func(js *jobset.JobSet) {
ginkgo.By("checking all jobs are suspended")
gomega.Eventually(matchJobsSuspendState, timeout, interval).WithArguments(js, true).Should(gomega.Equal(true))
},
checkJobSetCondition: util.JobSetSuspended,
},
},
}),
Expand Down
85 changes: 72 additions & 13 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,20 @@ package util

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/v1alpha1"
)

// CheckJobSetStatus fetches the JobSet identified by js's namespace and name
// and reports whether its status contains a condition whose Type equals the
// given condition type. Only the Type is compared; the condition's Status
// value is not inspected. An error from the Get call is returned as-is.
func CheckJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, condition jobset.JobSetConditionType) (bool, error) {
	key := types.NamespacedName{Namespace: js.Namespace, Name: js.Name}
	latest := jobset.JobSet{}
	err := k8sClient.Get(ctx, key, &latest)
	if err != nil {
		return false, err
	}
	wantType := string(condition)
	for i := range latest.Status.Conditions {
		if latest.Status.Conditions[i].Type == wantType {
			return true, nil
		}
	}
	return false, nil
}
const interval = time.Millisecond * 250

func NumExpectedJobs(js *jobset.JobSet) int {
expectedJobs := 0
Expand All @@ -44,10 +38,75 @@ func NumExpectedJobs(js *jobset.JobSet) int {
return expectedJobs
}

func CheckNumJobs(ctx context.Context, k8sClient client.Client, js *jobset.JobSet) (int, error) {
// NumJobs lists the Jobs in js's namespace and returns how many were found.
// Every Job in that namespace is counted; the list call applies no filter
// other than the namespace. If listing fails, -1 and the error are returned.
func NumJobs(ctx context.Context, k8sClient client.Client, js *jobset.JobSet) (int, error) {
	jobs := batchv1.JobList{}
	err := k8sClient.List(ctx, &jobs, client.InNamespace(js.Namespace))
	if err != nil {
		return -1, err
	}
	return len(jobs.Items), nil
}

// JobSetCompleted polls (every interval, for up to timeout) until js carries
// a JobSetCompleted condition whose status is True, failing the running
// spec if that never happens.
func JobSetCompleted(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
	want := metav1.Condition{
		Type:   string(jobset.JobSetCompleted),
		Status: metav1.ConditionTrue,
	}
	ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetCompleted))
	gomega.Eventually(checkJobSetStatus, timeout, interval).
		WithArguments(ctx, k8sClient, js, []metav1.Condition{want}).
		Should(gomega.Equal(true))
}

// JobSetFailed polls (every interval, for up to timeout) until js carries a
// JobSetFailed condition whose status is True, failing the running spec if
// that never happens.
func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
	want := metav1.Condition{
		Type:   string(jobset.JobSetFailed),
		Status: metav1.ConditionTrue,
	}
	ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetFailed))
	gomega.Eventually(checkJobSetStatus, timeout, interval).
		WithArguments(ctx, k8sClient, js, []metav1.Condition{want}).
		Should(gomega.Equal(true))
}

// JobSetSuspended polls (every interval, for up to timeout) until js carries
// a JobSetSuspended condition whose status is True, failing the running
// spec if that never happens.
func JobSetSuspended(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
	want := metav1.Condition{
		Type:   string(jobset.JobSetSuspended),
		Status: metav1.ConditionTrue,
	}
	ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetSuspended))
	gomega.Eventually(checkJobSetStatus, timeout, interval).
		WithArguments(ctx, k8sClient, js, []metav1.Condition{want}).
		Should(gomega.Equal(true))
}

// JobSetResumed polls (every interval, for up to timeout) until js carries a
// JobSetSuspended condition whose status is False, which is how a resumed
// JobSet is recognised here. The running spec fails if that never happens.
func JobSetResumed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
	want := metav1.Condition{
		Type:   string(jobset.JobSetSuspended),
		Status: metav1.ConditionFalse,
	}
	ginkgo.By("checking jobset status is resumed")
	gomega.Eventually(checkJobSetStatus, timeout, interval).
		WithArguments(ctx, k8sClient, js, []metav1.Condition{want}).
		Should(gomega.Equal(true))
}

// JobSetActive asserts that js stays active for the whole of timeout: on
// every poll (spaced by interval) the JobSet must be fetchable and must not
// carry a JobSetCompleted or JobSetFailed condition whose status is True.
//
// The check inspects the terminal conditions explicitly. Asking whether an
// empty list of wanted conditions is satisfied is always true, so it could
// never detect a JobSet that had already completed or failed.
func JobSetActive(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
	ginkgo.By("checking jobset status is active")
	noTerminalCondition := func() (bool, error) {
		var fetchedJS jobset.JobSet
		if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
			return false, err
		}
		for _, c := range fetchedJS.Status.Conditions {
			// A terminal condition only counts when it is actually set to True.
			if c.Status != metav1.ConditionTrue {
				continue
			}
			if c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed) {
				return false, nil
			}
		}
		return true, nil
	}
	gomega.Consistently(noTerminalCondition, timeout, interval).Should(gomega.Equal(true))
}

// checkJobSetStatus fetches the current JobSet identified by js's namespace
// and name and reports whether every entry in conditions is present in its
// status with the same Type and Status. An empty conditions slice is
// trivially satisfied. An error from the Get call is returned as-is.
func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, conditions []metav1.Condition) (bool, error) {
	var fetchedJS jobset.JobSet
	if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
		return false, err
	}
	for _, want := range conditions {
		matched := false
		for _, c := range fetchedJS.Status.Conditions {
			if c.Type == want.Type && c.Status == want.Status {
				// Stop at the first match so a duplicated status entry cannot
				// be counted more than once for the same wanted condition.
				matched = true
				break
			}
		}
		if !matched {
			return false, nil
		}
	}
	return true, nil
}