Skip to content

Commit

Permalink
fix: Fix not working Running state lifecycle hooks in dag task. Fixes #…
Browse files Browse the repository at this point in the history
…9897 (#10307)

Signed-off-by: GeunSam2 <rootiron96@gmail.com>
  • Loading branch information
GeunSam2 authored and terrytangyuan committed Apr 11, 2023
1 parent 87b3910 commit a3bfce2
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 11 deletions.
213 changes: 211 additions & 2 deletions test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ spec:
- name: argosay
container:
image: argoproj/argosay:v2
command: ["/argosay; exit 1"]
command: ["/argosay", "sleep 5", "exit 1"]
- name: hook
container:
image: argoproj/argosay:v2
Expand All @@ -111,6 +111,215 @@ spec:
})
}

func (s *HooksSuite) TestTemplateLevelHooksStepSuccessVersion() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-tmpl-level-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: step-1
hooks:
running:
expression: steps["step-1"].status == "Running"
template: argosay
succeed:
expression: steps["step-1"].status == "Succeeded"
template: argosay
template: argosay
- - name: step-2
hooks:
running:
expression: steps["step-2"].status == "Running"
template: argosay
succeed:
expression: steps["step-2"].status == "Succeeded"
template: argosay
template: argosay
- name: argosay
container:
image: argoproj/argosay:v2
command: ["/argosay"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.succeed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-2.hooks.succeed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-2.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
})
}

func (s *HooksSuite) TestTemplateLevelHooksStepFailVersion() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-tmpl-level-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: step-1
hooks:
running:
expression: steps["step-1"].status == "Running"
template: hook
failed:
expression: steps["step-1"].status == "Failed"
template: hook
template: argosay
- name: argosay
container:
image: argoproj/argosay:v2
command: ["/argosay", "sleep 5", "exit 1"]
- name: hook
container:
image: argoproj/argosay:v2
command: ["/argosay"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.WorkflowFailed, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.failed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
})
}

func (s *HooksSuite) TestTemplateLevelHooksDagSuccessVersion() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-tmpl-level-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: step-1
hooks:
running:
expression: tasks["step-1"].status == "Running"
template: argosay
succeed:
expression: tasks["step-1"].status == "Succeeded"
template: argosay
template: argosay
- name: step-2
hooks:
running:
expression: tasks["step-2"].status == "Running"
template: argosay
succeed:
expression: tasks["step-2"].status == "Succeeded"
template: argosay
template: argosay
dependencies: [step-1]
- name: argosay
container:
image: argoproj/argosay:v2
command: ["/argosay"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.succeed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-2.hooks.succeed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-2.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
})
}

func (s *HooksSuite) TestTemplateLevelHooksDagFailVersion() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-tmpl-level-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: step-1
hooks:
running:
expression: tasks["step-1"].status == "Running"
template: hook
failed:
expression: tasks["step-1"].status == "Failed"
template: hook
template: argosay
- name: argosay
container:
image: argoproj/argosay:v2
command: ["/argosay", "sleep 5", "exit 1"]
- name: hook
container:
image: argoproj/argosay:v2
command: ["/argosay"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.WorkflowFailed, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.failed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "step-1.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
})
}

func TestHooksSuite(t *testing.T) {
suite.Run(t, new(HooksSuite))
}
27 changes: 18 additions & 9 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,24 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
node := dagCtx.getTaskNode(taskName)
task := dagCtx.GetTask(taskName)
log := woc.log.WithField("taskName", taskName)
if node != nil && (node.Fulfilled() || node.Phase == wfv1.NodeRunning) {
scope, err := woc.buildLocalScopeFromTask(dagCtx, task)
if err != nil {
log.Error("Failed to build local scope from task")
woc.markNodeError(node.Name, err)
return
}
scope.addParamToScope(fmt.Sprintf("tasks.%s.status", task.Name), string(node.Phase))
hookCompleted, err := woc.executeTmplLifeCycleHook(ctx, scope, dagCtx.GetTask(taskName).Hooks, node, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName)
if err != nil {
woc.markNodeError(node.Name, err)
}
// Check all hooks are completes
if !hookCompleted {
return
}
}

if node != nil && node.Fulfilled() {
// Collect the completed task metrics
_, tmpl, _, _ := dagCtx.tmplCtx.ResolveTemplate(task)
Expand All @@ -379,22 +397,13 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}

task := dagCtx.GetTask(taskName)
scope, err := woc.buildLocalScopeFromTask(dagCtx, task)
if err != nil {
woc.markNodeError(node.Name, err)
log.Error("Failed to build local scope from task")
return
}
scope.addParamToScope(fmt.Sprintf("tasks.%s.status", task.Name), string(node.Phase))
hookCompleted, err := woc.executeTmplLifeCycleHook(ctx, scope, dagCtx.GetTask(taskName).Hooks, node, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName)
if err != nil {
woc.markNodeError(node.Name, err)
}
// Check all hooks are completes
if !hookCompleted {
return
}

if node.Completed() {
// Run the node's onExit node, if any.
Expand Down

0 comments on commit a3bfce2

Please sign in to comment.