Skip to content

Commit

Permalink
feat: fix workflow hangs during executeDAGTask. Fixes #6557 (#8992)
Browse files Browse the repository at this point in the history
* feat(controller): fix workflow hangs during executeDAGTask

Signed-off-by: smile-luobin <luobin_smile@163.com>

* Add retryTypeDagTaskWithExitNode test

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

* modify some comments

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

* fix some spelling mistakes

Signed-off-by: smile-luobin <smile.luobin@gmail.com>
  • Loading branch information
smile-luobin authored Aug 9, 2022
1 parent 00660fb commit f481e3b
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 1 deletion.
10 changes: 9 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}

// Finally execute the template
_, err = woc.executeTemplate(ctx, taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate})
node, err = woc.executeTemplate(ctx, taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate})
if err != nil {
switch err {
case ErrDeadlineExceeded:
Expand All @@ -524,6 +524,14 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
return
}
}
if node.Completed() {
// if the node type is NodeTypeRetry, and its last child is completed, it will be completed after woc.executeTemplate;
hasOnExitNode, onExitNode, err := woc.runOnExitNode(ctx, task.GetExitHook(woc.execWf.Spec.Arguments), node, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName)
if hasOnExitNode && (onExitNode == nil || !onExitNode.Fulfilled() || err != nil) {
// The onExit node is either not complete or has errored out, return.
return
}
}
}

if taskGroupNode != nil {
Expand Down
142 changes: 142 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3448,3 +3448,145 @@ func TestDagHttpChildrenAssigned(t *testing.T) {
}
}
}

var retryTypeDagTaskRunExitNodeAfterCompleted = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
labels:
workflows.argoproj.io/phase: Running
name: test-workflow-with-hang-cztfs
namespace: argo-system
spec:
entrypoint: dag
templates:
- name: linuxExitHandler
steps:
- - name: printExit
template: printExit
- container:
args:
- echo
- exit
command:
- /argosay
image: argoproj/argosay:v2
name: ""
name: printExit
- container:
args:
- echo
- a
command:
- /argosay
image: argoproj/argosay:v2
name: ""
name: printA
retryStrategy:
limit: "3"
retryPolicy: OnError
- dag:
tasks:
- hooks:
exit:
template: linuxExitHandler
name: printA
template: printA
- depends: printA.Succeeded
hooks:
exit:
template: linuxExitHandler
name: dependencyTesting
template: printA
name: dag
status:
nodes:
test-workflow-with-hang-cztfs:
children:
- test-workflow-with-hang-cztfs-1556528266
displayName: test-workflow-with-hang-cztfs
finishedAt: null
id: test-workflow-with-hang-cztfs
name: test-workflow-with-hang-cztfs
phase: Running
progress: 4/4
startedAt: "2022-08-04T02:28:38Z"
templateName: dag
templateScope: local/test-workflow-with-hang-cztfs
type: DAG
test-workflow-with-hang-cztfs-589413809:
boundaryID: test-workflow-with-hang-cztfs
children:
- test-workflow-with-hang-cztfs-527957059
displayName: printA(0)
finishedAt: "2022-08-04T02:28:43Z"
hostNodeName: node2
id: test-workflow-with-hang-cztfs-589413809
name: test-workflow-with-hang-cztfs.printA(0)
outputs:
exitCode: "0"
phase: Succeeded
progress: 1/1
resourcesDuration:
cpu: 2
memory: 2
startedAt: "2022-08-04T02:28:38Z"
templateName: printA
templateScope: local/test-workflow-with-hang-cztfs
type: Pod
test-workflow-with-hang-cztfs-1556528266:
boundaryID: test-workflow-with-hang-cztfs
children:
- test-workflow-with-hang-cztfs-589413809
displayName: printA
finishedAt: "2022-08-04T02:28:48Z"
id: test-workflow-with-hang-cztfs-1556528266
name: test-workflow-with-hang-cztfs.printA
outputs:
exitCode: "0"
phase: Succeeded
progress: 4/4
resourcesDuration:
cpu: 5
memory: 5
startedAt: "2022-08-04T02:28:38Z"
templateName: printA
templateScope: local/test-workflow-with-hang-cztfs
type: Retry
phase: Running
progress: 4/4
resourcesDuration:
cpu: 5
memory: 5
startedAt: "2022-08-04T02:28:38Z"
`

func TestRetryTypeDagTaskRunExitNodeAfterCompleted(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(retryTypeDagTaskRunExitNodeAfterCompleted)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
// retryTypeDAGTask completed
printAChild := woc.wf.Status.Nodes.FindByDisplayName("printA(0)")
assert.Equal(t, wfv1.NodeSucceeded, printAChild.Phase)

// run ExitNode
woc.operate(ctx)
onExitNode := woc.wf.Status.Nodes.FindByDisplayName("printA.onExit")
assert.NotNil(t, onExitNode)
assert.Equal(t, wfv1.NodeRunning, onExitNode.Phase)

// exitNode succeeded
makePodsPhase(ctx, woc, v1.PodSucceeded)
woc.operate(ctx)
onExitNode = woc.wf.Status.Nodes.FindByDisplayName("printA.onExit")
assert.Equal(t, wfv1.NodeSucceeded, onExitNode.Phase)

// run next DAGTask
woc.operate(ctx)
nextDAGTaskNode := woc.wf.Status.Nodes.FindByDisplayName("dependencyTesting")
assert.NotNil(t, nextDAGTaskNode)
assert.Equal(t, wfv1.NodeRunning, nextDAGTaskNode.Phase)
}

0 comments on commit f481e3b

Please sign in to comment.