Skip to content

Commit

Permalink
fix: Retry with DAG. Fixes #7617 (#7652)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Jan 27, 2022
1 parent 7a3b766 commit 3429b16
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 39 deletions.
2 changes: 1 addition & 1 deletion workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ func retryWorkflow(ctx context.Context, kubeClient kubernetes.Interface, hydrato
continue
}
case wfv1.NodeError, wfv1.NodeFailed, wfv1.NodeOmitted:
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) {
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup) {
newNode := node.DeepCopy()
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
Expand Down
102 changes: 64 additions & 38 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,50 +761,76 @@ func TestDeepDeleteNodes(t *testing.T) {
}

func TestRetryWorkflow(t *testing.T) {
ctx := context.Background()
kubeClient := kubefake.NewSimpleClientset()
wfClient := argofake.NewSimpleClientset().ArgoprojV1alpha1().Workflows("my-ns")
createdTime := metav1.Time{Time: time.Now().UTC()}
finishedTime := metav1.Time{Time: createdTime.Add(time.Second * 2)}
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
common.LabelKeyCompleted: "true",
common.LabelKeyWorkflowArchivingStatus: "Pending",
}},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
StartedAt: createdTime,
FinishedAt: finishedTime,
Nodes: map[string]wfv1.NodeStatus{
"failed-node": {Name: "failed-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeFailed, Message: "failed"},
"succeeded-node": {Name: "succeeded-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeSucceeded, Message: "succeeded"}},
},
}

ctx := context.Background()
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, err = RetryWorkflow(ctx, kubeClient, hydratorfake.Always, wfClient, wf.Name, false, "")
if assert.NoError(t, err) {
assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
assert.Equal(t, metav1.Time{}, wf.Status.FinishedAt)
assert.True(t, wf.Status.StartedAt.After(createdTime.Time))
assert.NotContains(t, wf.Labels, common.LabelKeyCompleted)
assert.NotContains(t, wf.Labels, common.LabelKeyWorkflowArchivingStatus)
for _, node := range wf.Status.Nodes {
switch node.Phase {
case wfv1.NodeSucceeded:
assert.Equal(t, "succeeded", node.Message)
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Equal(t, createdTime, node.StartedAt)
assert.Equal(t, finishedTime, node.FinishedAt)
case wfv1.NodeFailed:
assert.Equal(t, "", node.Message)
assert.Equal(t, wfv1.NodeRunning, node.Phase)
assert.Equal(t, metav1.Time{}, node.FinishedAt)
assert.True(t, node.StartedAt.After(createdTime.Time))
t.Run("Steps", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "my-steps",
Labels: map[string]string{
common.LabelKeyCompleted: "true",
common.LabelKeyWorkflowArchivingStatus: "Pending",
},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
StartedAt: createdTime,
FinishedAt: finishedTime,
Nodes: map[string]wfv1.NodeStatus{
"failed-node": {Name: "failed-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeFailed, Message: "failed"},
"succeeded-node": {Name: "succeeded-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeSucceeded, Message: "succeeded"}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, err = RetryWorkflow(ctx, kubeClient, hydratorfake.Noop, wfClient, wf.Name, false, "")
if assert.NoError(t, err) {
assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
assert.Equal(t, metav1.Time{}, wf.Status.FinishedAt)
assert.True(t, wf.Status.StartedAt.After(createdTime.Time))
assert.NotContains(t, wf.Labels, common.LabelKeyCompleted)
assert.NotContains(t, wf.Labels, common.LabelKeyWorkflowArchivingStatus)
for _, node := range wf.Status.Nodes {
switch node.Phase {
case wfv1.NodeSucceeded:
assert.Equal(t, "succeeded", node.Message)
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Equal(t, createdTime, node.StartedAt)
assert.Equal(t, finishedTime, node.FinishedAt)
case wfv1.NodeFailed:
assert.Equal(t, "", node.Message)
assert.Equal(t, wfv1.NodeRunning, node.Phase)
assert.Equal(t, metav1.Time{}, node.FinishedAt)
assert.True(t, node.StartedAt.After(createdTime.Time))
}
}
}
}
})
t.Run("DAG", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "my-dag",
Labels: map[string]string{},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"": {Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeTaskGroup}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, err = RetryWorkflow(ctx, kubeClient, hydratorfake.Noop, wfClient, wf.Name, false, "")
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 1) {
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes[""].Phase)
}

}
})
}

func TestFromUnstructuredObj(t *testing.T) {
Expand Down

0 comments on commit 3429b16

Please sign in to comment.