Skip to content

Commit

Permalink
fix: check hooked nodes in executeWfLifeCycleHook and executeTmplLife…
Browse files Browse the repository at this point in the history
…CycleHook (#11113, #11117) (#11176)

Signed-off-by: toyamagu2021@gmail.com <toyamagu2021@gmail.com>
  • Loading branch information
toyamagu-2021 authored Jun 4, 2023
1 parent f3c948a commit 0c5a6dd
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 3 deletions.
156 changes: 155 additions & 1 deletion test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,160 @@ spec:
})
}

// TestWorkflowLevelHooksWaitForTriggeredHook submits a workflow that declares
// two workflow-level hooks and checks that the workflow only reports success
// once the triggered "running" hook node has itself succeeded.
//
// The "failed" hook's expression never becomes true in this scenario; it is
// present to make sure the workflow does not wait for hooks that were never
// triggered. The expected progress of "2/2" covers the main step and the
// "running" hook.
func (s *HooksSuite) TestWorkflowLevelHooksWaitForTriggeredHook() {
	s.Given().
		Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: lifecycle-hook-
spec:
  entrypoint: main
  hooks:
    running:
      expression: workflow.status == "Running"
      template: sleep
    # This hook never triggered by following test.
    # To guarantee workflow does not wait forever for untriggered hooks.
    failed:
      expression: workflow.status == "Failed"
      template: sleep
  templates:
    - name: main
      steps:
        - - name: step1
            template: exit0
    - name: exit0
      container:
        image: alpine:latest
        command: ["/bin/sh", "-c"]
        args: ["exit 0"]
    - name: sleep
      container:
        image: alpine:latest
        command: ["/bin/sh", "-c"]
        args: ["/bin/sleep 2; exit 0"]
`).When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeSucceeded).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
			// assert.Equal takes (expected, actual); keeping that order makes
			// failure messages report the right side as "expected".
			assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
			assert.Equal(t, v1alpha1.Progress("2/2"), status.Progress)
		}).
		ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
			// Select the node created for the workflow-level "running" hook.
			return strings.Contains(status.Name, ".hooks.running")
		}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
			assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
		})
}

// TestTemplateLevelHooksWaitForTriggeredHook submits a workflow whose single
// step declares two template-level hooks and checks that the workflow only
// reports success once the triggered "running" hook node has succeeded.
//
// The "failed" hook's expression never becomes true because the step exits 0.
// The expected progress of "2/2" covers the step and the "running" hook.
func (s *HooksSuite) TestTemplateLevelHooksWaitForTriggeredHook() {
	s.Given().
		Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: example-steps
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: job
            template: exit0
            hooks:
              running:
                expression: steps['job'].status == "Running"
                template: hook
              failed:
                expression: steps['job'].status == "Failed"
                template: hook
    - name: hook
      script:
        image: alpine:latest
        command: [/bin/sh]
        source: |
          sleep 2
    - name: exit0
      script:
        image: alpine:latest
        command: [/bin/sh]
        source: |
          exit 0
`).When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeSucceeded).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
			// assert.Equal takes (expected, actual); keeping that order makes
			// failure messages report the right side as "expected".
			assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
			assert.Equal(t, v1alpha1.Progress("2/2"), status.Progress)
		}).
		ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
			// Select the node created for the step's "running" hook.
			return strings.Contains(status.Name, "job.hooks.running")
		}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
			assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
		})
}

