Skip to content

Commit

Permalink
fix: Descendants of suspended nodes need to be removed when retrying …
Browse files Browse the repository at this point in the history
…workflow (#9440)

* fix: Descendants of suspended nodes need to be removed when retrying workflow

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>

* chore: add tests

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>
  • Loading branch information
terrytangyuan committed Aug 25, 2022
1 parent a09172a commit ff41099
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
11 changes: 8 additions & 3 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,9 +864,14 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
return nil, nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
}

if node.Type == wfv1.NodeTypePod {
deletedNodes[node.ID] = true
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
if node.Type == wfv1.NodeTypePod || node.Type == wfv1.NodeTypeSuspend {
// Only remove the descendants of a suspended node but not the suspended node itself. The descendants
// of a suspended node need to be removed since because the conditions should be re-evaluated based on
// the modified supplied parameter values.
if node.Type != wfv1.NodeTypeSuspend {
deletedNodes[node.ID] = true
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
}

descendantNodeIDs := getDescendantNodeIDs(wf, node)
for _, descendantNodeID := range descendantNodeIDs {
Expand Down
23 changes: 15 additions & 8 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,21 +910,28 @@ func TestFormulateRetryWorkflow(t *testing.T) {
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"my-nested-dag-1": {ID: "my-nested-dag-1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeTaskGroup},
"suspended": {ID: "suspended", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeSuspend, BoundaryID: "my-nested-dag-1", Outputs: &wfv1.Outputs{Parameters: []wfv1.Parameter{{
Name: "param-1",
Value: wfv1.AnyStringPtr("3"),
ValueFrom: &wfv1.ValueFrom{Supplied: &wfv1.SuppliedValueFrom{}},
}}}},
"skipped": {ID: "skipped", Phase: wfv1.NodeSkipped, Type: wfv1.NodeTypeSkipped, BoundaryID: "suspended"},
"entrypoint": {ID: "entrypoint", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeTaskGroup, Children: []string{"suspended", "skipped"}},
"suspended": {
ID: "suspended",
Phase: wfv1.NodeSucceeded,
Type: wfv1.NodeTypeSuspend,
BoundaryID: "entrypoint",
Children: []string{"child"},
Outputs: &wfv1.Outputs{Parameters: []wfv1.Parameter{{
Name: "param-1",
Value: wfv1.AnyStringPtr("3"),
ValueFrom: &wfv1.ValueFrom{Supplied: &wfv1.SuppliedValueFrom{}},
}}}},
"child": {ID: "child", Phase: wfv1.NodeSkipped, Type: wfv1.NodeTypeSkipped, BoundaryID: "suspended"},
"skipped": {ID: "skipped", Phase: wfv1.NodeSkipped, Type: wfv1.NodeTypeSkipped, BoundaryID: "entrypoint"},
}},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=suspended", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 3) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["my-nested-dag-1"].Phase)
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["entrypoint"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["suspended"].Phase)
assert.Equal(t, wfv1.Parameter{
Name: "param-1",
Expand Down

0 comments on commit ff41099

Please sign in to comment.