Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix bugs, unable to resolve tasks aggregated outputs in dag outputs. Fixes #6684 #6692

Merged
merged 2 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,19 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
// Can happen when dag.target was specified
continue
}
woc.buildLocalScope(scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)

prefix := fmt.Sprintf("tasks.%s", task.Name)
if taskNode.Type == wfv1.NodeTypeTaskGroup {
childNodes := make([]wfv1.NodeStatus, len(taskNode.Children))
for i, childID := range taskNode.Children {
childNodes[i] = woc.wf.Status.Nodes[childID]
}
err := woc.processAggregateNodeOutputs(scope, prefix, childNodes)
if err != nil {
return nil, errors.InternalWrapError(err)
}
}
woc.buildLocalScope(scope, prefix, taskNode)
woc.addOutputsToGlobalScope(taskNode.Outputs)
}
outputs, err := getTemplateOutputsFromScope(tmpl, scope)
Expand Down
171 changes: 171 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3150,3 +3150,174 @@ func TestLeafContinueOn(t *testing.T) {
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var dagOutputsReferTaskAggregatedOuputs = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: parameter-aggregation-dag-h8b82
spec:

entrypoint: parameter-aggregation
templates:
-
dag:
tasks:
- arguments:
parameters:
- name: num
value: '{{item}}'
name: odd-or-even
template: odd-or-even
withItems:
- 1
- 2
inputs: {}
metadata: {}
name: parameter-aggregation
outputs:
parameters:
- name: dag-nums
valueFrom:
parameter: '{{tasks.odd-or-even.outputs.parameters.num}}'
- name: dag-evenness
valueFrom:
parameter: '{{tasks.odd-or-even.outputs.parameters.evenness}}'
-
container:
args:
- |
sleep 1 &&
echo {{inputs.parameters.num}} > /tmp/num &&
if [ $(({{inputs.parameters.num}}%2)) -eq 0 ]; then
echo "even" > /tmp/even;
else
echo "odd" > /tmp/even;
fi
command:
- sh
- -c
image: alpine:latest
name: ""
resources: {}
inputs:
parameters:
- name: num
metadata: {}
name: odd-or-even
outputs:
parameters:
- name: num
valueFrom:
path: /tmp/num
- name: evenness
valueFrom:
path: /tmp/even
status:
nodes:
parameter-aggregation-dag-h8b82:
children:
- parameter-aggregation-dag-h8b82-3379492521
displayName: parameter-aggregation-dag-h8b82
finishedAt: "2020-12-09T15:37:07Z"
id: parameter-aggregation-dag-h8b82
name: parameter-aggregation-dag-h8b82
outboundNodes:
- parameter-aggregation-dag-h8b82-3175470584
- parameter-aggregation-dag-h8b82-2243926302
phase: Running
startedAt: "2020-12-09T15:36:46Z"
templateName: parameter-aggregation
templateScope: local/parameter-aggregation-dag-h8b82
type: DAG
parameter-aggregation-dag-h8b82-1440345089:
boundaryID: parameter-aggregation-dag-h8b82
displayName: odd-or-even(1:2)
finishedAt: "2020-12-09T15:36:54Z"
hostNodeName: minikube
id: parameter-aggregation-dag-h8b82-1440345089
inputs:
parameters:
- name: num
value: "2"
name: parameter-aggregation-dag-h8b82.odd-or-even(1:2)
outputs:
exitCode: "0"
parameters:
- name: num
value: "2"
valueFrom:
path: /tmp/num
- name: evenness
value: even
valueFrom:
path: /tmp/even
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
templateName: odd-or-even
templateScope: local/parameter-aggregation-dag-h8b82
type: Pod
parameter-aggregation-dag-h8b82-3379492521:
boundaryID: parameter-aggregation-dag-h8b82
children:
- parameter-aggregation-dag-h8b82-3572919299
- parameter-aggregation-dag-h8b82-1440345089
displayName: odd-or-even
finishedAt: "2020-12-09T15:36:55Z"
id: parameter-aggregation-dag-h8b82-3379492521
name: parameter-aggregation-dag-h8b82.odd-or-even
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
templateName: odd-or-even
templateScope: local/parameter-aggregation-dag-h8b82
type: TaskGroup
parameter-aggregation-dag-h8b82-3572919299:
boundaryID: parameter-aggregation-dag-h8b82
displayName: odd-or-even(0:1)
finishedAt: "2020-12-09T15:36:53Z"
hostNodeName: minikube
id: parameter-aggregation-dag-h8b82-3572919299
inputs:
parameters:
- name: num
value: "1"
name: parameter-aggregation-dag-h8b82.odd-or-even(0:1)
outputs:
exitCode: "0"
parameters:
- name: num
value: "1"
valueFrom:
path: /tmp/num
- name: evenness
value: odd
valueFrom:
path: /tmp/even
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
templateName: odd-or-even
templateScope: local/parameter-aggregation-dag-h8b82
type: Pod
phase: Succeeded
startedAt: "2020-12-09T15:36:46Z"
`

func TestDAGReferTaskAggregatedOutputs(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(dagOutputsReferTaskAggregatedOuputs)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

dagNode := woc.wf.Status.Nodes.FindByDisplayName("parameter-aggregation-dag-h8b82")
if assert.NotNil(t, dagNode) {
if assert.NotNil(t, dagNode.Outputs) {
if assert.Len(t, dagNode.Outputs.Parameters, 2) {
assert.Equal(t, `["1","2"]`, dagNode.Outputs.Parameters[0].Value.String())
assert.Equal(t, `["odd","even"]`, dagNode.Outputs.Parameters[1].Value.String())
}
}
}
}
3 changes: 2 additions & 1 deletion workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,8 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
resolvedTemplates[task.Name] = resolvedTmpl

prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
aggregate := len(task.WithItems) > 0 || task.WithParam != ""
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, aggregate, false)

err = common.ValidateTaskResults(&task)
if err != nil {
Expand Down
90 changes: 90 additions & 0 deletions workflow/validate/validate_dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,3 +943,93 @@ func TestDAGWithDigitNameNoDepends(t *testing.T) {
_, err := validate(dagWithDigitNoDepends)
assert.NoError(t, err)
}

var dagOutputsResolveTaskAggregatedOutputs = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-
spec:
serviceAccountName: argo
entrypoint: dag
templates:
- name: dag
dag:
tasks:
- name: fanout
template: fanout
arguments:
parameters:
- name: input
value: "[1, 2]"
- name: dag-process
template: sub-dag
depends: fanout
arguments:
parameters:
- name: item
value: '{{item}}'
- name: input
value: '{{tasks.fanout.outputs.parameters.output}}'
withParam: "{{tasks.fanout.outputs.parameters.output}}"

- name: sub-dag
inputs:
parameters:
- name: input
- name: item
outputs:
parameters:
- name: output
valueFrom:
parameter: "{{tasks.process.outputs.parameters}}"
dag:
tasks:
- name: fanout
template: fanout
arguments:
parameters:
- name: input
value: '{{inputs.parameters.input}}'
- name: process
template: process
depends: fanout
arguments:
parameters:
- name: item
value: '{{item}}'
withParam: "{{tasks.fanout.outputs.parameters.output}}"

- name: fanout
inputs:
parameters:
- name: input
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo {{inputs.parameters.input}} | tee /tmp/output"]
outputs:
parameters:
- name: output
valueFrom:
path: /tmp/output

- name: process
inputs:
parameters:
- name: item
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo {{inputs.parameters.item}} | tee /tmp/output"]
outputs:
parameters:
- name: output
valueFrom:
path: /tmp/output
`

func TestDAGOutputsResolveTaskAggregatedOutputs(t *testing.T) {
_, err := validate(dagOutputsResolveTaskAggregatedOutputs)
assert.NoError(t, err)
}