Skip to content

Commit

Permalink
feat: fix bugs in retryWorkflow if failed pod node has children nodes.
Browse files Browse the repository at this point in the history
…Fix argoproj#9244 (argoproj#9285)

* feat: fix bugs in retry workflow if failed node has children nodes.

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

* Fix bugs in retryWorkflow

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

* refactor the method that gets the descendantNodes

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

* do some refactoring

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

* fix bugs, and add more checks in test

Signed-off-by: smile-luobin <smile.luobin@gmail.com>

Signed-off-by: smile-luobin <smile.luobin@gmail.com>
Signed-off-by: juchao <juchao@coscene.io>
  • Loading branch information
smile-luobin authored and juchaosong committed Nov 3, 2022
1 parent 697858a commit 0a048c0
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 9 deletions.
41 changes: 37 additions & 4 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,26 @@ func convertNodeID(newWf *wfv1.Workflow, regex *regexp.Regexp, oldNodeID string,
return newWf.NodeID(newNodeName)
}

func getDescendantNodeIDs(wf *wfv1.Workflow, node wfv1.NodeStatus) []string {
var descendantNodeIDs []string
descendantNodeIDs = append(descendantNodeIDs, node.Children...)
for _, child := range node.Children {
descendantNodeIDs = append(descendantNodeIDs, getDescendantNodeIDs(wf, wf.Status.Nodes[child])...)
}
return descendantNodeIDs
}

func deletePodNodeDuringRetryWorkflow(wf *wfv1.Workflow, node wfv1.NodeStatus, deletedPods map[string]bool, podsToDelete []string) []string {
templateName := getTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
podName := PodName(wf.Name, node.Name, templateName, node.ID, version)
if _, ok := deletedPods[podName]; !ok {
deletedPods[podName] = true
podsToDelete = append(podsToDelete, podName)
}
return podsToDelete
}

// FormulateRetryWorkflow formulates a previous workflow to be retried, deleting all failed steps as well as the onExit node (and children)
func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, []string, error) {

Expand Down Expand Up @@ -791,6 +811,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce

// Iterate the previous nodes. If it was successful Pod carry it forward
deletedNodes := make(map[string]bool)
deletedPods := make(map[string]bool)
var podsToDelete []string
for _, node := range wf.Status.Nodes {
doForceResetNode := false
Expand Down Expand Up @@ -844,10 +865,17 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}

if node.Type == wfv1.NodeTypePod {
templateName := getTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
podName := PodName(wf.Name, node.Name, templateName, node.ID, version)
podsToDelete = append(podsToDelete, podName)
deletedNodes[node.ID] = true
podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)

descendantNodeIDs := getDescendantNodeIDs(wf, node)
for _, descendantNodeID := range descendantNodeIDs {
deletedNodes[descendantNodeID] = true
descendantNode := wf.Status.Nodes[descendantNodeID]
if descendantNode.Type == wfv1.NodeTypePod {
podsToDelete = deletePodNodeDuringRetryWorkflow(wf, descendantNode, deletedPods, podsToDelete)
}
}
} else if node.Name == wf.ObjectMeta.Name {
newNode := node.DeepCopy()
newWF.Status.Nodes[newNode.ID] = resetNode(*newNode)
Expand All @@ -857,6 +885,11 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce

if len(deletedNodes) > 0 {
for _, node := range newWF.Status.Nodes {
if deletedNodes[node.ID] {
delete(newWF.Status.Nodes, node.ID)
continue
}

var newChildren []string
for _, child := range node.Children {
if !deletedNodes[child] {
Expand Down
212 changes: 207 additions & 5 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -920,13 +921,12 @@ func TestFormulateRetryWorkflow(t *testing.T) {
assert.NoError(t, err)
wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=3", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 5) {
// node #3 & node #4 are deleted and will be recreated. so only 3 nodes in wf.Status.Nodes
if assert.Len(t, wf.Status.Nodes, 3) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["my-nested-dag-1"].Phase)
// These should all be running since the child node #3 belongs up to node #1.
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["1"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["2"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["3"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["4"].Phase)
}
}
})
Expand All @@ -950,14 +950,14 @@ func TestFormulateRetryWorkflow(t *testing.T) {
assert.NoError(t, err)
wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 5) {
// node #4 is deleted and will be recreated. so only 4 nodes in wf.Status.Nodes
if assert.Len(t, wf.Status.Nodes, 4) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["my-nested-dag-2"].Phase)
// This should be running since it's node #4's parent node.
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["1"].Phase)
// This should be running since it's node #1's child node and node #1 is being retried.
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["2"].Phase)
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["3"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["4"].Phase)
}
}
})
Expand Down Expand Up @@ -1048,3 +1048,205 @@ func TestGetTemplateFromNode(t *testing.T) {
assert.Equal(t, tc.expectedTemplateName, actual)
}
}

