diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 00dc071dfb22..63662738008f 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1207,56 +1207,56 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu } } } - - if !node.Fulfilled() && newDaemonStatus != nil { - if !*newDaemonStatus { - // if the daemon status switched to false, we prefer to just unset daemoned status field - // (as opposed to setting it to false) - newDaemonStatus = nil - } - if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) { - woc.log.Infof("Setting node %v daemoned: %v -> %v", node.ID, node.Daemoned, newDaemonStatus) - node.Daemoned = newDaemonStatus - updated = true - if pod.Status.PodIP != "" && pod.Status.PodIP != node.PodIP { - // only update Pod IP for daemoned nodes to reduce number of updates - woc.log.Infof("Updating daemon node %s IP %s -> %s", node.ID, node.PodIP, pod.Status.PodIP) - node.PodIP = pod.Status.PodIP + if !node.Completed() { + if newDaemonStatus != nil { + if !*newDaemonStatus { + // if the daemon status switched to false, we prefer to just unset daemoned status field + // (as opposed to setting it to false) + newDaemonStatus = nil + } + if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) { + woc.log.Infof("Setting node %v daemoned: %v -> %v", node.ID, node.Daemoned, newDaemonStatus) + node.Daemoned = newDaemonStatus + updated = true + if pod.Status.PodIP != "" && pod.Status.PodIP != node.PodIP { + // only update Pod IP for daemoned nodes to reduce number of updates + woc.log.Infof("Updating daemon node %s IP %s -> %s", node.ID, node.PodIP, pod.Status.PodIP) + node.PodIP = pod.Status.PodIP + } } } - } - // we only need to update these values if the container transitions to complete - if !node.Phase.Fulfilled() && newPhase.Fulfilled() { - // outputs are mixed between the annotation (parameters, artifacts, and result) and the pod's status (exit code) - if exitCode := getExitCode(pod); exitCode != nil { - woc.log.Infof("Updating node %s exit code %d", node.ID, *exitCode) - node.Outputs = &wfv1.Outputs{ExitCode: pointer.StringPtr(fmt.Sprintf("%d", int(*exitCode)))} - if outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok { - woc.log.Infof("Setting node %v outputs: %s", node.ID, outputStr) - if err := json.Unmarshal([]byte(outputStr), node.Outputs); err != nil { // I don't expect an error to ever happen in production - node.Phase = wfv1.NodeError - node.Message = err.Error() + // we only need to update these values if the container transitions to complete + if newPhase.Fulfilled() { + // outputs are mixed between the annotation (parameters, artifacts, and result) and the pod's status (exit code) + if exitCode := getExitCode(pod); exitCode != nil { + woc.log.Infof("Updating node %s exit code %d", node.ID, *exitCode) + node.Outputs = &wfv1.Outputs{ExitCode: pointer.StringPtr(fmt.Sprintf("%d", int(*exitCode)))} + if outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok { + woc.log.Infof("Setting node %v outputs: %s", node.ID, outputStr) + if err := json.Unmarshal([]byte(outputStr), node.Outputs); err != nil { // I don't expect an error to ever happen in production + node.Phase = wfv1.NodeError + node.Message = err.Error() + } } } } - } - if node.Phase != newPhase { - woc.log.Infof("Updating node %s status %s -> %s", node.ID, node.Phase, newPhase) - // if we are transitioning from Pending to a different state, clear out pending message - if node.Phase == wfv1.NodePending { - node.Message = "" + if node.Phase != newPhase { + woc.log.Infof("Updating node %s status %s -> %s", node.ID, node.Phase, newPhase) + // if we are transitioning from Pending to a different state, clear out pending message + if node.Phase == wfv1.NodePending { + node.Message = "" + } + updated = true + node.Phase = newPhase + } + if message != "" && node.Message != message { + woc.log.Infof("Updating node %s message: %s", node.ID, message) + updated = true + node.Message = message } - updated = true - node.Phase = newPhase - } - if message != "" && node.Message != message { - woc.log.Infof("Updating node %s message: %s", node.ID, message) - updated = true - node.Message = message } - if node.Fulfilled() && node.FinishedAt.IsZero() { updated = true node.FinishedAt = getLatestFinishedAt(pod) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 27e1e4ddb467..c5b7bf32e2a6 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1300,10 +1300,24 @@ func TestAssessNodeStatus(t *testing.T) { Phase: apiv1.PodRunning, }, } - node := &wfv1.NodeStatus{Phase: wfv1.NodeFailed} + node := &wfv1.NodeStatus{Daemoned: &daemoned, Phase: wfv1.NodeFailed} woc := newWorkflowOperationCtx(wf, controller) got := woc.assessNodeStatus(pod, node) - assert.True(t, got.Daemoned == nil) + assert.True(t, got.Phase == wfv1.NodeFailed) + }) + + t.Run("Daemon Step finished - Pod running", func(t *testing.T) { + cancel, controller := newController() + defer cancel() + pod := &apiv1.Pod{ + Status: apiv1.PodStatus{ + Phase: apiv1.PodRunning, + }, + } + node := &wfv1.NodeStatus{Daemoned: &daemoned, Phase: wfv1.NodeSucceeded} + woc := newWorkflowOperationCtx(wf, controller) + got := woc.assessNodeStatus(pod, node) + assert.True(t, got.Phase == wfv1.NodeSucceeded) }) }