Skip to content

Commit

Permalink
Support multiple events in the lifecycle policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajadeepan D Ramesh committed Jul 3, 2019
1 parent 7cdd739 commit 5a93fdf
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 44 deletions.
40 changes: 25 additions & 15 deletions pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,42 @@ func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path)
exitCodes := map[int32]struct{}{}

for _, policy := range policies {
if policy.Event != "" && policy.ExitCode != nil {
if len(policy.Event) != 0 && policy.ExitCode != nil {
err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously"))
break
}

if policy.Event == "" && policy.ExitCode == nil {
if len(policy.Event) == 0 && policy.ExitCode == nil {
err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified"))
break
}

if policy.Event != "" {
if allow, ok := policyEventMap[policy.Event]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Event, fmt.Sprintf("invalid policy event")))
break
}

if allow, ok := policyActionMap[policy.Action]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action")))
break
if len(policy.Event) != 0 {
bFlag := false
for _, event := range policy.Event {
if allow, ok := policyEventMap[event]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, event, fmt.Sprintf("invalid policy event")))
bFlag = true
break
}

if allow, ok := policyActionMap[policy.Action]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action")))
bFlag = true
break
}
if _, found := policyEvents[event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v", event))
bFlag = true
break
} else {
policyEvents[event] = struct{}{}
}
}
if _, found := policyEvents[policy.Event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event))
if bFlag == true {
break
} else {
policyEvents[policy.Event] = struct{}{}
}

} else {
if *policy.ExitCode == 0 {
err = multierror.Append(err, fmt.Errorf("0 is not a valid error code"))
Expand Down
22 changes: 11 additions & 11 deletions pkg/admission/admit_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.PodFailedEvent,
Event: []v1alpha1.Event{v1alpha1.PodFailedEvent},
Action: v1alpha1.AbortJobAction,
},
{
Event: v1alpha1.PodFailedEvent,
Event: []v1alpha1.Event{v1alpha1.PodFailedEvent},
Action: v1alpha1.RestartJobAction,
},
},
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.PodFailedEvent,
Event: []v1alpha1.Event{v1alpha1.PodFailedEvent},
Action: v1alpha1.AbortJobAction,
ExitCode: &policyExitCode,
},
Expand Down Expand Up @@ -570,7 +570,7 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.Event("someFakeEvent"),
Event: []v1alpha1.Event{v1alpha1.Event("someFakeEvent")},
Action: v1alpha1.AbortJobAction,
},
},
Expand Down Expand Up @@ -612,7 +612,7 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.PodEvictedEvent,
Event: []v1alpha1.Event{v1alpha1.PodEvictedEvent},
Action: v1alpha1.Action("someFakeAction"),
},
},
Expand Down Expand Up @@ -746,11 +746,11 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.AnyEvent,
Event: []v1alpha1.Event{v1alpha1.AnyEvent},
Action: v1alpha1.AbortJobAction,
},
{
Event: v1alpha1.PodFailedEvent,
Event: []v1alpha1.Event{v1alpha1.PodFailedEvent},
Action: v1alpha1.RestartJobAction,
},
},
Expand Down Expand Up @@ -792,7 +792,7 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.AnyEvent,
Event: []v1alpha1.Event{v1alpha1.AnyEvent},
Action: v1alpha1.AbortJobAction,
},
},
Expand Down Expand Up @@ -839,7 +839,7 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.AnyEvent,
Event: []v1alpha1.Event{v1alpha1.AnyEvent},
Action: v1alpha1.AbortJobAction,
},
},
Expand Down Expand Up @@ -887,11 +887,11 @@ func TestValidateExecution(t *testing.T) {
},
Policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.AnyEvent,
Event: []v1alpha1.Event{v1alpha1.AnyEvent},
Action: v1alpha1.AbortJobAction,
},
{
Event: v1alpha1.PodFailedEvent,
Event: []v1alpha1.Event{v1alpha1.PodFailedEvent},
Action: v1alpha1.RestartJobAction,
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type LifecyclePolicy struct {
// The Event recorded by scheduler; the controller takes actions
// according to this Event.
// +optional
Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`
Event []Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`

// The exit code of the pod container, controller will take action
// according to this code.
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/controllers/apis"
Expand Down Expand Up @@ -144,7 +145,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
if task.Name == req.TaskName {
for _, policy := range task.Policies {
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
if checkEventExist(policy.Event, req.Event) || checkEventExist(policy.Event, vkv1.AnyEvent) {
return policy.Action
}
}
Expand All @@ -162,7 +163,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
// Parse Job level policies
for _, policy := range job.Spec.Policies {
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
if checkEventExist(policy.Event, req.Event) || checkEventExist(policy.Event, vkv1.AnyEvent) {
return policy.Action
}
}
Expand All @@ -176,6 +177,16 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
return vkv1.SyncJobAction
}

func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool {
for _, event := range policyEvents {
if event == reqEvent {
return true
}
}
return false

}

func addResourceList(list, new v1.ResourceList) {
for name, quantity := range new {
if value, ok := list[name]; !ok {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/job/job_controller_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func TestApplyPolicies(t *testing.T) {
Policies: []v1alpha1.LifecyclePolicy{
{
Action: v1alpha1.SyncJobAction,
Event: v1alpha1.CommandIssuedEvent,
Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent},
ExitCode: &errorCode0,
},
},
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestApplyPolicies(t *testing.T) {
Policies: []v1alpha1.LifecyclePolicy{
{
Action: v1alpha1.SyncJobAction,
Event: v1alpha1.CommandIssuedEvent,
Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent},
},
},
},
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestApplyPolicies(t *testing.T) {
Policies: []v1alpha1.LifecyclePolicy{
{
Action: v1alpha1.SyncJobAction,
Event: v1alpha1.CommandIssuedEvent,
Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent},
},
},
},
Expand Down Expand Up @@ -589,7 +589,7 @@ func TestApplyPolicies(t *testing.T) {
Policies: []v1alpha1.LifecyclePolicy{
{
Action: v1alpha1.SyncJobAction,
Event: v1alpha1.CommandIssuedEvent,
Event: []v1alpha1.Event{v1alpha1.CommandIssuedEvent},
ExitCode: &errorCode0,
},
},
Expand Down
21 changes: 11 additions & 10 deletions test/e2e/job_error_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
jobutil "volcano.sh/volcano/pkg/controllers/job"
)
Expand All @@ -39,7 +40,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.RestartJobAction,
Event: vkv1.PodFailedEvent,
Event: []v1alpha1.Event{vkv1.PodFailedEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -76,7 +77,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.TerminateJobAction,
Event: vkv1.PodFailedEvent,
Event: []v1alpha1.Event{vkv1.PodFailedEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -113,7 +114,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.AbortJobAction,
Event: vkv1.PodFailedEvent,
Event: []v1alpha1.Event{vkv1.PodFailedEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -150,7 +151,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.RestartJobAction,
Event: vkv1.PodEvictedEvent,
Event: []v1alpha1.Event{vkv1.PodEvictedEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -194,7 +195,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.TerminateJobAction,
Event: vkv1.PodEvictedEvent,
Event: []v1alpha1.Event{vkv1.PodEvictedEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -238,7 +239,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.AbortJobAction,
Event: vkv1.PodEvictedEvent,
Event: []v1alpha1.Event{vkv1.PodEvictedEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -282,7 +283,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.RestartJobAction,
Event: vkv1.AnyEvent,
Event: []v1alpha1.Event{vkv1.AnyEvent},
},
},
tasks: []taskSpec{
Expand Down Expand Up @@ -326,7 +327,7 @@ var _ = Describe("Job Error Handling", func() {
namespace: "test",
policies: []vkv1.LifecyclePolicy{
{
Event: vkv1.JobUnknownEvent,
Event: []v1alpha1.Event{vkv1.JobUnknownEvent},
Action: vkv1.RestartJobAction,
},
},
Expand Down Expand Up @@ -385,7 +386,7 @@ var _ = Describe("Job Error Handling", func() {
namespace: "test",
policies: []vkv1.LifecyclePolicy{
{
Event: vkv1.JobUnknownEvent,
Event: []v1alpha1.Event{vkv1.JobUnknownEvent},
Action: vkv1.AbortJobAction,
},
},
Expand Down Expand Up @@ -440,7 +441,7 @@ var _ = Describe("Job Error Handling", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.CompleteJobAction,
Event: vkv1.TaskCompletedEvent,
Event: []v1alpha1.Event{vkv1.TaskCompletedEvent},
},
},
tasks: []taskSpec{
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

Expand All @@ -35,7 +36,7 @@ var _ = Describe("MPI E2E Test", func() {
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.CompleteJobAction,
Event: vkv1.TaskCompletedEvent,
Event: []v1alpha1.Event{vkv1.TaskCompletedEvent},
},
},
plugins: map[string][]string{
Expand Down

0 comments on commit 5a93fdf

Please sign in to comment.