Skip to content

Commit

Permalink
fix: Fix bugs, unable to resolve tasks aggregated outputs in dag outp…
Browse files Browse the repository at this point in the history
…uts. Fixes #6684 (#6692)

Signed-off-by: smile-luobin <smile.luobin@gmail.com>
  • Loading branch information
smile-luobin authored and sarabala1979 committed Sep 28, 2021
1 parent 6e93af0 commit 3a98174
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 2 deletions.
14 changes: 13 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,19 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
// Can happen when dag.target was specified
continue
}
woc.buildLocalScope(scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)

prefix := fmt.Sprintf("tasks.%s", task.Name)
if taskNode.Type == wfv1.NodeTypeTaskGroup {
childNodes := make([]wfv1.NodeStatus, len(taskNode.Children))
for i, childID := range taskNode.Children {
childNodes[i] = woc.wf.Status.Nodes[childID]
}
err := woc.processAggregateNodeOutputs(scope, prefix, childNodes)
if err != nil {
return nil, errors.InternalWrapError(err)
}
}
woc.buildLocalScope(scope, prefix, taskNode)
woc.addOutputsToGlobalScope(taskNode.Outputs)
}
outputs, err := getTemplateOutputsFromScope(tmpl, scope)
Expand Down
171 changes: 171 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3150,3 +3150,174 @@ func TestLeafContinueOn(t *testing.T) {
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var dagOutputsReferTaskAggregatedOuputs = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: parameter-aggregation-dag-h8b82
spec:
entrypoint: parameter-aggregation
templates:
-
dag:
tasks:
- arguments:
parameters:
- name: num
value: '{{item}}'
name: odd-or-even
template: odd-or-even
withItems:
- 1
- 2
inputs: {}
metadata: {}
name: parameter-aggregation
outputs:
parameters:
- name: dag-nums
valueFrom:
parameter: '{{tasks.odd-or-even.outputs.parameters.num}}'
- name: dag-evenness
valueFrom:
parameter: '{{tasks.odd-or-even.outputs.parameters.evenness}}'
-
container:
args:
- |
sleep 1 &&
echo {{inputs.parameters.num}} > /tmp/num &&
if [ $(({{inputs.parameters.num}}%2)) -eq 0 ]; then
echo "even" > /tmp/even;
else
echo "odd" > /tmp/even;
fi
command:
- sh
- -c
image: alpine:latest
name: ""
resources: {}
inputs:
parameters:
- name: num
metadata: {}
name: odd-or-even
outputs:
parameters:
- name: num
valueFrom:
path: /tmp/num
- name: evenness
valueFrom:
path: /tmp/even
status:
nodes:
parameter-aggregation-dag-h8b82:
children:
- parameter-aggregation-dag-h8b82-3379492521
displayName: parameter-aggregation-dag-h8b82
finishedAt: "2020-12-09T15:37:07Z"
id: parameter-aggregation-dag-h8b82
name: parameter-aggregation-dag-h8b82
outboundNodes:
- parameter-aggregation-dag-h8b82-3175470584
- parameter-aggregation-dag-h8b82-2243926302
phase: Running
startedAt: "2020-12-09T15:36:46Z"
templateName: parameter-aggregation
templateScope: local/parameter-aggregation-dag-h8b82
type: DAG
parameter-aggregation-dag-h8b82-1440345089:
boundaryID: parameter-aggregation-dag-h8b82
displayName: odd-or-even(1:2)
finishedAt: "2020-12-09T15:36:54Z"
hostNodeName: minikube
id: parameter-aggregation-dag-h8b82-1440345089
inputs:
parameters:
- name: num
value: "2"
name: parameter-aggregation-dag-h8b82.odd-or-even(1:2)
outputs:
exitCode: "0"
parameters:
- name: num
value: "2"
valueFrom:
path: /tmp/num
- name: evenness
value: even
valueFrom:
path: /tmp/even
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
templateName: odd-or-even
templateScope: local/parameter-aggregation-dag-h8b82
type: Pod
parameter-aggregation-dag-h8b82-3379492521:
boundaryID: parameter-aggregation-dag-h8b82
children:
- parameter-aggregation-dag-h8b82-3572919299
- parameter-aggregation-dag-h8b82-1440345089
displayName: odd-or-even
finishedAt: "2020-12-09T15:36:55Z"
id: parameter-aggregation-dag-h8b82-3379492521
name: parameter-aggregation-dag-h8b82.odd-or-even
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
templateName: odd-or-even
templateScope: local/parameter-aggregation-dag-h8b82
type: TaskGroup
parameter-aggregation-dag-h8b82-3572919299:
boundaryID: parameter-aggregation-dag-h8b82
displayName: odd-or-even(0:1)
finishedAt: "2020-12-09T15:36:53Z"
hostNodeName: minikube
id: parameter-aggregation-dag-h8b82-3572919299
inputs:
parameters:
- name: num
value: "1"
name: parameter-aggregation-dag-h8b82.odd-or-even(0:1)
outputs:
exitCode: "0"
parameters:
- name: num
value: "1"
valueFrom:
path: /tmp/num
- name: evenness
value: odd
valueFrom:
path: /tmp/even
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
templateName: odd-or-even
templateScope: local/parameter-aggregation-dag-h8b82
type: Pod
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
`

func TestDAGReferTaskAggregatedOutputs(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(dagOutputsReferTaskAggregatedOuputs)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

dagNode := woc.wf.Status.Nodes.FindByDisplayName("parameter-aggregation-dag-h8b82")
if assert.NotNil(t, dagNode) {
if assert.NotNil(t, dagNode.Outputs) {
if assert.Len(t, dagNode.Outputs.Parameters, 2) {
assert.Equal(t, `["1","2"]`, dagNode.Outputs.Parameters[0].Value.String())
assert.Equal(t, `["odd","even"]`, dagNode.Outputs.Parameters[1].Value.String())
}
}
}
}
3 changes: 2 additions & 1 deletion workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,8 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
resolvedTemplates[task.Name] = resolvedTmpl

prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
aggregate := len(task.WithItems) > 0 || task.WithParam != ""
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, aggregate, false)

err = common.ValidateTaskResults(&task)
if err != nil {
Expand Down
90 changes: 90 additions & 0 deletions workflow/validate/validate_dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,3 +943,93 @@ func TestDAGWithDigitNameNoDepends(t *testing.T) {
_, err := validate(dagWithDigitNoDepends)
assert.NoError(t, err)
}

var dagOutputsResolveTaskAggregatedOutputs = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-
spec:
serviceAccountName: argo
entrypoint: dag
templates:
- name: dag
dag:
tasks:
- name: fanout
template: fanout
arguments:
parameters:
- name: input
value: "[1, 2]"
- name: dag-process
template: sub-dag
depends: fanout
arguments:
parameters:
- name: item
value: '{{item}}'
- name: input
value: '{{tasks.fanout.outputs.parameters.output}}'
withParam: "{{tasks.fanout.outputs.parameters.output}}"
- name: sub-dag
inputs:
parameters:
- name: input
- name: item
outputs:
parameters:
- name: output
valueFrom:
parameter: "{{tasks.process.outputs.parameters}}"
dag:
tasks:
- name: fanout
template: fanout
arguments:
parameters:
- name: input
value: '{{inputs.parameters.input}}'
- name: process
template: process
depends: fanout
arguments:
parameters:
- name: item
value: '{{item}}'
withParam: "{{tasks.fanout.outputs.parameters.output}}"
- name: fanout
inputs:
parameters:
- name: input
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo {{inputs.parameters.input}} | tee /tmp/output"]
outputs:
parameters:
- name: output
valueFrom:
path: /tmp/output
- name: process
inputs:
parameters:
- name: item
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo {{inputs.parameters.item}} | tee /tmp/output"]
outputs:
parameters:
- name: output
valueFrom:
path: /tmp/output
`

func TestDAGOutputsResolveTaskAggregatedOutputs(t *testing.T) {
_, err := validate(dagOutputsResolveTaskAggregatedOutputs)
assert.NoError(t, err)
}

0 comments on commit 3a98174

Please sign in to comment.