Skip to content

Commit

Permalink
fix: make sure taskresult completed when mark node succeed when it ha…
Browse files Browse the repository at this point in the history
…s outputs (#12537)

Signed-off-by: shuangkun <tsk2013uestc@163.com>
Signed-off-by: shuangkun tian <72060326+shuangkun@users.noreply.github.com>
Co-authored-by: Julie Vogelman <julievogelman0@gmail.com>
Signed-off-by: Isitha Subasinghe <isubasinghe@student.unimelb.edu.au>
  • Loading branch information
2 people authored and isubasinghe committed Feb 28, 2024
1 parent 901cfb6 commit a980270
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 20 deletions.
2 changes: 1 addition & 1 deletion api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ WorkflowStatus contains overall status information about a workflow
|`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.|
|`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.|
|`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks|
|`taskResultsCompletionStatus`|`Map< boolean , string >`|TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.|
|`taskResultsCompletionStatus`|`Map< boolean , string >`|TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.|

## CronWorkflowSpec

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,7 @@ type WorkflowStatus struct {
// ArtifactGCStatus maintains the status of Artifact Garbage Collection
ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"`

// TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.
// TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.
TaskResultsCompletionStatus map[string]bool `json:"taskResultsCompletionStatus,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompletionStatus"`
}

Expand All @@ -1967,6 +1967,14 @@ func (ws *WorkflowStatus) TaskResultsInProgress() bool {
return false
}

func (ws *WorkflowStatus) IsTaskResultIncomplete(name string) bool {
value, found := ws.TaskResultsCompletionStatus[name]
if found {
return !value
}
return true
}

func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
return ws.OffloadNodeStatusVersion != ""
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ func makePodsPhase(ctx context.Context, woc *wfOperationCtx, phase apiv1.PodPhas
if err != nil {
panic(err)
}
if phase == apiv1.PodSucceeded {
nodeID := woc.nodeID(&pod)
woc.wf.Status.MarkTaskResultComplete(nodeID)
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func TestStepsOnExitTmplWithArt(t *testing.T) {
},
}
woc.wf.Status.Nodes[idx] = node
woc.wf.Status.MarkTaskResultComplete(node.ID)
}
}
woc1 := newWorkflowOperationCtx(woc.wf, controller)
Expand Down Expand Up @@ -283,6 +284,7 @@ func TestDAGOnExitTmplWithArt(t *testing.T) {
},
}
woc.wf.Status.Nodes[idx] = node
woc.wf.Status.MarkTaskResultComplete(node.ID)
}
}
woc1 := newWorkflowOperationCtx(woc.wf, controller)
Expand Down Expand Up @@ -383,6 +385,7 @@ func TestStepsTmplOnExit(t *testing.T) {
},
}
woc2.wf.Status.Nodes[idx] = node
woc.wf.Status.MarkTaskResultComplete(node.ID)
}
}

Expand Down Expand Up @@ -487,6 +490,7 @@ func TestDAGOnExit(t *testing.T) {
},
}
woc2.wf.Status.Nodes[idx] = node
woc.wf.Status.MarkTaskResultComplete(node.ID)
}
}
woc3 := newWorkflowOperationCtx(woc2.wf, controller)
Expand Down
4 changes: 3 additions & 1 deletion workflow/controller/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ spec:
assert.Equal(t, wfv1.NodePending, node.Phase)
makePodsPhase(ctx, woc, apiv1.PodFailed)
woc = newWorkflowOperationCtx(woc.wf, controller)
err := woc.podReconciliation(ctx)
err, _ := woc.podReconciliation(ctx)
assert.NoError(t, err)
node = woc.wf.Status.Nodes.FindByDisplayName("hook-failures.hooks.failure")
assert.NotNil(t, node)
Expand Down Expand Up @@ -1140,6 +1140,7 @@ spec:
pod, _ := podcs.Get(ctx, "hook-running", metav1.GetOptions{})
pod.Status.Phase = apiv1.PodSucceeded
updatedPod, _ := podcs.Update(ctx, pod, metav1.UpdateOptions{})
woc.wf.Status.MarkTaskResultComplete(woc.nodeID(pod))
_ = woc.controller.podInformer.GetStore().Update(updatedPod)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
Expand Down Expand Up @@ -1231,6 +1232,7 @@ spec:
pod.Status.Phase = apiv1.PodSucceeded
updatedPod, _ := podcs.Update(ctx, &pod, metav1.UpdateOptions{})
_ = woc.controller.podInformer.GetStore().Update(updatedPod)
woc.wf.Status.MarkTaskResultComplete(woc.nodeID(&pod))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress)
Expand Down
28 changes: 24 additions & 4 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.wf.Status.EstimatedDuration = woc.estimateWorkflowDuration()
} else {
woc.workflowDeadline = woc.getWorkflowDeadline()
err = woc.podReconciliation(ctx)
err, podReconciliationCompleted := woc.podReconciliation(ctx)
if err == nil {
woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
}
Expand All @@ -318,6 +318,12 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
// TODO: we need to re-add to the workqueue, but should happen in caller
return
}

