From 5a93fdff7b1141118a239f6e98471cdcd1ecb34e Mon Sep 17 00:00:00 2001 From: Rajadeepan D Ramesh Date: Tue, 2 Jul 2019 18:00:38 +0530 Subject: [PATCH] Support multiple events in the lifecycle policy --- pkg/admission/admission_controller.go | 40 ++++++++++++------- pkg/admission/admit_job_test.go | 22 +++++----- pkg/apis/batch/v1alpha1/job.go | 2 +- .../batch/v1alpha1/zz_generated.deepcopy.go | 5 +++ pkg/controllers/job/job_controller_util.go | 15 ++++++- .../job/job_controller_util_test.go | 8 ++-- test/e2e/job_error_handling.go | 21 +++++----- test/e2e/mpi.go | 3 +- 8 files changed, 72 insertions(+), 44 deletions(-) diff --git a/pkg/admission/admission_controller.go b/pkg/admission/admission_controller.go index 7d502d2e773..edecf3433b7 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/admission_controller.go @@ -115,32 +115,42 @@ func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path) exitCodes := map[int32]struct{}{} for _, policy := range policies { - if policy.Event != "" && policy.ExitCode != nil { + if len(policy.Event) != 0 && policy.ExitCode != nil { err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously")) break } - if policy.Event == "" && policy.ExitCode == nil { + if len(policy.Event) == 0 && policy.ExitCode == nil { err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified")) break } - if policy.Event != "" { - if allow, ok := policyEventMap[policy.Event]; !ok || !allow { - err = multierror.Append(err, field.Invalid(fldPath, policy.Event, fmt.Sprintf("invalid policy event"))) - break - } - - if allow, ok := policyActionMap[policy.Action]; !ok || !allow { - err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action"))) - break + if len(policy.Event) != 0 { + bFlag := false + for _, event := range policy.Event { + if allow, ok := policyEventMap[event]; !ok || !allow { + err = multierror.Append(err, field.Invalid(fldPath, event, fmt.Sprintf("invalid policy event"))) + bFlag = true + break + } + + if allow, ok := policyActionMap[policy.Action]; !ok || !allow { + err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action"))) + bFlag = true + break + } + if _, found := policyEvents[event]; found { + err = multierror.Append(err, fmt.Errorf("duplicate event %v", event)) + bFlag = true + break + } else { + policyEvents[event] = struct{}{} + } } - if _, found := policyEvents[policy.Event]; found { - err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event)) + if bFlag == true { break - } else { - policyEvents[policy.Event] = struct{}{} } + } else { if *policy.ExitCode == 0 { err = multierror.Append(err, fmt.Errorf("0 is not a valid error code")) diff --git a/pkg/admission/admit_job_test.go b/pkg/admission/admit_job_test.go index 5f019b4485d..ad897125bf1 100644 --- a/pkg/admission/admit_job_test.go +++ b/pkg/admission/admit_job_test.go @@ -163,11 +163,11 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.PodFailedEvent, + Event: []v1alpha1.Event{v1alpha1.PodFailedEvent}, Action: v1alpha1.AbortJobAction, }, { - Event: v1alpha1.PodFailedEvent, + Event: []v1alpha1.Event{v1alpha1.PodFailedEvent}, Action: v1alpha1.RestartJobAction, }, }, @@ -486,7 +486,7 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.PodFailedEvent, + Event: []v1alpha1.Event{v1alpha1.PodFailedEvent}, Action: v1alpha1.AbortJobAction, ExitCode: &policyExitCode, }, @@ -570,7 +570,7 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.Event("someFakeEvent"), + Event: []v1alpha1.Event{v1alpha1.Event("someFakeEvent")}, Action: v1alpha1.AbortJobAction, }, }, @@ -612,7 +612,7 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.PodEvictedEvent, + Event: []v1alpha1.Event{v1alpha1.PodEvictedEvent}, Action: v1alpha1.Action("someFakeAction"), }, }, @@ -746,11 +746,11 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.AnyEvent, + Event: []v1alpha1.Event{v1alpha1.AnyEvent}, Action: v1alpha1.AbortJobAction, }, { - Event: v1alpha1.PodFailedEvent, + Event: []v1alpha1.Event{v1alpha1.PodFailedEvent}, Action: v1alpha1.RestartJobAction, }, }, @@ -792,7 +792,7 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.AnyEvent, + Event: []v1alpha1.Event{v1alpha1.AnyEvent}, Action: v1alpha1.AbortJobAction, }, }, @@ -839,7 +839,7 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.AnyEvent, + Event: []v1alpha1.Event{v1alpha1.AnyEvent}, Action: v1alpha1.AbortJobAction, }, }, @@ -887,11 +887,11 @@ func TestValidateExecution(t *testing.T) { }, Policies: []v1alpha1.LifecyclePolicy{ { - Event: v1alpha1.AnyEvent, + Event: []v1alpha1.Event{v1alpha1.AnyEvent}, Action: v1alpha1.AbortJobAction, }, { - Event: v1alpha1.PodFailedEvent, + Event: []v1alpha1.Event{v1alpha1.PodFailedEvent}, Action: v1alpha1.RestartJobAction, }, }, diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index 9190cab4ccf..05a09d1a38b 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -178,7 +178,7 @@ type LifecyclePolicy struct { // The Event recorded by scheduler; the controller takes actions // according to this Event. // +optional - Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"` + Event []Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"` // The exit code of the pod container, controller will take action // according to this code. diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 7fd00a4301a..b4293b29d77 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -188,6 +188,11 @@ func (in *JobStatus) DeepCopy() *JobStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LifecyclePolicy) DeepCopyInto(out *LifecyclePolicy) { *out = *in + if in.Event != nil { + in, out := &in.Event, &out.Event + *out = make([]Event, len(*in)) + copy(*out, *in) + } if in.ExitCode != nil { in, out := &in.ExitCode, &out.ExitCode *out = new(int32) diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 8104d8f0f44..36ce3929fb9 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/controllers/apis" @@ -144,7 +145,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { if task.Name == req.TaskName { for _, policy := range task.Policies { if len(policy.Event) > 0 && len(req.Event) > 0 { - if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + if checkEventExist(policy.Event, req.Event) || checkEventExist(policy.Event, vkv1.AnyEvent) { return policy.Action } } @@ -162,7 +163,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { // Parse Job level policies for _, policy := range job.Spec.Policies { if len(policy.Event) > 0 && len(req.Event) > 0 { - if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + if checkEventExist(policy.Event, req.Event) || checkEventExist(policy.Event, vkv1.AnyEvent) { return policy.Action } } @@ -176,6 +177,16 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { return vkv1.SyncJobAction } +func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool { + for _, event := range policyEvents { + if event == reqEvent { + return true + } + } + return false + +} + func addResourceList(list, new v1.ResourceList) { for name, quantity := range new { if value, ok := list[name]; !ok { diff --git a/pkg/controllers/job/job_controller_util_test.go b/pkg/controllers/job/job_controller_util_test.go index cb63ae51ab6..40531e52e4e 100644 --- a/pkg/controllers/job/job_controller_util_test.go +++ b/pkg/controllers/job/job_controller_util_test.go @@ -406,7 +406,7 @@ func TestApplyPolicies(t *testing.T) { Policies: []v1alpha1.LifecyclePolicy{ { Action: v1alpha1.SyncJobAction, - Event: v1alpha1.CommandIssuedEvent, + Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent}, ExitCode: &errorCode0, }, }, @@ -453,7 +453,7 @@ func TestApplyPolicies(t *testing.T) { Policies: []v1alpha1.LifecyclePolicy{ { Action: v1alpha1.SyncJobAction, - Event: v1alpha1.CommandIssuedEvent, + Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent}, }, }, }, @@ -543,7 +543,7 @@ func TestApplyPolicies(t *testing.T) { Policies: []v1alpha1.LifecyclePolicy{ { Action: v1alpha1.SyncJobAction, - Event: v1alpha1.CommandIssuedEvent, + Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent}, }, }, }, @@ -589,7 +589,7 @@ func TestApplyPolicies(t *testing.T) { Policies: []v1alpha1.LifecyclePolicy{ { Action: v1alpha1.SyncJobAction, - Event: v1alpha1.CommandIssuedEvent, + Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent}, ExitCode: &errorCode0, }, }, diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index 73287a9e71d..28b66c718b0 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" jobutil "volcano.sh/volcano/pkg/controllers/job" ) @@ -39,7 +40,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.RestartJobAction, - Event: vkv1.PodFailedEvent, + Event: []v1alpha1.Event{vkv1.PodFailedEvent}, }, }, tasks: []taskSpec{ @@ -76,7 +77,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.TerminateJobAction, - Event: vkv1.PodFailedEvent, + Event: []v1alpha1.Event{vkv1.PodFailedEvent}, }, }, tasks: []taskSpec{ @@ -113,7 +114,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.AbortJobAction, - Event: vkv1.PodFailedEvent, + Event: []v1alpha1.Event{vkv1.PodFailedEvent}, }, }, tasks: []taskSpec{ @@ -150,7 +151,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.RestartJobAction, - Event: vkv1.PodEvictedEvent, + Event: []v1alpha1.Event{vkv1.PodEvictedEvent}, }, }, tasks: []taskSpec{ @@ -194,7 +195,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.TerminateJobAction, - Event: vkv1.PodEvictedEvent, + Event: []v1alpha1.Event{vkv1.PodEvictedEvent}, }, }, tasks: []taskSpec{ @@ -238,7 +239,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.AbortJobAction, - Event: vkv1.PodEvictedEvent, + Event: []v1alpha1.Event{vkv1.PodEvictedEvent}, }, }, tasks: []taskSpec{ @@ -282,7 +283,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.RestartJobAction, - Event: vkv1.AnyEvent, + Event: []v1alpha1.Event{vkv1.AnyEvent}, }, }, tasks: []taskSpec{ @@ -326,7 +327,7 @@ var _ = Describe("Job Error Handling", func() { namespace: "test", policies: []vkv1.LifecyclePolicy{ { - Event: vkv1.JobUnknownEvent, + Event: []v1alpha1.Event{vkv1.JobUnknownEvent}, Action: vkv1.RestartJobAction, }, }, @@ -385,7 +386,7 @@ var _ = Describe("Job Error Handling", func() { namespace: "test", policies: []vkv1.LifecyclePolicy{ { - Event: vkv1.JobUnknownEvent, + Event: []v1alpha1.Event{vkv1.JobUnknownEvent}, Action: vkv1.AbortJobAction, }, }, @@ -440,7 +441,7 @@ var _ = Describe("Job Error Handling", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.CompleteJobAction, - Event: vkv1.TaskCompletedEvent, + Event: []v1alpha1.Event{vkv1.TaskCompletedEvent}, }, }, tasks: []taskSpec{ diff --git a/test/e2e/mpi.go b/test/e2e/mpi.go index 0c4be548ef6..a3cfc954dc9 100644 --- a/test/e2e/mpi.go +++ b/test/e2e/mpi.go @@ -20,6 +20,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" ) @@ -35,7 +36,7 @@ var _ = Describe("MPI E2E Test", func() { policies: []vkv1.LifecyclePolicy{ { Action: vkv1.CompleteJobAction, - Event: vkv1.TaskCompletedEvent, + Event: []v1alpha1.Event{vkv1.TaskCompletedEvent}, }, }, plugins: map[string][]string{