Skip to content

Commit

Permalink
feat: Support retrying complex workflows with nested group nodes (#9499)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrytangyuan committed Sep 15, 2022
1 parent aa45b34 commit 47544cc
Show file tree
Hide file tree
Showing 3 changed files with 716 additions and 191 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,10 @@ func (n Nodes) FindByDisplayName(name string) *NodeStatus {
return n.Find(NodeWithDisplayName(name))
}

func (n Nodes) FindByName(name string) *NodeStatus {
return n.Find(NodeWithName(name))
}

func (in Nodes) Any(f func(NodeStatus) bool) bool {
return in.Find(f) != nil
}
Expand All @@ -1717,6 +1721,10 @@ func (n Nodes) Find(f func(NodeStatus) bool) *NodeStatus {
return nil
}

func NodeWithName(name string) func(n NodeStatus) bool {
return func(n NodeStatus) bool { return n.Name == name }
}

func NodeWithDisplayName(name string) func(n NodeStatus) bool {
return func(n NodeStatus) bool { return n.DisplayName == name }
}
Expand Down
131 changes: 91 additions & 40 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,42 @@ func deletePodNodeDuringRetryWorkflow(wf *wfv1.Workflow, node wfv1.NodeStatus, d
return deletedPods, podsToDelete
}

func containsNode(nodes []string, node string) bool {
for _, e := range nodes {
if e == node {
return true
}
}
return false
}

func isGroupNode(node wfv1.NodeStatus) bool {
return node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup || node.Type == wfv1.NodeTypeSteps
}

func resetConnectedParentGroupNodes(oldWF *wfv1.Workflow, newWF *wfv1.Workflow, currentNode wfv1.NodeStatus, resetParentGroupNodes []string) (*wfv1.Workflow, []string) {
currentNodeID := currentNode.ID
for {
currentNode := oldWF.Status.Nodes[currentNodeID]
if !containsNode(resetParentGroupNodes, currentNodeID) {
newWF.Status.Nodes[currentNodeID] = resetNode(*currentNode.DeepCopy())
resetParentGroupNodes = append(resetParentGroupNodes, currentNodeID)
log.Debugf("Reset connected group node %s", currentNode.Name)
}
if currentNode.BoundaryID != "" && currentNode.BoundaryID != oldWF.ObjectMeta.Name {
parentNode := oldWF.Status.Nodes[currentNode.BoundaryID]
if isGroupNode(parentNode) {
currentNodeID = parentNode.ID
} else {
break
}
} else {
break
}
}
return newWF, resetParentGroupNodes
}

// FormulateRetryWorkflow formulates a previous workflow to be retried, deleting all failed steps as well as the onExit node (and children)
func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, []string, error) {

Expand Down Expand Up @@ -813,6 +849,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
deletedNodes := make(map[string]bool)
deletedPods := make(map[string]bool)
var podsToDelete []string
var resetParentGroupNodes []string
for _, node := range wf.Status.Nodes {
doForceResetNode := false
if _, present := nodeIDsToReset[node.ID]; present {
Expand All @@ -821,41 +858,70 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if !strings.HasPrefix(node.Name, onExitNodeName) && !doForceResetNode {
if doForceResetNode {
log.Debugf("Force reset for node: %s", node.Name)
// Reset parent node if this node is a step/task group or DAG.
if (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup) && node.BoundaryID != "" {
parentNodeID := node.BoundaryID
if parentNodeID != wf.ObjectMeta.Name { // Skip root node
for _, tmplNode := range wf.Status.Nodes {
// Reset the parent node
if tmplNode.ID == parentNodeID {
log.Debugln(fmt.Sprintf("Resetting parent node %s", parentNodeID))
newParentNode := tmplNode.DeepCopy()
newWF.Status.Nodes[tmplNode.ID] = resetNode(*newParentNode)
}
// If the node belongs to a node group that needs to be retried, reset its status
if tmplNode.BoundaryID == parentNodeID {
log.Debugln(fmt.Sprintf("Resetting child node %s since its parent node %s is being retried", tmplNode.ID, parentNodeID))
newChildNode := tmplNode.DeepCopy()
newWF.Status.Nodes[tmplNode.ID] = resetNode(*newChildNode)
if isGroupNode(node) && node.BoundaryID != "" {
if node.ID != wf.ObjectMeta.Name { // Skip root node
descendantNodeIDs := getDescendantNodeIDs(wf, node)
var nodeGroupNeedsReset bool
// Only reset DAG that's in the same branch as the nodeIDsToReset
for _, child := range descendantNodeIDs {
childNode := wf.Status.Nodes[child]
if _, present := nodeIDsToReset[child]; present {
log.Debugf("Group node %s needs to reset since its child %s is in the force reset path", node.Name, childNode.Name)
nodeGroupNeedsReset = true
break
}
}
if nodeGroupNeedsReset {
newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes)
}
}
} else {
if node.Type == wfv1.NodeTypePod || node.Type == wfv1.NodeTypeSuspend {
newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes)
// Only remove the descendants of a suspended node but not the suspended node itself. The descendants
// of a suspended node need to be removed since the conditions should be re-evaluated based on
// the modified supplied parameter values.
if node.Type != wfv1.NodeTypeSuspend {
deletedNodes[node.ID] = true
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
log.Debugf("Deleted pod node: %s", node.Name)
}

descendantNodeIDs := getDescendantNodeIDs(wf, node)
for _, descendantNodeID := range descendantNodeIDs {
deletedNodes[descendantNodeID] = true
descendantNode := wf.Status.Nodes[descendantNodeID]
if descendantNode.Type == wfv1.NodeTypePod {
newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes)
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, descendantNode, deletedPods, podsToDelete)
log.Debugf("Deleted pod node %s since it belongs to node %s", descendantNode.Name, node.Name)
}
}
} else {
log.Debugf("Reset non-pod/suspend node %s", node.Name)
newNode := node.DeepCopy()
newWF.Status.Nodes[newNode.ID] = resetNode(*newNode)
}
}
} else {
if !containsNode(resetParentGroupNodes, node.ID) {
log.Debugf("Node %s remains as is", node.Name)
newWF.Status.Nodes[node.ID] = node
}
continue
}
if doForceResetNode {
newNode := node.DeepCopy()
newWF.Status.Nodes[newNode.ID] = resetNode(*newNode)
}
case wfv1.NodeError, wfv1.NodeFailed, wfv1.NodeOmitted:
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup) {
if !strings.HasPrefix(node.Name, onExitNodeName) && isGroupNode(node) {
newNode := node.DeepCopy()
newWF.Status.Nodes[newNode.ID] = resetNode(*newNode)
log.Debugf("Reset %s node %s since it's a group node", node.Name, string(node.Phase))
continue
} else {
log.Debugf("Deleted %s node %s since it's not a group node", node.Name, string(node.Phase))
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
log.Debugf("Deleted pod node: %s", node.Name)
deletedNodes[node.ID] = true
}
// do not add this status to the node. pretend as if this node never existed.
Expand All @@ -864,24 +930,8 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
return nil, nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
}

if node.Type == wfv1.NodeTypePod || node.Type == wfv1.NodeTypeSuspend {
// Only remove the descendants of a suspended node but not the suspended node itself. The descendants
// of a suspended node need to be removed since because the conditions should be re-evaluated based on
// the modified supplied parameter values.
if node.Type != wfv1.NodeTypeSuspend {
deletedNodes[node.ID] = true
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
}

descendantNodeIDs := getDescendantNodeIDs(wf, node)
for _, descendantNodeID := range descendantNodeIDs {
deletedNodes[descendantNodeID] = true
descendantNode := wf.Status.Nodes[descendantNodeID]
if descendantNode.Type == wfv1.NodeTypePod {
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, descendantNode, deletedPods, podsToDelete)
}
}
} else if node.Name == wf.ObjectMeta.Name {
if node.Name == wf.ObjectMeta.Name {
log.Debugf("Reset root node: %s", node.Name)
newNode := node.DeepCopy()
newWF.Status.Nodes[newNode.ID] = resetNode(*newNode)
continue
Expand All @@ -891,6 +941,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
if len(deletedNodes) > 0 {
for _, node := range newWF.Status.Nodes {
if deletedNodes[node.ID] {
log.Debugf("Removed node: %s", node.Name)
delete(newWF.Status.Nodes, node.ID)
continue
}
Expand Down
Loading

0 comments on commit 47544cc

Please sign in to comment.