diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go index a207ed931d49..459c512e9f98 100644 --- a/workflow/controller/agent.go +++ b/workflow/controller/agent.go @@ -69,11 +69,9 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro podName := woc.getAgentPodName() obj, exists, err := woc.controller.podInformer.GetStore().Get(cache.ExplicitKey(woc.wf.Namespace + "/" + podName)) - if err != nil { return nil, fmt.Errorf("failed to get pod from informer store: %w", err) } - if exists { existing, ok := obj.(*apiv1.Pod) if ok { diff --git a/workflow/controller/exit_handler_test.go b/workflow/controller/exit_handler_test.go index 9e39042dbaa2..294d34a00b95 100644 --- a/workflow/controller/exit_handler_test.go +++ b/workflow/controller/exit_handler_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -687,3 +688,63 @@ func TestDagOnExitAndRetryStrategy(t *testing.T) { assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) } + +var testWorkflowOnExitHttpReconciliation = `apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world-sx6lw +spec: + entrypoint: whalesay + onExit: exit-handler + templates: + - container: + args: + - hello world + command: + - cowsay + image: docker/whalesay:latest + name: whalesay + - http: + url: https://example.com + name: exit-handler +status: + nodes: + hello-world-sx6lw: + displayName: hello-world-sx6lw + finishedAt: "2021-10-27T14:38:30Z" + hostNodeName: k3d-k3s-default-server-0 + id: hello-world-sx6lw + name: hello-world-sx6lw + phase: Succeeded + progress: 1/1 + resourcesDuration: + cpu: 2 + memory: 1 + startedAt: "2021-10-27T14:38:27Z" + templateName: whalesay + templateScope: local/hello-world-sx6lw + type: Pod + phase: Running + startedAt: "2021-10-27T14:38:27Z" +` + +func TestWorkflowOnExitHttpReconciliation(t *testing.T) { + wf := wfv1.MustUnmarshalWorkflow(testWorkflowOnExitHttpReconciliation) + cancel, controller := newController(wf) + defer cancel() + + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + + taskSets, err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets("").List(ctx, v1.ListOptions{}) + if assert.NoError(t, err) { + assert.Len(t, taskSets.Items, 0) + } + woc.operate(ctx) + + assert.Len(t, woc.wf.Status.Nodes, 2) + taskSets, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets("").List(ctx, v1.ListOptions{}) + if assert.NoError(t, err) { + assert.Len(t, taskSets.Items, 1) + } +} diff --git a/workflow/controller/http_template.go b/workflow/controller/http_template.go index 65ffbca85754..6a545c5afdee 100644 --- a/workflow/controller/http_template.go +++ b/workflow/controller/http_template.go @@ -1,6 +1,8 @@ package controller import ( + "context" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -12,3 +14,37 @@ func (woc *wfOperationCtx) executeHTTPTemplate(nodeName string, templateScope st } return node } + +func (woc *wfOperationCtx) httpReconciliation(ctx context.Context) { + err := woc.reconcileTaskSet(ctx) + if err != nil { + woc.log.WithError(err).Error("error in workflowtaskset reconciliation") + return + } + + err = woc.reconcileAgentPod(ctx) + if err != nil { + woc.log.WithError(err).Error("error in agent pod reconciliation") + woc.markWorkflowError(ctx, err) + return + } +} + +func (woc *wfOperationCtx) nodeRequiresHttpReconciliation(nodeName string) bool { + node := woc.wf.GetNodeByName(nodeName) + if node == nil { + return false + } + // If this node is of type HTTP, it will need an HTTP reconciliation + if node.Type == wfv1.NodeTypeHTTP { + return true + } + for _, child := range node.Children { + // If any of the node's children need an HTTP reconciliation, the parent node will also need one + if woc.nodeRequiresHttpReconciliation(child) { + return true + } + } + // If neither of the children need one -- or if there are no children -- no HTTP reconciliation is needed. + return false +} diff --git a/workflow/controller/http_template_test.go b/workflow/controller/http_template_test.go new file mode 100644 index 000000000000..9fc8f84d86ff --- /dev/null +++ b/workflow/controller/http_template_test.go @@ -0,0 +1,41 @@ +package controller + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" +) + +func TestNodeRequiresHttpReconciliation(t *testing.T) { + woc := &wfOperationCtx{ + wf: &v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-wf", + }, + Status: v1alpha1.WorkflowStatus{ + Nodes: v1alpha1.Nodes{ + "test-wf-1996333140": v1alpha1.NodeStatus{ + Name: "not-needed", + Type: v1alpha1.NodeTypePod, + }, + "test-wf-3939368189": v1alpha1.NodeStatus{ + Name: "parent", + Type: v1alpha1.NodeTypeSteps, + Children: []string{"child-http"}, + }, + "test-wf-1430055856": v1alpha1.NodeStatus{ + Name: "child-http", + Type: v1alpha1.NodeTypeHTTP, + }, + }, + }, + }, + } + + assert.False(t, woc.nodeRequiresHttpReconciliation("not-needed")) + assert.True(t, woc.nodeRequiresHttpReconciliation("child-http")) + assert.True(t, woc.nodeRequiresHttpReconciliation("parent")) +} diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 37cb8eb9834d..35ef9aa618e6 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -352,18 +352,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { return } - err = woc.taskSetReconciliation(ctx) - if err != nil { - woc.log.WithError(err).Error("error in workflowtaskset reconciliation") - return - } - - err = woc.reconcileAgentPod(ctx) - if err != nil { - woc.log.WithError(err).Error("error in agent pod reconciliation") - woc.markWorkflowError(ctx, err) - return - } + // Reconcile TaskSet and Agent for HTTP templates + woc.httpReconciliation(ctx) if node == nil || !node.Fulfilled() { // node can be nil if a workflow created immediately in a parallelism == 0 state @@ -423,6 +413,12 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { } return } + + // If the onExit node (or any child of the onExit node) requires HTTP reconciliation, do it here + if onExitNode != nil && woc.nodeRequiresHttpReconciliation(onExitNode.Name) { + woc.httpReconciliation(ctx) + } + if onExitNode == nil || !onExitNode.Fulfilled() { return } diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go index 0def937824bd..18a7d52fa736 100644 --- a/workflow/controller/taskset.go +++ b/workflow/controller/taskset.go @@ -69,32 +69,33 @@ func (woc *wfOperationCtx) completeTaskSet(ctx context.Context) error { } func (woc *wfOperationCtx) getWorkflowTaskSet() (*wfv1.WorkflowTaskSet, error) { - taskSet, exist, err := woc.controller.wfTaskSetInformer.Informer().GetIndexer().GetByKey(woc.wf.Namespace + "/" + woc.wf.Name) + taskSet, exists, err := woc.controller.wfTaskSetInformer.Informer().GetIndexer().GetByKey(woc.wf.Namespace + "/" + woc.wf.Name) if err != nil { return nil, err } - if !exist { + if !exists { return nil, nil } - return taskSet.(*wfv1.WorkflowTaskSet), nil } -func (woc *wfOperationCtx) taskSetReconciliation(ctx context.Context) error { - workflowTaskset, err := woc.getWorkflowTaskSet() +func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error { + workflowTaskSet, err := woc.getWorkflowTaskSet() if err != nil { return err } woc.log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Infof("TaskSet Reconciliation") - if workflowTaskset != nil && len(workflowTaskset.Status.Nodes) > 0 { - for nodeID, taskResult := range workflowTaskset.Status.Nodes { + if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 { + for nodeID, taskResult := range workflowTaskSet.Status.Nodes { node := woc.wf.Status.Nodes[nodeID] + node.Outputs = taskResult.Outputs.DeepCopy() node.Phase = taskResult.Phase node.Message = taskResult.Message - woc.wf.Status.Nodes[nodeID] = node node.FinishedAt = metav1.Now() + + woc.wf.Status.Nodes[nodeID] = node woc.updated = true } } @@ -105,6 +106,7 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error { if len(woc.taskSet) == 0 { return nil } + key := fmt.Sprintf("%s/%s", woc.wf.Namespace, woc.wf.Name) log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", key).Infof("Creating TaskSet") taskSet := wfv1.WorkflowTaskSet{ @@ -143,7 +145,6 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error { log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to patch WorkflowTaskSet") return fmt.Errorf("failed to patch TaskSet. %v", err) } - } else if err != nil { log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to create WorkflowTaskSet") return err diff --git a/workflow/controller/taskset_test.go b/workflow/controller/taskset_test.go index 6b1ff83945ae..32a14bc2949e 100644 --- a/workflow/controller/taskset_test.go +++ b/workflow/controller/taskset_test.go @@ -317,9 +317,9 @@ func TestNonHTTPTemplateScenario(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(helloWorldWf) woc := newWorkflowOperationCtx(wf, controller) ctx := context.Background() - t.Run("taskSetReconciliation", func(t *testing.T) { + t.Run("reconcileTaskSet", func(t *testing.T) { woc.operate(ctx) - err := woc.taskSetReconciliation(ctx) + err := woc.reconcileTaskSet(ctx) assert.NoError(t, err) }) t.Run("completeTaskSet", func(t *testing.T) {