// TestTemplateLevelHooksWaitForTriggeredHookAndRespectSynchronization submits
// a workflow whose step has a "running" and a "succeed" hook that both use the
// same template, which is guarded by a mutex named "job". It checks that the
// workflow succeeds and that both hook nodes end up Succeeded.
//
// The expected progress of "3/3" covers the step and the two hooks.
//
// Ref: https://github.com/argoproj/argo-workflows/issues/11117
func (s *HooksSuite) TestTemplateLevelHooksWaitForTriggeredHookAndRespectSynchronization() {
	s.Given().
		Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: example-steps-simple-mutex
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: job
            template: exit0
            hooks:
              running:
                expression: steps['job'].status == "Running"
                template: sleep
              succeed:
                expression: steps['job'].status == "Succeeded"
                template: sleep
    - name: sleep
      synchronization:
        mutex:
          name: job
      script:
        image: alpine:latest
        command: [/bin/sh]
        source: |
          sleep 3
    - name: exit0
      script:
        image: alpine:latest
        command: [/bin/sh]
        source: |
          sleep 1
          exit 0
`).When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeSucceeded).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
			// assert.Equal takes (expected, actual); keeping that order makes
			// failure messages report the right side as "expected".
			assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
			assert.Equal(t, v1alpha1.Progress("3/3"), status.Progress)
		}).
		ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
			// Select the node created for the step's "running" hook.
			return strings.Contains(status.Name, "job.hooks.running")
		}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
			assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
		}).
		ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
			// Select the node created for the step's "succeed" hook.
			return strings.Contains(status.Name, "job.hooks.succeed")
		}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
			assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
		})
}

func TestHooksSuite(t *testing.T) {
suite.Run(t, new(HooksSuite))
}
}
10 changes: 8 additions & 2 deletions workflow/controller/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ func (woc *wfOperationCtx) executeWfLifeCycleHook(ctx context.Context, tmplCtx *
continue
}
hookNodeName := generateLifeHookNodeName(woc.wf.ObjectMeta.Name, string(hookName))
// Look up the hook node to check whether this hook was already triggered.
hookedNode := woc.wf.GetNodeByName(hookNodeName)
if hook.Expression == "" {
return true, errors.Errorf(errors.CodeBadRequest, "Expression required for hook %s", hookNodeName)
}
execute, err := argoexpr.EvalBool(hook.Expression, env.GetFuncMap(template.EnvMap(woc.globalParams)))
if err != nil {
return true, err
}
if execute {
// executeTemplate should also be invoked when hookedNode != nil, because the function must be re-executed to check the mutex condition, etc.
if execute || hookedNode != nil {
woc.log.WithField("lifeCycleHook", hookName).WithField("node", hookNodeName).Infof("Running workflow level hooks")
hookNode, err := woc.executeTemplate(ctx, hookNodeName, &wfv1.WorkflowStep{Template: hook.Template, TemplateRef: hook.TemplateRef}, tmplCtx, hook.Arguments, &executeTemplateOpts{})
if err != nil {
Expand Down Expand Up @@ -58,14 +61,17 @@ func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope *
continue
}
hookNodeName := generateLifeHookNodeName(parentNode.Name, string(hookName))
// Look up the hook node to check whether this hook was already triggered.
hookedNode := woc.wf.GetNodeByName(hookNodeName)
if hook.Expression == "" {
return false, errors.Errorf(errors.CodeBadRequest, "Expression required for hook %s", hookNodeName)
}
execute, err := argoexpr.EvalBool(hook.Expression, env.GetFuncMap(template.EnvMap(woc.globalParams.Merge(scope.getParameters()))))
if err != nil {
return false, err
}
if execute {
// executeTemplate should also be invoked when hookedNode != nil, because the function must be re-executed to check the mutex condition, etc.
if execute || hookedNode != nil {
outputs := parentNode.Outputs
if parentNode.Type == wfv1.NodeTypeRetry {
lastChildNode := getChildNodeIndex(parentNode, woc.wf.Status.Nodes, -1)
Expand Down
167 changes: 167 additions & 0 deletions workflow/controller/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
Expand Down Expand Up @@ -146,6 +147,7 @@ status:
assert.NotNil(t, node)
node = woc.wf.Status.Nodes.FindByDisplayName("lifecycle-hook-bgsf6.hooks.running")
assert.Nil(t, node)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
}

func TestExecuteTmplLifeCycleHook(t *testing.T) {
Expand Down Expand Up @@ -1069,3 +1071,168 @@ spec:
assert.Equal(t, wfv1.WorkflowFailed, woc.wf.Status.Phase)
assert.Equal(t, "invalid spec: templates.main.steps[0].step-1.foo Expression required", woc.wf.Status.Message)
}

// TestWfHookWfWaitForTriggeredHook drives a workflow with a workflow-level
// "running" hook through several operate() passes and verifies that the
// workflow stays Running while the triggered hook node is still running, even
// after the main node has succeeded, and only becomes Succeeded once the hook
// node has succeeded too.
//
// The "failure" hook is never triggered in this scenario; it is present to make
// sure the workflow does not wait for hooks that were never triggered.
func TestWfHookWfWaitForTriggeredHook(t *testing.T) {
	wf := wfv1.MustUnmarshalWorkflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hook-running
  namespace: argo
spec:
  entrypoint: main
  hooks:
    running:
      expression: workflow.status == "Running"
      template: sleep
    # This hook never triggered by following test.
    # To guarantee workflow does not wait forever for untriggered hooks.
    failure:
      expression: workflow.status == "Failed"
      template: sleep
  templates:
    - name: main
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["echo", "This template finish fastest"]
    - name: sleep
      script:
        image: alpine:latest
        command: [sh]
        source: |
          sleep 10
`)

	// Setup
	cancel, controller := newController(wf)
	defer cancel()
	ctx := context.Background()
	woc := newWorkflowOperationCtx(wf, controller)

	// mustFindNode returns the node with the given display name from the
	// current operation context, aborting the test when it does not exist so
	// that later phase checks never dereference a nil node.
	mustFindNode := func(displayName string) *wfv1.NodeStatus {
		t.Helper()
		found := woc.wf.Status.Nodes.FindByDisplayName(displayName)
		if found == nil {
			t.Fatalf("node %q not found in workflow status", displayName)
		}
		return found
	}

	woc.operate(ctx)
	makePodsPhase(ctx, woc, apiv1.PodRunning)

	// Check if running hook is triggered
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	node := mustFindNode("hook-running.hooks.running")
	assert.Equal(t, wfv1.NodePending, node.Phase)

	// Make all pods running
	makePodsPhase(ctx, woc, apiv1.PodRunning)
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	node = mustFindNode("hook-running.hooks.running")
	assert.Equal(t, wfv1.NodeRunning, node.Phase)

	// Make main pod completed
	podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
	pod, err := podcs.Get(ctx, "hook-running", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("getting main pod: %v", err)
	}
	pod.Status.Phase = apiv1.PodSucceeded
	updatedPod, err := podcs.Update(ctx, pod, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("updating main pod: %v", err)
	}
	// Mirror the change into the informer store, which is then updated
	// directly rather than left to pick up the clientset change.
	if err := woc.controller.podInformer.GetStore().Update(updatedPod); err != nil {
		t.Fatalf("updating pod informer store: %v", err)
	}
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress)
	node = mustFindNode("hook-running")
	assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
	node = mustFindNode("hook-running.hooks.running")
	assert.Equal(t, wfv1.NodeRunning, node.Phase)
	// The workflow must keep running while the triggered hook is unfinished.
	assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)

	// Make all pod completed
	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	assert.Equal(t, wfv1.Progress("2/2"), woc.wf.Status.Progress)
	node = mustFindNode("hook-running.hooks.running")
	assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
	node = mustFindNode("hook-running")
	assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
	assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

