diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 99fcfd5b92f6..4d8359ba336d 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -280,7 +280,19 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl // Can happen when dag.target was specified continue } - woc.buildLocalScope(scope, fmt.Sprintf("tasks.%s", task.Name), taskNode) + + prefix := fmt.Sprintf("tasks.%s", task.Name) + if taskNode.Type == wfv1.NodeTypeTaskGroup { + childNodes := make([]wfv1.NodeStatus, len(taskNode.Children)) + for i, childID := range taskNode.Children { + childNodes[i] = woc.wf.Status.Nodes[childID] + } + err := woc.processAggregateNodeOutputs(scope, prefix, childNodes) + if err != nil { + return nil, errors.InternalWrapError(err) + } + } + woc.buildLocalScope(scope, prefix, taskNode) woc.addOutputsToGlobalScope(taskNode.Outputs) } outputs, err := getTemplateOutputsFromScope(tmpl, scope) diff --git a/workflow/controller/dag_test.go b/workflow/controller/dag_test.go index 6eca425bd5fa..10f24d861edb 100644 --- a/workflow/controller/dag_test.go +++ b/workflow/controller/dag_test.go @@ -3150,3 +3150,174 @@ func TestLeafContinueOn(t *testing.T) { woc.operate(ctx) assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) } + +var dagOutputsReferTaskAggregatedOuputs = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: parameter-aggregation-dag-h8b82 +spec: + + entrypoint: parameter-aggregation + templates: + - + dag: + tasks: + - arguments: + parameters: + - name: num + value: '{{item}}' + name: odd-or-even + template: odd-or-even + withItems: + - 1 + - 2 + inputs: {} + metadata: {} + name: parameter-aggregation + outputs: + parameters: + - name: dag-nums + valueFrom: + parameter: '{{tasks.odd-or-even.outputs.parameters.num}}' + - name: dag-evenness + valueFrom: + parameter: '{{tasks.odd-or-even.outputs.parameters.evenness}}' + - + container: + args: + - | + sleep 1 && + echo {{inputs.parameters.num}} > /tmp/num && + if [ $(({{inputs.parameters.num}}%2)) -eq 0 ]; then + echo "even" > /tmp/even; + else + echo "odd" > /tmp/even; + fi + command: + - sh + - -c + image: alpine:latest + name: "" + resources: {} + inputs: + parameters: + - name: num + metadata: {} + name: odd-or-even + outputs: + parameters: + - name: num + valueFrom: + path: /tmp/num + - name: evenness + valueFrom: + path: /tmp/even +status: + nodes: + parameter-aggregation-dag-h8b82: + children: + - parameter-aggregation-dag-h8b82-3379492521 + displayName: parameter-aggregation-dag-h8b82 + finishedAt: "2020-12-09T15:37:07Z" + id: parameter-aggregation-dag-h8b82 + name: parameter-aggregation-dag-h8b82 + outboundNodes: + - parameter-aggregation-dag-h8b82-3175470584 + - parameter-aggregation-dag-h8b82-2243926302 + phase: Running + startedAt: "2020-12-09T15:36:46Z" + templateName: parameter-aggregation + templateScope: local/parameter-aggregation-dag-h8b82 + type: DAG + parameter-aggregation-dag-h8b82-1440345089: + boundaryID: parameter-aggregation-dag-h8b82 + displayName: odd-or-even(1:2) + finishedAt: "2020-12-09T15:36:54Z" + hostNodeName: minikube + id: parameter-aggregation-dag-h8b82-1440345089 + inputs: + parameters: + - name: num + value: "2" + name: parameter-aggregation-dag-h8b82.odd-or-even(1:2) + outputs: + exitCode: "0" + parameters: + - name: num + value: "2" + valueFrom: + path: /tmp/num + - name: evenness + value: even + valueFrom: + path: /tmp/even + phase: Succeeded + startedAt: "2020-12-09T15:36:46Z" + templateName: odd-or-even + templateScope: local/parameter-aggregation-dag-h8b82 + type: Pod + parameter-aggregation-dag-h8b82-3379492521: + boundaryID: parameter-aggregation-dag-h8b82 + children: + - parameter-aggregation-dag-h8b82-3572919299 + - parameter-aggregation-dag-h8b82-1440345089 + displayName: odd-or-even + finishedAt: "2020-12-09T15:36:55Z" + id: parameter-aggregation-dag-h8b82-3379492521 + name: parameter-aggregation-dag-h8b82.odd-or-even + phase: Succeeded + startedAt: "2020-12-09T15:36:46Z" + templateName: odd-or-even + templateScope: local/parameter-aggregation-dag-h8b82 + type: TaskGroup + parameter-aggregation-dag-h8b82-3572919299: + boundaryID: parameter-aggregation-dag-h8b82 + displayName: odd-or-even(0:1) + finishedAt: "2020-12-09T15:36:53Z" + hostNodeName: minikube + id: parameter-aggregation-dag-h8b82-3572919299 + inputs: + parameters: + - name: num + value: "1" + name: parameter-aggregation-dag-h8b82.odd-or-even(0:1) + outputs: + exitCode: "0" + parameters: + - name: num + value: "1" + valueFrom: + path: /tmp/num + - name: evenness + value: odd + valueFrom: + path: /tmp/even + phase: Succeeded + startedAt: "2020-12-09T15:36:46Z" + templateName: odd-or-even + templateScope: local/parameter-aggregation-dag-h8b82 + type: Pod + phase: Succeeded + startedAt: "2020-12-09T15:36:46Z" +` + +func TestDAGReferTaskAggregatedOutputs(t *testing.T) { + wf := wfv1.MustUnmarshalWorkflow(dagOutputsReferTaskAggregatedOuputs) + cancel, controller := newController(wf) + defer cancel() + + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + + dagNode := woc.wf.Status.Nodes.FindByDisplayName("parameter-aggregation-dag-h8b82") + if assert.NotNil(t, dagNode) { + if assert.NotNil(t, dagNode.Outputs) { + if assert.Len(t, dagNode.Outputs.Parameters, 2) { + assert.Equal(t, `["1","2"]`, dagNode.Outputs.Parameters[0].Value.String()) + assert.Equal(t, `["odd","even"]`, dagNode.Outputs.Parameters[1].Value.String()) + } + } + } +} diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index 686bcb4ad189..6ee445f4f84d 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -1192,7 +1192,8 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl resolvedTemplates[task.Name] = resolvedTmpl prefix := fmt.Sprintf("tasks.%s", task.Name) - ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false) + aggregate := len(task.WithItems) > 0 || task.WithParam != "" + ctx.addOutputsToScope(resolvedTmpl, prefix, scope, aggregate, false) err = common.ValidateTaskResults(&task) if err != nil { diff --git a/workflow/validate/validate_dag_test.go b/workflow/validate/validate_dag_test.go index 4bf68ffaea45..a640713896d4 100644 --- a/workflow/validate/validate_dag_test.go +++ b/workflow/validate/validate_dag_test.go @@ -943,3 +943,93 @@ func TestDAGWithDigitNameNoDepends(t *testing.T) { _, err := validate(dagWithDigitNoDepends) assert.NoError(t, err) } + +var dagOutputsResolveTaskAggregatedOutputs = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: loops- +spec: + serviceAccountName: argo + entrypoint: dag + templates: + - name: dag + dag: + tasks: + - name: fanout + template: fanout + arguments: + parameters: + - name: input + value: "[1, 2]" + - name: dag-process + template: sub-dag + depends: fanout + arguments: + parameters: + - name: item + value: '{{item}}' + - name: input + value: '{{tasks.fanout.outputs.parameters.output}}' + withParam: "{{tasks.fanout.outputs.parameters.output}}" + + - name: sub-dag + inputs: + parameters: + - name: input + - name: item + outputs: + parameters: + - name: output + valueFrom: + parameter: "{{tasks.process.outputs.parameters}}" + dag: + tasks: + - name: fanout + template: fanout + arguments: + parameters: + - name: input + value: '{{inputs.parameters.input}}' + - name: process + template: process + depends: fanout + arguments: + parameters: + - name: item + value: '{{item}}' + withParam: "{{tasks.fanout.outputs.parameters.output}}" + + - name: fanout + inputs: + parameters: + - name: input + container: + image: docker/whalesay:latest + command: [sh, -c] + args: ["echo {{inputs.parameters.input}} | tee /tmp/output"] + outputs: + parameters: + - name: output + valueFrom: + path: /tmp/output + + - name: process + inputs: + parameters: + - name: item + container: + image: docker/whalesay:latest + command: [sh, -c] + args: ["echo {{inputs.parameters.item}} | tee /tmp/output"] + outputs: + parameters: + - name: output + valueFrom: + path: /tmp/output +` + +func TestDAGOutputsResolveTaskAggregatedOutputs(t *testing.T) { + _, err := validate(dagOutputsResolveTaskAggregatedOutputs) + assert.NoError(t, err) +}