var retryWorkflowWithFailedNodeHasChildrenNodes = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
labels:
workflows.argoproj.io/completed: "true"
workflows.argoproj.io/phase: Failed
name: test-pipeline-bnzwv
namespace: argo-system
spec:
entrypoint: test-pipeline-dag
templates:
- dag:
tasks:
- name: t1
template: succeeded
- depends: t1
name: t2
template: failed
- depends: t2 || t2.Failed
name: t3
template: succeeded
- depends: t3
name: t4-1
template: succeeded
- depends: t3
name: t4-2
template: succeeded
- depends: t3
name: t4-3
template: failed
name: test-pipeline-dag
- container:
command:
- "true"
image: alpine
name: succeeded
- container:
command:
- "false"
image: alpine
name: failed
status:
conditions:
- status: "False"
type: PodRunning
- status: "True"
type: Completed
finishedAt: "2022-08-04T08:36:31Z"
nodes:
test-pipeline-bnzwv:
children:
- test-pipeline-bnzwv-929756297
displayName: test-pipeline-bnzwv
finishedAt: "2022-08-04T08:36:31Z"
id: test-pipeline-bnzwv
name: test-pipeline-bnzwv
outboundNodes:
- test-pipeline-bnzwv-748480356
- test-pipeline-bnzwv-798813213
- test-pipeline-bnzwv-782035594
phase: Failed
progress: 4/6
startedAt: "2022-08-04T08:35:10Z"
templateName: test-pipeline-dag
templateScope: local/test-pipeline-bnzwv
type: DAG
test-pipeline-bnzwv-748480356:
boundaryID: test-pipeline-bnzwv
displayName: t4-1
finishedAt: "2022-08-04T08:36:19Z"
hostNodeName: node2
id: test-pipeline-bnzwv-748480356
name: test-pipeline-bnzwv.t4-1
outputs:
exitCode: "0"
phase: Succeeded
progress: 1/1
startedAt: "2022-08-04T08:36:11Z"
templateName: succeeded
templateScope: local/test-pipeline-bnzwv
type: Pod
test-pipeline-bnzwv-782035594:
boundaryID: test-pipeline-bnzwv
displayName: t4-3
finishedAt: "2022-08-04T08:36:16Z"
hostNodeName: node1
id: test-pipeline-bnzwv-782035594
message: Error (exit code 1)
name: test-pipeline-bnzwv.t4-3
outputs:
exitCode: "1"
phase: Failed
progress: 0/1
startedAt: "2022-08-04T08:36:11Z"
templateName: failed
templateScope: local/test-pipeline-bnzwv
type: Pod
test-pipeline-bnzwv-798813213:
boundaryID: test-pipeline-bnzwv
displayName: t4-2
finishedAt: "2022-08-04T08:36:19Z"
hostNodeName: node1
id: test-pipeline-bnzwv-798813213
name: test-pipeline-bnzwv.t4-2
outputs:
exitCode: "0"
phase: Succeeded
progress: 1/1
startedAt: "2022-08-04T08:36:11Z"
templateName: succeeded
templateScope: local/test-pipeline-bnzwv
type: Pod
test-pipeline-bnzwv-879423440:
boundaryID: test-pipeline-bnzwv
children:
- test-pipeline-bnzwv-896201059
displayName: t2
finishedAt: "2022-08-04T08:35:39Z"
hostNodeName: node2
id: test-pipeline-bnzwv-879423440
message: Error (exit code 1)
name: test-pipeline-bnzwv.t2
outputs:
exitCode: "1"
phase: Failed
progress: 0/1
startedAt: "2022-08-04T08:35:30Z"
templateName: failed
templateScope: local/test-pipeline-bnzwv
type: Pod
test-pipeline-bnzwv-896201059:
boundaryID: test-pipeline-bnzwv
children:
- test-pipeline-bnzwv-748480356
- test-pipeline-bnzwv-798813213
- test-pipeline-bnzwv-782035594
displayName: t3
finishedAt: "2022-08-04T08:36:00Z"
hostNodeName: node2
id: test-pipeline-bnzwv-896201059
name: test-pipeline-bnzwv.t3
outputs:
exitCode: "0"
phase: Succeeded
progress: 1/1
startedAt: "2022-08-04T08:35:50Z"
templateName: succeeded
templateScope: local/test-pipeline-bnzwv
type: Pod
test-pipeline-bnzwv-929756297:
boundaryID: test-pipeline-bnzwv
children:
- test-pipeline-bnzwv-879423440
displayName: t1
finishedAt: "2022-08-04T08:35:18Z"
hostNodeName: node2
id: test-pipeline-bnzwv-929756297
name: test-pipeline-bnzwv.t1
outputs:
exitCode: "0"
phase: Succeeded
progress: 1/1
startedAt: "2022-08-04T08:35:10Z"
templateName: succeeded
templateScope: local/test-pipeline-bnzwv
type: Pod
phase: Failed
progress: 4/6
resourcesDuration:
cpu: 32
memory: 32
startedAt: "2022-08-04T08:35:10Z"
`

func TestRetryWorkflowWithFailedNodeHasChildrenNodes(t *testing.T) {
ctx := context.Background()
wf := wfv1.MustUnmarshalWorkflow(retryWorkflowWithFailedNodeHasChildrenNodes)
version := GetWorkflowPodNameVersion(wf)
needDeletedNodeNames := []string{"t2", "t3", "t4-1", "t4-2", "t4-3"}
needDeletedPodNames := make([]string, 5)
for i, nodeName := range needDeletedNodeNames {
node := wf.Status.Nodes.FindByDisplayName(nodeName)
templateName := getTemplateFromNode(*node)
podName := PodName(wf.Name, node.Name, templateName, node.ID, version)
needDeletedPodNames[i] = podName
}
slices.Sort(needDeletedPodNames)

wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(wf.Status.Nodes))
for _, nodeName := range needDeletedNodeNames {
node := wf.Status.Nodes.FindByDisplayName(nodeName)
assert.Nil(t, node)
}

assert.Equal(t, 5, len(podsToDelete))
slices.Sort(podsToDelete)
assert.Equal(t, needDeletedPodNames, podsToDelete)
}

0 comments on commit 0a048c0

Please sign in to comment.