diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index fc1864c8ad71..2670a48f4473 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -412,8 +412,13 @@ type ShutdownStrategy string const ( ShutdownStrategyTerminate ShutdownStrategy = "Terminate" ShutdownStrategyStop ShutdownStrategy = "Stop" + ShutdownStrategyNone ShutdownStrategy = "" ) +func (s ShutdownStrategy) Enabled() bool { + return s != ShutdownStrategyNone +} + func (s ShutdownStrategy) ShouldExecute(isOnExitPod bool) bool { switch s { case ShutdownStrategyTerminate: diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index 97d21644d7a6..1df5d4dd3f25 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -27,18 +27,18 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1 return nil case apiv1.PodPending: // Check if we are currently shutting down - if woc.execWf.Spec.Shutdown != "" { + if woc.GetShutdownStrategy().Enabled() { // Only delete pods that are not part of an onExit handler if we are "Stopping" or all pods if we are "Terminating" _, onExitPod := pod.Labels[common.LabelKeyOnExit] - if !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) { - woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.wf.Spec.Shutdown) + if !woc.GetShutdownStrategy().ShouldExecute(onExitPod) { + woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.GetShutdownStrategy()) err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err == nil { wfNodesLock.Lock() defer wfNodesLock.Unlock() node := woc.wf.Status.Nodes[pod.Name] - woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown)) + woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy())) return nil } // If we fail to delete the pod, fall back to setting the annotation @@ -75,8 +75,8 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1 } for _, c := range woc.findTemplate(pod).GetMainContainerNames() { - if woc.wf.Spec.Shutdown != "" { - if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) { + if woc.GetShutdownStrategy().Enabled() { + if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.GetShutdownStrategy().ShouldExecute(onExitPod) { podExecCtl.Deadline = &time.Time{} woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name) return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b16154ec0f22..030aaffdbc8f 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -101,9 +101,7 @@ type wfOperationCtx struct { // execWf holds the Workflow for use in execution. // In Normal workflow scenario: It holds copy of workflow object // In Submit From WorkflowTemplate: It holds merged workflow with WorkflowDefault, Workflow and WorkflowTemplate - // 'execWf.Spec' should usually be used instead `wf.Spec`, with two exceptions for user editable fields: - // 1. `wf.Spec.Suspend` - // 2. `wf.Spec.Shutdown` + // 'execWf.Spec' should usually be used instead `wf.Spec` execWf *wfv1.Workflow } @@ -299,7 +297,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { } } - if woc.wf.Spec.Suspend != nil && *woc.wf.Spec.Suspend { + if woc.ShouldSuspend() { woc.log.Infof("workflow suspended") return } @@ -373,7 +371,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { }[node.Phase] var onExitNode *wfv1.NodeStatus - if woc.execWf.Spec.OnExit != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) { + if woc.execWf.Spec.OnExit != "" && woc.GetShutdownStrategy().ShouldExecute(true) { woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus) var failures []failedNodeStatus @@ -412,8 +410,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { } var workflowMessage string - if node.FailedOrError() && woc.execWf.Spec.Shutdown != "" { - workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown) + if node.FailedOrError() && woc.GetShutdownStrategy().Enabled() { + workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) } else { workflowMessage = node.Message } @@ -753,10 +751,10 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil } - if woc.execWf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) { + if woc.GetShutdownStrategy().Enabled() || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) { var message string - if woc.execWf.Spec.Shutdown != "" { - message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown) + if woc.GetShutdownStrategy().Enabled() { + message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) } else { message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline) } @@ -1018,12 +1016,12 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool { // fails any suspended and pending nodes if the workflow deadline has passed func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() { deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) - if woc.execWf.Spec.Shutdown != "" || deadlineExceeded { + if woc.GetShutdownStrategy().Enabled() || deadlineExceeded { for _, node := range woc.wf.Status.Nodes { if node.IsActiveSuspendNode() || (node.Phase == wfv1.NodePending && deadlineExceeded) { var message string - if woc.execWf.Spec.Shutdown != "" { - message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown) + if woc.GetShutdownStrategy().Enabled() { + message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) } else { message = "Step exceeded its deadline" } @@ -2927,7 +2925,7 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou } func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, templateRef, parentDisplayName, parentNodeName, boundaryID string, tmplCtx *templateresolution.Context) (bool, *wfv1.NodeStatus, error) { - if templateRef != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) { + if templateRef != "" && woc.GetShutdownStrategy().ShouldExecute(true) { woc.log.Infof("Running OnExit handler: %s", templateRef) onExitNodeName := common.GenerateOnExitNodeName(parentDisplayName) onExitNode, err := woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: templateRef}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{ @@ -3182,15 +3180,31 @@ func (woc *wfOperationCtx) setExecWorkflow() error { return nil } +func (woc *wfOperationCtx) GetShutdownStrategy() wfv1.ShutdownStrategy { + return woc.execWf.Spec.Shutdown +} + +func (woc *wfOperationCtx) ShouldSuspend() bool { + return woc.execWf.Spec.Suspend != nil && *woc.execWf.Spec.Suspend +} + +func (woc *wfOperationCtx) needsStoredWfSpecUpdate() bool { + // woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x + // Need to recalculate StoredWorkflowSpec in 2.12.x format. + // This check can be removed once all user migrated from 2.11.x to 2.12.x + return woc.wf.Status.StoredWorkflowSpec == nil || (woc.wf.Spec.Entrypoint != "" && woc.wf.Status.StoredWorkflowSpec.Entrypoint == "") || + (woc.wf.Spec.Suspend != nil && woc.wf.Status.StoredWorkflowSpec.Suspend == nil) || + (woc.wf.Spec.Shutdown != "" && woc.wf.Status.StoredWorkflowSpec.Shutdown == "") || + (woc.wf.Spec.Shutdown != woc.wf.Status.StoredWorkflowSpec.Shutdown) +} + func (woc *wfOperationCtx) setStoredWfSpec() error { wfDefault := woc.controller.Config.WorkflowDefaults if wfDefault == nil { wfDefault = &wfv1.Workflow{} } - // woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x - // Need to recalculate StoredWorkflowSpec in 2.12.x format. - // This check can be removed once all user migrated from 2.11.x to 2.12.x - if woc.wf.Status.StoredWorkflowSpec == nil || woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" { + + if woc.needsStoredWfSpecUpdate() { wftHolder, err := woc.fetchWorkflowSpec() if err != nil { return err diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 00aeaa3021e5..0621278a951e 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -5890,6 +5890,71 @@ func TestWorkflowScheduledTimeVariable(t *testing.T) { assert.Equal(t, "2006-01-02T15:04:05-07:00", woc.globalParams[common.GlobalVarWorkflowCronScheduleTime]) } +func TestWorkflowShutdownStrategy(t *testing.T) { + wf := unmarshalWF(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: whalesay + namespace: default +spec: + entrypoint: whalesay + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [sh, -c] + args: ["cowsay hellow"]`) + + cancel, controller := newController() + defer cancel() + wf1 := wf.DeepCopy() + t.Run("StopStrategy", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + + for _, node := range woc.wf.Status.Nodes { + assert.Equal(t, wfv1.NodePending, node.Phase) + } + // Updating Pod state + makePodsPhase(ctx, woc, apiv1.PodPending) + // Simulate the Stop command + wf1 := woc.wf + wf1.Spec.Shutdown = wfv1.ShutdownStrategyStop + woc1 := newWorkflowOperationCtx(wf1, controller) + woc1.operate(ctx) + + node := woc1.wf.Status.Nodes.FindByDisplayName("whalesay") + if assert.NotNil(t, node) { + assert.Contains(t, node.Message, "workflow shutdown with strategy: Stop") + } + }) + + t.Run("TerminateStrategy", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(wf1, controller) + woc.operate(ctx) + + for _, node := range woc.wf.Status.Nodes { + assert.Equal(t, wfv1.NodePending, node.Phase) + } + // Updating Pod state + makePodsPhase(ctx, woc, apiv1.PodPending) + // Simulate the Terminate command + wfOut := woc.wf + wfOut.Spec.Shutdown = wfv1.ShutdownStrategyTerminate + woc1 := newWorkflowOperationCtx(wfOut, controller) + woc1.operate(ctx) + for _, node := range woc1.wf.Status.Nodes { + if assert.NotNil(t, node) { + assert.Contains(t, node.Message, "workflow shutdown with strategy") + assert.Contains(t, node.Message, "Terminate") + } + } + }) +} + const resultVarRefWf = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow diff --git a/workflow/controller/operator_workflow_template_ref_test.go b/workflow/controller/operator_workflow_template_ref_test.go index 8af9e222d7e3..7113a6389f3f 100644 --- a/workflow/controller/operator_workflow_template_ref_test.go +++ b/workflow/controller/operator_workflow_template_ref_test.go @@ -5,6 +5,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/utils/pointer" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/util" @@ -283,3 +285,75 @@ func TestWorkflowTemplateRefGetArtifactsFromTemplate(t *testing.T) { assert.Equal(t, "data-file", woc.execWf.Spec.Arguments.Artifacts[2].Name) }) } + +func TestWorkflowTemplateRefWithShutdownAndSuspend(t *testing.T) { + cancel, controller := newController(unmarshalWF(wfWithTmplRef), unmarshalWFTmpl(wfTmpl)) + defer cancel() + t.Run("EntrypointMissingInStoredWfSpec", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller) + woc.operate(ctx) + assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend) + wf1 := woc.wf.DeepCopy() + // Updating Pod state + makePodsPhase(ctx, woc, apiv1.PodPending) + wf1.Status.StoredWorkflowSpec.Entrypoint = "" + woc1 := newWorkflowOperationCtx(wf1, controller) + woc1.operate(ctx) + assert.NotNil(t, woc1.wf.Status.StoredWorkflowSpec.Entrypoint) + assert.Equal(t, woc.wf.Spec.Entrypoint, woc1.wf.Status.StoredWorkflowSpec.Entrypoint) + }) + + t.Run("WorkflowTemplateRefWithSuspend", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller) + woc.operate(ctx) + assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend) + wf1 := woc.wf.DeepCopy() + // Updating Pod state + makePodsPhase(ctx, woc, apiv1.PodPending) + wf1.Spec.Suspend = pointer.BoolPtr(true) + woc1 := newWorkflowOperationCtx(wf1, controller) + woc1.operate(ctx) + assert.NotNil(t, woc1.wf.Status.StoredWorkflowSpec.Suspend) + assert.True(t, *woc1.wf.Status.StoredWorkflowSpec.Suspend) + }) + t.Run("WorkflowTemplateRefWithShutdownTerminate", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller) + woc.operate(ctx) + assert.Empty(t, woc.wf.Status.StoredWorkflowSpec.Shutdown) + wf1 := woc.wf.DeepCopy() + // Updating Pod state + makePodsPhase(ctx, woc, apiv1.PodPending) + wf1.Spec.Shutdown = wfv1.ShutdownStrategyTerminate + woc1 := newWorkflowOperationCtx(wf1, controller) + woc1.operate(ctx) + assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec.Shutdown) + assert.Equal(t, wfv1.ShutdownStrategyTerminate, woc1.wf.Status.StoredWorkflowSpec.Shutdown) + for _, node := range woc1.wf.Status.Nodes { + if assert.NotNil(t, node) { + assert.Contains(t, node.Message, "workflow shutdown with strategy: Terminate") + } + } + }) + t.Run("WorkflowTemplateRefWithShutdownStop", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller) + woc.operate(ctx) + assert.Empty(t, woc.wf.Status.StoredWorkflowSpec.Shutdown) + wf1 := woc.wf.DeepCopy() + // Updating Pod state + makePodsPhase(ctx, woc, apiv1.PodPending) + wf1.Spec.Shutdown = wfv1.ShutdownStrategyStop + woc1 := newWorkflowOperationCtx(wf1, controller) + woc1.operate(ctx) + assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec.Shutdown) + assert.Equal(t, wfv1.ShutdownStrategyStop, woc1.wf.Status.StoredWorkflowSpec.Shutdown) + for _, node := range woc1.wf.Status.Nodes { + if assert.NotNil(t, node) { + assert.Contains(t, node.Message, "workflow shutdown with strategy: Stop") + } + } + }) +} diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 7ada9d9f802e..5a0e1c3cb057 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -155,9 +155,9 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin } } - if !woc.execWf.Spec.Shutdown.ShouldExecute(opts.onExitPod) { + if !woc.GetShutdownStrategy().ShouldExecute(opts.onExitPod) { // Do not create pods if we are shutting down - woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown)) + woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy())) return nil, nil }