// TestWfTemplHookWfWaitForTriggeredHook drives a workflow whose step has a
// template-level "running" hook through several operate() passes and verifies
// that the workflow stays Running while the triggered hook node is still
// running, even after the "job" node has succeeded, and only becomes Succeeded
// once the hook node has succeeded too.
//
// The "failed" hook is never triggered in this scenario.
func TestWfTemplHookWfWaitForTriggeredHook(t *testing.T) {
	wf := wfv1.MustUnmarshalWorkflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hook-running
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: job
            template: exit0
            hooks:
              running:
                expression: steps['job'].status == "Running"
                template: hook
              failed:
                expression: steps['job'].status == "Failed"
                template: hook
    - name: hook
      script:
        image: alpine:latest
        command: [/bin/sh]
        source: |
          sleep 5
    - name: exit0
      script:
        image: alpine:latest
        command: [/bin/sh]
        source: |
          exit 0
`)

	// Setup
	cancel, controller := newController(wf)
	defer cancel()
	ctx := context.Background()
	woc := newWorkflowOperationCtx(wf, controller)

	// mustFindNode returns the node with the given display name from the
	// current operation context, aborting the test when it does not exist so
	// that later phase checks never dereference a nil node.
	mustFindNode := func(displayName string) *wfv1.NodeStatus {
		t.Helper()
		found := woc.wf.Status.Nodes.FindByDisplayName(displayName)
		if found == nil {
			t.Fatalf("node %q not found in workflow status", displayName)
		}
		return found
	}

	woc.operate(ctx)
	makePodsPhase(ctx, woc, apiv1.PodRunning)

	// Check if running hook is triggered
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	node := mustFindNode("job.hooks.running")
	assert.Equal(t, wfv1.NodePending, node.Phase)

	// Make all pods running
	makePodsPhase(ctx, woc, apiv1.PodRunning)
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	node = mustFindNode("job.hooks.running")
	assert.Equal(t, wfv1.NodeRunning, node.Phase)

	// Make main pod completed
	podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
	pods, err := podcs.List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("listing pods: %v", err)
	}
	if len(pods.Items) == 0 {
		t.Fatal("no pods found for workflow")
	}
	// NOTE(review): this assumes the first listed pod belongs to the "job"
	// step rather than the hook; confirm the listing order is deterministic.
	pod := pods.Items[0]
	pod.Status.Phase = apiv1.PodSucceeded
	updatedPod, err := podcs.Update(ctx, &pod, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("updating main pod: %v", err)
	}
	// Mirror the change into the informer store, which is then updated
	// directly rather than left to pick up the clientset change.
	if err := woc.controller.podInformer.GetStore().Update(updatedPod); err != nil {
		t.Fatalf("updating pod informer store: %v", err)
	}
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress)
	node = mustFindNode("job")
	assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
	node = mustFindNode("job.hooks.running")
	assert.Equal(t, wfv1.NodeRunning, node.Phase)
	// The workflow must keep running while the triggered hook is unfinished.
	assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)

	// Make all pod completed
	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
	woc = newWorkflowOperationCtx(woc.wf, controller)
	woc.operate(ctx)
	assert.Equal(t, wfv1.Progress("2/2"), woc.wf.Status.Progress)
	node = mustFindNode("job.hooks.running")
	assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
	node = mustFindNode("job")
	assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
	assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

0 comments on commit 0c5a6dd

Please sign in to comment.