if !podReconciliationCompleted {
woc.log.WithField("workflow", woc.wf.ObjectMeta.Name).Info("pod reconciliation didn't complete, will retry")
woc.requeue()
return
}
}

if woc.ShouldSuspend() {
Expand Down Expand Up @@ -1088,15 +1094,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
// pods and update the node state before continuing the evaluation of the workflow.
// Records all pods which were observed completed, which will be labeled completed=true
// after successful persist of the workflow.
func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
// returns whether pod reconciliation successfully completed
func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) {
podList, err := woc.getAllWorkflowPods()
if err != nil {
return err
return err, false
}
seenPods := make(map[string]*apiv1.Pod)
seenPodLock := &sync.Mutex{}
wfNodesLock := &sync.RWMutex{}
podRunningCondition := wfv1.Condition{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse}
taskResultIncomplete := false
performAssessment := func(pod *apiv1.Pod) {
if pod == nil {
return
Expand All @@ -1115,6 +1123,12 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err == nil {
if newState := woc.assessNodeStatus(pod, node); newState != nil {
// Check whether its taskresult is in an incompleted state.
if newState.Succeeded() && woc.wf.Status.IsTaskResultIncomplete(node.ID) {
woc.log.WithFields(log.Fields{"nodeID": newState.ID}).Debug("Taskresult of the node not yet completed")
taskResultIncomplete = true
return
}
woc.addOutputsToGlobalScope(newState.Outputs)
if newState.MemoizationStatus != nil {
if newState.Succeeded() {
Expand Down Expand Up @@ -1158,6 +1172,12 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {

wg.Wait()

// If true, it means there are some nodes which have outputs we wanted to be marked succeed, but the node's taskresults didn't completed.
// We should make sure the taskresults processing is complete as it will be possible to reference it in the next step.
if taskResultIncomplete {
return nil, false
}

woc.wf.Status.Conditions.UpsertCondition(podRunningCondition)

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
Expand Down Expand Up @@ -1197,7 +1217,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
woc.markNodePhase(node.Name, wfv1.NodeError, "pod deleted")
}
}
return nil
return nil, !taskResultIncomplete
}

func (woc *wfOperationCtx) nodeID(pod *apiv1.Pod) string {
Expand Down
10 changes: 5 additions & 5 deletions workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestSemaphoreTmplLevel(t *testing.T) {
woc_two.operate(ctx)

// Check Node status
err = woc_two.podReconciliation(ctx)
err, _ = woc_two.podReconciliation(ctx)
assert.NoError(t, err)
for _, node := range woc_two.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestSemaphoreScriptTmplLevel(t *testing.T) {
woc_two.operate(ctx)

// Check Node status
err = woc_two.podReconciliation(ctx)
err, _ = woc_two.podReconciliation(ctx)
assert.NoError(t, err)
for _, node := range woc_two.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestSemaphoreScriptConfigMapInDifferentNamespace(t *testing.T) {
woc_two.operate(ctx)

// Check Node status
err = woc_two.podReconciliation(ctx)
err, _ = woc_two.podReconciliation(ctx)
assert.NoError(t, err)
for _, node := range woc_two.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestSemaphoreResourceTmplLevel(t *testing.T) {
woc_two.operate(ctx)

// Check Node status
err = woc_two.podReconciliation(ctx)
err, _ = woc_two.podReconciliation(ctx)
assert.NoError(t, err)
for _, node := range woc_two.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
Expand Down Expand Up @@ -416,7 +416,7 @@ func TestSemaphoreWithOutConfigMap(t *testing.T) {
wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
err = woc.podReconciliation(ctx)
err, _ = woc.podReconciliation(ctx)
assert.NoError(t, err)
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
Expand Down
Loading

0 comments on commit a980270

Please sign in to comment.