diff --git a/examples/dag-coinflip.yaml b/examples/dag-coinflip.yaml
new file mode 100644
index 000000000000..ab0ff76a4fce
--- /dev/null
+++ b/examples/dag-coinflip.yaml
@@ -0,0 +1,44 @@
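+# The following workflow executes a diamond DAG (A -> B, C -> D) in which every
+# task invokes the "coinflip" steps template, which recursively re-flips the coin
+# until it lands on heads.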
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-diamond-coinflip-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: coinflip
+ steps:
+ - - name: flip-coin
+ template: flip-coin
+ - - name: heads
+ template: heads
+ when: "{{steps.flip-coin.outputs.result}} == heads"
+ - name: tails
+ template: coinflip
+ when: "{{steps.flip-coin.outputs.result}} == tails"
+ - name: flip-coin
+ script:
+ image: python:3.6
+ command: [python]
+ source: |
+ import random
+ result = "heads" if random.randint(0,1) == 0 else "tails"
+ print(result)
+ - name: heads
+ container:
+ image: alpine:3.6
+ command: [sh, -c]
+ args: ["echo \"it was heads\""]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: coinflip
+ - name: B
+ dependencies: [A]
+ template: coinflip
+ - name: C
+ dependencies: [A]
+ template: coinflip
+ - name: D
+ dependencies: [B, C]
+ template: coinflip
diff --git a/examples/dag-diamond-steps.yaml b/examples/dag-diamond-steps.yaml
new file mode 100644
index 000000000000..3a91bec17970
--- /dev/null
+++ b/examples/dag-diamond-steps.yaml
@@ -0,0 +1,69 @@
+# The following workflow executes a diamond workflow, with each
+# node fanning out into three parallel steps and fanning back in.
+#
+# *
+# / | \
+# A1 A2 A3
+# \ | /
+# *
+# / \
+# / \
+# * *
+# / | \ / | \
+# B1 B2 B3 C1 C2 C3
+# \ | / \ | /
+# * *
+# \ /
+# \ /
+# *
+# / | \
+# D1 D2 D3
+# \ | /
+# *
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-diamond-steps-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: echo-thrice
+ inputs:
+ parameters:
+ - name: message
+ steps:
+ - - name: echo
+ template: echo
+ arguments:
+ parameters:
+ - {name: message, value: "{{inputs.parameters.message}}{{item}}"}
+ withItems: [1,2,3]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies: [A]
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [B, C]
+ template: echo-thrice
+ arguments:
+ parameters: [{name: message, value: D}]
diff --git a/examples/dag-diamond.yaml b/examples/dag-diamond.yaml
new file mode 100644
index 000000000000..bdc644c6dd23
--- /dev/null
+++ b/examples/dag-diamond.yaml
@@ -0,0 +1,43 @@
+# The following workflow executes a diamond workflow
+#
+# A
+# / \
+# B C
+# \ /
+# D
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-diamond-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [B, C]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: D}]
diff --git a/examples/dag-multiroot.yaml b/examples/dag-multiroot.yaml
new file mode 100644
index 000000000000..f5e553af5507
--- /dev/null
+++ b/examples/dag-multiroot.yaml
@@ -0,0 +1,41 @@
+# The following workflow executes a multi-root workflow
+#
+# A B
+# / \ /
+# C D
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-multiroot-
+spec:
+ entrypoint: multiroot
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: multiroot
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ template: echo
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [A, B]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: D}]
diff --git a/examples/dag-nested.yaml b/examples/dag-nested.yaml
new file mode 100644
index 000000000000..ab755bd15daa
--- /dev/null
+++ b/examples/dag-nested.yaml
@@ -0,0 +1,61 @@
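+# The following workflow executes a diamond DAG in which each task invokes the
+# "nested-diamond" DAG template, demonstrating that DAG templates can be nested.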
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-nested-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: A}]
+ - name: B
+ dependencies: [A]
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: B}]
+ - name: C
+ dependencies: [A]
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: C}]
+ - name: D
+ dependencies: [B, C]
+ template: nested-diamond
+ arguments:
+ parameters: [{name: message, value: D}]
+ - name: nested-diamond
+ inputs:
+ parameters:
+ - name: message
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}A"}]
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}B"}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}C"}]
+ - name: D
+ dependencies: [B, C]
+ template: echo
+ arguments:
+ parameters: [{name: message, value: "{{inputs.parameters.message}}D"}]
diff --git a/hack/wfgraph.py b/hack/wfgraph.py
new file mode 100755
index 000000000000..96074ca9c54d
--- /dev/null
+++ b/hack/wfgraph.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+
+import argparse
+import json
+import subprocess
+import tempfile
+
+from subprocess import run
+
+# Minimal HTML page which renders the workflow graph as a vis.js network
+template = '''<!DOCTYPE html>
+<html>
+<head>
+  <title>%s</title>
+  <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/vis/4.21.0/vis.min.js"></script>
+  <link href="https://cdnjs.cloudflare.com/ajax/libs/vis/4.21.0/vis.min.css" rel="stylesheet" type="text/css" />
+  <style type="text/css">
+    #graph {
+      width: 1024px;
+      height: 768px;
+      border: 1px solid lightgray;
+    }
+  </style>
+</head>
+<body>
+<h2>%s</h2>
+<div id="graph"></div>
+<script type="text/javascript">
+  var nodes = new vis.DataSet(%s);
+  var edges = new vis.DataSet(%s);
+  var container = document.getElementById('graph');
+  var data = {nodes: nodes, edges: edges};
+  var options = {layout: {hierarchical: {direction: 'UD', sortMethod: 'directed'}}};
+  var network = new vis.Network(container, data, options);
+</script>
+</body>
+</html>
+'''
+
+def main():
+ parser = argparse.ArgumentParser(description='Visualize graph of a workflow')
+ parser.add_argument('workflow', type=str, help='workflow name')
+ args = parser.parse_args()
+ res = run(["kubectl", "get", "workflow", "-o", "json", args.workflow ], stdout=subprocess.PIPE)
+ wf = json.loads(res.stdout.decode("utf-8"))
+ nodes = []
+ edges = []
+ colors = {
+ 'Pending': 'fill: #D0D0D0',
+ 'Running': 'fill: #A0FFFF',
+ 'Failed': 'fill: #f77',
+ 'Succeeded': 'fill: #afa',
+ 'Skipped': 'fill: #D0D0D0',
+ 'Error': 'fill: #f77',
+ }
+ wf_name = wf['metadata']['name']
+ for node_id, node_status in wf['status']['nodes'].items():
+ if node_status['name'] == wf_name:
+ label = node_status['name']
+ else:
+ label = node_status['name'].replace(wf_name, "")
+ node = {'id': node_id, 'label': label, 'color': colors[node_status['phase']]}
+ nodes.append(node)
+ if 'children' in node_status:
+ for child_id in node_status['children']:
+ edge = {'from': node_id, 'to': child_id, 'arrows': 'to'}
+ edges.append(edge)
+ html = template % (wf_name, wf_name, json.dumps(nodes), json.dumps(edges))
+ tmpfile = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
+ tmpfile.write(html.encode())
+ tmpfile.flush()
+ run(["open", tmpfile.name])
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go
index 17ea15815e8a..61680f4131f7 100644
--- a/pkg/apis/workflow/v1alpha1/types.go
+++ b/pkg/apis/workflow/v1alpha1/types.go
@@ -18,6 +18,7 @@ const (
TemplateTypeSteps TemplateType = "Steps"
TemplateTypeScript TemplateType = "Script"
TemplateTypeResource TemplateType = "Resource"
+ TemplateTypeDAG TemplateType = "DAG"
)
// NodePhase is a label for the condition of a node at the current time.
@@ -115,9 +116,12 @@ type Template struct {
// Sidecar containers
Sidecars []Sidecar `json:"sidecars,omitempty"`
- // Resource is the resource template type
+ // Resource template subtype which can run k8s resources
Resource *ResourceTemplate `json:"resource,omitempty"`
+ // DAG template subtype which runs a DAG
+ DAG *DAG `json:"dag,omitempty"`
+
// Location in which all files related to the step will be stored (logs, artifacts, etc...).
// Can be overridden by individual items in Outputs. If omitted, will use the default
// artifact repository location configured in the controller, appended with the
@@ -312,6 +316,9 @@ type NodeStatus struct {
// Time at which this node completed
FinishedAt metav1.Time `json:"finishedAt,omitempty"`
+ // IsPod indicates if this node is a pod or not
+ IsPod bool `json:"isPod,omitempty"`
+
// PodIP captures the IP of the pod for daemoned steps
PodIP string `json:"podIP,omitempty"`
@@ -325,6 +332,20 @@ type NodeStatus struct {
// Children is a list of child node IDs
Children []string `json:"children,omitempty"`
+
+	// OutboundNodes tracks the node IDs which are considered "outbound" nodes of a template invocation.
+	// For every invocation of a template, there are nodes which we consider "outbound". Essentially,
+	// these are the last nodes in the execution sequence to run before the template is considered completed.
+	// These nodes are then connected as parents of the following step.
+	//
+	// In the case of single pod steps (i.e. container, script, resource templates), this list will be nil
+	// since the pod itself is already considered the "outbound" node.
+	// In the case of DAGs, the outbound nodes are the "target" tasks (tasks with no children).
+	// In the case of steps, the outbound nodes are all the containers involved in the last step group.
+	// NOTE: since templates are composable, the list of outbound nodes is carried upwards when
+	// a DAG/steps template invokes another DAG/steps template. In other words, the outbound nodes of
+	// a template will be a superset of the outbound nodes of its last children.
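+	//
+	// For example, in a diamond DAG A -> (B, C) -> D, the outbound node of the DAG invocation is D.
+	// For a steps template whose last step group runs X and Y in parallel, the outbound nodes are X and Y.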
+ OutboundNodes []string `json:"outboundNodes,omitempty"`
}
func (n NodeStatus) String() string {
@@ -429,6 +450,9 @@ func (tmpl *Template) GetType() TemplateType {
if tmpl.Steps != nil {
return TemplateTypeSteps
}
+ if tmpl.DAG != nil {
+ return TemplateTypeDAG
+ }
if tmpl.Script != nil {
return TemplateTypeScript
}
@@ -438,6 +462,30 @@ func (tmpl *Template) GetType() TemplateType {
return "Unknown"
}
+// DAG is a template subtype for directed acyclic graph templates
+type DAG struct {
+	// Targets are the names of one or more tasks in the DAG to execute
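+	// Multiple targets are space-delimited, e.g. "task-c task-d". When omitted, the DAG's
+	// leaf tasks (tasks which no other task depends on) are used as the targets.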
+ Targets string `json:"target,omitempty"`
+
+ // Tasks are a list of DAG tasks
+ Tasks []DAGTask `json:"tasks"`
+}
+
+// DAGTask represents a node in the graph during DAG execution
+type DAGTask struct {
+	// Name is the name of the task within the DAG
+ Name string `json:"name"`
+
+ // Name of template to execute
+ Template string `json:"template"`
+
+ // Arguments are the parameter and artifact arguments to the template
+ Arguments Arguments `json:"arguments,omitempty"`
+
+	// Dependencies are the names of other tasks which this task depends on
+ Dependencies []string `json:"dependencies,omitempty"`
+}
+
func (in *Inputs) GetArtifactByName(name string) *Artifact {
for _, art := range in.Artifacts {
if art.Name == name {
diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
index f923712c6c65..577bb35c3e1b 100644
--- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
@@ -177,6 +177,51 @@ func (in *ArtifactoryAuth) DeepCopy() *ArtifactoryAuth {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DAG) DeepCopyInto(out *DAG) {
+ *out = *in
+ if in.Tasks != nil {
+ in, out := &in.Tasks, &out.Tasks
+ *out = make([]DAGTask, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DAG.
+func (in *DAG) DeepCopy() *DAG {
+ if in == nil {
+ return nil
+ }
+ out := new(DAG)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DAGTask) DeepCopyInto(out *DAGTask) {
+ *out = *in
+ in.Arguments.DeepCopyInto(&out.Arguments)
+ if in.Dependencies != nil {
+ in, out := &in.Dependencies, &out.Dependencies
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DAGTask.
+func (in *DAGTask) DeepCopy() *DAGTask {
+ if in == nil {
+ return nil
+ }
+ out := new(DAGTask)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *GitArtifact) DeepCopyInto(out *GitArtifact) {
*out = *in
@@ -294,6 +339,11 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) {
*out = make([]string, len(*in))
copy(*out, *in)
}
+ if in.OutboundNodes != nil {
+ in, out := &in.OutboundNodes, &out.OutboundNodes
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
return
}
@@ -622,6 +672,15 @@ func (in *Template) DeepCopyInto(out *Template) {
**out = **in
}
}
+ if in.DAG != nil {
+ in, out := &in.DAG, &out.DAG
+ if *in == nil {
+ *out = nil
+ } else {
+ *out = new(DAG)
+ (*in).DeepCopyInto(*out)
+ }
+ }
if in.ArchiveLocation != nil {
in, out := &in.ArchiveLocation, &out.ArchiveLocation
if *in == nil {
diff --git a/test/e2e/expectedfailures/dag-fail.yaml b/test/e2e/expectedfailures/dag-fail.yaml
new file mode 100644
index 000000000000..21d085ba804b
--- /dev/null
+++ b/test/e2e/expectedfailures/dag-fail.yaml
@@ -0,0 +1,44 @@
+# The following workflow executes a diamond workflow where C fails
+#
+# A
+# / \
+# B C
+# \ /
+# D
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-fail-
+spec:
+ entrypoint: diamond
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: cmd
+ container:
+ image: alpine:3.7
+ command: [sh, -c]
+ args: ["{{inputs.parameters.cmd}}"]
+ - name: diamond
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo A}]
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo B}]
+ - name: C
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo C; exit 1}]
+ - name: D
+ dependencies: [B, C]
+ template: echo
+ arguments:
+ parameters: [{name: cmd, value: echo D}]
diff --git a/test/e2e/functional/dag-argument-passing.yaml b/test/e2e/functional/dag-argument-passing.yaml
new file mode 100644
index 000000000000..21335a695944
--- /dev/null
+++ b/test/e2e/functional/dag-argument-passing.yaml
@@ -0,0 +1,34 @@
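+# This workflow passes the output parameter "hosts" of task A to its dependent
+# task B via the {{dependencies.A.outputs.parameters.hosts}} reference.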
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-arg-passing-
+spec:
+ entrypoint: dag-arg-passing
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ outputs:
+ parameters:
+ - name: hosts
+ path: /etc/hosts
+ - name: dag-arg-passing
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: val
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: "{{dependencies.A.outputs.parameters.hosts}}"
\ No newline at end of file
diff --git a/workflow/common/validate.go b/workflow/common/validate.go
index 9c04925d63ae..e2a48de29400 100644
--- a/workflow/common/validate.go
+++ b/workflow/common/validate.go
@@ -53,7 +53,6 @@ func ValidateWorkflow(wf *wfv1.Workflow) error {
for _, param := range ctx.wf.Spec.Arguments.Parameters {
ctx.globalParams["workflow.parameters."+param.Name] = placeholderValue
}
-
if ctx.wf.Spec.Entrypoint == "" {
return errors.New(errors.CodeBadRequest, "spec.entrypoint is required")
}
@@ -61,7 +60,6 @@ func ValidateWorkflow(wf *wfv1.Workflow) error {
if entryTmpl == nil {
return errors.Errorf(errors.CodeBadRequest, "spec.entrypoint template '%s' undefined", ctx.wf.Spec.Entrypoint)
}
-
err = ctx.validateTemplate(entryTmpl, ctx.wf.Spec.Arguments)
if err != nil {
return err
@@ -113,17 +111,23 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
if tmpl.Resource != nil {
tmplTypes++
}
+ if tmpl.DAG != nil {
+ tmplTypes++
+ }
switch tmplTypes {
case 0:
- return errors.New(errors.CodeBadRequest, "template type unspecified. choose one of: container, steps, script, resource")
+ return errors.New(errors.CodeBadRequest, "template type unspecified. choose one of: container, steps, script, resource, dag")
case 1:
default:
- return errors.New(errors.CodeBadRequest, "multiple template types specified. choose one of: container, steps, script, resource")
+ return errors.New(errors.CodeBadRequest, "multiple template types specified. choose one of: container, steps, script, resource, dag")
}
- if tmpl.Steps == nil {
- err = validateLeaf(scope, tmpl)
- } else {
+ switch tmpl.GetType() {
+ case wfv1.TemplateTypeSteps:
err = ctx.validateSteps(scope, tmpl)
+ case wfv1.TemplateTypeDAG:
+ err = ctx.validateDAG(scope, tmpl)
+ default:
+ err = validateLeaf(scope, tmpl)
}
if err != nil {
return err
@@ -138,11 +142,11 @@ func (ctx *wfValidationCtx) validateTemplate(tmpl *wfv1.Template, args wfv1.Argu
func validateInputs(tmpl *wfv1.Template) (map[string]interface{}, error) {
err := validateWorkflowFieldNames(tmpl.Inputs.Parameters)
if err != nil {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' inputs.parameters%s", tmpl.Name, err.Error())
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.inputs.parameters%s", tmpl.Name, err.Error())
}
err = validateWorkflowFieldNames(tmpl.Inputs.Artifacts)
if err != nil {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' inputs.artifacts%s", tmpl.Name, err.Error())
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.inputs.artifacts%s", tmpl.Name, err.Error())
}
scope := make(map[string]interface{})
for _, param := range tmpl.Inputs.Parameters {
@@ -154,17 +158,17 @@ func validateInputs(tmpl *wfv1.Template) (map[string]interface{}, error) {
scope[artRef] = true
if isLeaf {
if art.Path == "" {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path not specified", tmpl.Name, artRef)
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path not specified", tmpl.Name, artRef)
}
} else {
if art.Path != "" {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path only valid in container/script templates", tmpl.Name, artRef)
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path only valid in container/script templates", tmpl.Name, artRef)
}
}
if art.From != "" {
- return nil, errors.Errorf(errors.CodeBadRequest, "template '%s' %s.from not valid in inputs", tmpl.Name, artRef)
+ return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.from not valid in inputs", tmpl.Name, artRef)
}
- errPrefix := fmt.Sprintf("template '%s' %s", tmpl.Name, artRef)
+ errPrefix := fmt.Sprintf("templates.%s.%s", tmpl.Name, artRef)
err = validateArtifactLocation(errPrefix, art)
if err != nil {
return nil, err
@@ -211,7 +215,7 @@ func validateLeaf(scope map[string]interface{}, tmpl *wfv1.Template) error {
}
err = resolveAllVariables(scope, string(tmplBytes))
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' %s", tmpl.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "template.%s: %s", tmpl.Name, err.Error())
}
if tmpl.Container != nil {
// Ensure there are no collisions with volume mountPaths and artifact load paths
@@ -256,19 +260,19 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
for i, stepGroup := range tmpl.Steps {
for _, step := range stepGroup {
if step.Name == "" {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].name is required", tmpl.Name, i)
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name is required", tmpl.Name, i)
}
_, ok := stepNames[step.Name]
if ok {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
}
if errs := IsValidWorkflowFieldName(step.Name); len(errs) != 0 {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
}
stepNames[step.Name] = true
err := addItemsToScope(&step, scope)
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
stepBytes, err := json.Marshal(stepGroup)
if err != nil {
@@ -276,13 +280,13 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
}
err = resolveAllVariables(scope, string(stepBytes))
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
}
childTmpl := ctx.wf.GetTemplate(step.Template)
if childTmpl == nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' steps[%d].%s.template '%s' undefined", tmpl.Name, i, step.Name, step.Template)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s.template '%s' undefined", tmpl.Name, i, step.Name, step.Template)
}
- err = validateArguments(fmt.Sprintf("template '%s' steps[%d].%s.arguments.", tmpl.Name, i, step.Name), step.Arguments)
+ err = validateArguments(fmt.Sprintf("templates.%s.steps[%d].%s.arguments.", tmpl.Name, i, step.Name), step.Arguments)
if err != nil {
return err
}
@@ -292,7 +296,7 @@ func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wf
}
}
for _, step := range stepGroup {
- ctx.addOutputsToScope(step.Template, step.Name, scope)
+ ctx.addOutputsToScope(step.Template, fmt.Sprintf("steps.%s", step.Name), scope)
}
}
return nil
@@ -320,30 +324,30 @@ func addItemsToScope(step *wfv1.WorkflowStep, scope map[string]interface{}) erro
return nil
}
-func (ctx *wfValidationCtx) addOutputsToScope(templateName string, stepName string, scope map[string]interface{}) {
+func (ctx *wfValidationCtx) addOutputsToScope(templateName string, prefix string, scope map[string]interface{}) {
tmpl := ctx.wf.GetTemplate(templateName)
if tmpl.Daemon != nil && *tmpl.Daemon {
- scope[fmt.Sprintf("steps.%s.ip", stepName)] = true
+ scope[fmt.Sprintf("%s.ip", prefix)] = true
}
if tmpl.Script != nil {
- scope[fmt.Sprintf("steps.%s.outputs.result", stepName)] = true
+ scope[fmt.Sprintf("%s.outputs.result", prefix)] = true
}
for _, param := range tmpl.Outputs.Parameters {
- scope[fmt.Sprintf("steps.%s.outputs.parameters.%s", stepName, param.Name)] = true
+ scope[fmt.Sprintf("%s.outputs.parameters.%s", prefix, param.Name)] = true
}
for _, art := range tmpl.Outputs.Artifacts {
- scope[fmt.Sprintf("steps.%s.outputs.artifacts.%s", stepName, art.Name)] = true
+ scope[fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name)] = true
}
}
func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
err := validateWorkflowFieldNames(tmpl.Outputs.Parameters)
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' outputs.parameters%s", tmpl.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.parameters%s", tmpl.Name, err.Error())
}
err = validateWorkflowFieldNames(tmpl.Outputs.Artifacts)
if err != nil {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' outputs.artifacts%s", tmpl.Name, err.Error())
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts%s", tmpl.Name, err.Error())
}
outputBytes, err := json.Marshal(tmpl.Outputs)
if err != nil {
@@ -359,11 +363,11 @@ func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
artRef := fmt.Sprintf("outputs.artifacts.%s", art.Name)
if isLeaf {
if art.Path == "" {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path not specified", tmpl.Name, artRef)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path not specified", tmpl.Name, artRef)
}
} else {
if art.Path != "" {
- return errors.Errorf(errors.CodeBadRequest, "template '%s' %s.path only valid in container/script templates", tmpl.Name, artRef)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path only valid in container/script templates", tmpl.Name, artRef)
}
}
}
@@ -415,3 +419,107 @@ func validateWorkflowFieldNames(slice interface{}) error {
}
return nil
}
+
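+// validateDAG validates a DAG template: it checks task names, verifies that each task's
+// template and dependencies are defined, rejects duplicate dependencies and dependency
+// cycles, and ensures that every variable referenced by a task can be resolved
+// (recursively validating each task's template).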
+func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1.Template) error {
+ err := validateWorkflowFieldNames(tmpl.DAG.Tasks)
+ if err != nil {
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks%s", tmpl.Name, err.Error())
+ }
+ nameToTask := make(map[string]wfv1.DAGTask)
+ for _, task := range tmpl.DAG.Tasks {
+ nameToTask[task.Name] = task
+ }
+
+ // Verify dependencies for all tasks can be resolved as well as template names
+ for _, task := range tmpl.DAG.Tasks {
+ if task.Template == "" {
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s.template is required", tmpl.Name, task.Name)
+ }
+ taskTmpl := ctx.wf.GetTemplate(task.Template)
+ if taskTmpl == nil {
+			return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s.template '%s' undefined", tmpl.Name, task.Name, task.Template)
+ }
+ dupDependencies := make(map[string]bool)
+ for j, depName := range task.Dependencies {
+ if _, ok := dupDependencies[depName]; ok {
+ return errors.Errorf(errors.CodeBadRequest,
+ "templates.%s.tasks.%s.dependencies[%d] dependency '%s' duplicated",
+ tmpl.Name, task.Name, j, depName)
+ }
+ dupDependencies[depName] = true
+ if _, ok := nameToTask[depName]; !ok {
+ return errors.Errorf(errors.CodeBadRequest,
+ "templates.%s.tasks.%s.dependencies[%d] dependency '%s' not defined",
+ tmpl.Name, task.Name, j, depName)
+ }
+ }
+ }
+
+ err = verifyNoCycles(tmpl, nameToTask)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tmpl.DAG.Tasks {
+ taskBytes, err := json.Marshal(task)
+ if err != nil {
+ return errors.InternalWrapError(err)
+ }
+ // add outputs of all our dependencies to scope
+ taskScope := make(map[string]interface{})
+ for k, v := range scope {
+ taskScope[k] = v
+ }
+ for _, depName := range task.Dependencies {
+ ctx.addOutputsToScope(nameToTask[depName].Template, fmt.Sprintf("dependencies.%s", depName), taskScope)
+ }
+ err = resolveAllVariables(taskScope, string(taskBytes))
+ if err != nil {
+ return errors.Errorf(errors.CodeBadRequest, "template.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
+ }
+ taskTmpl := ctx.wf.GetTemplate(task.Template)
+ err = ctx.validateTemplate(taskTmpl, task.Arguments)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// verifyNoCycles verifies there are no cycles in the DAG graph
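+// It performs a depth-first traversal from every task, carrying the current dependency path,
+// and reports an error if a task's dependency already appears on that path.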
+func verifyNoCycles(tmpl *wfv1.Template, nameToTask map[string]wfv1.DAGTask) error {
+ visited := make(map[string]bool)
+ var noCyclesHelper func(taskName string, cycle []string) error
+ noCyclesHelper = func(taskName string, cycle []string) error {
+ if _, ok := visited[taskName]; ok {
+ return nil
+ }
+ task := nameToTask[taskName]
+ for _, depName := range task.Dependencies {
+ for _, name := range cycle {
+ if name == depName {
+ return errors.Errorf(errors.CodeBadRequest,
+ "templates.%s.tasks dependency cycle detected: %s->%s",
+ tmpl.Name, strings.Join(cycle, "->"), name)
+ }
+ }
+ cycle = append(cycle, depName)
+ err := noCyclesHelper(depName, cycle)
+ if err != nil {
+ return err
+ }
+ cycle = cycle[0 : len(cycle)-1]
+ }
+ visited[taskName] = true
+ return nil
+ }
+
+ for _, task := range tmpl.DAG.Tasks {
+ err := noCyclesHelper(task.Name, []string{})
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/workflow/common/validate_dag_test.go b/workflow/common/validate_dag_test.go
new file mode 100644
index 000000000000..55a058c9f1a4
--- /dev/null
+++ b/workflow/common/validate_dag_test.go
@@ -0,0 +1,174 @@
+package common
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var dagCycle = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-cycle-
+spec:
+ entrypoint: cycle
+ templates:
+ - name: echo
+ container:
+ image: alpine:3.7
+ command: [echo, hello]
+ - name: cycle
+ dag:
+ tasks:
+ - name: A
+ dependencies: [C]
+ template: echo
+ - name: B
+ dependencies: [A]
+ template: echo
+ - name: C
+ dependencies: [A]
+ template: echo
+`
+
+func TestDAGCycle(t *testing.T) {
+ err := validate(dagCycle)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "cycle")
+ }
+}
+
+var duplicateDependencies = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-dup-depends-
+spec:
+ entrypoint: cycle
+ templates:
+ - name: echo
+ container:
+ image: alpine:3.7
+ command: [echo, hello]
+ - name: cycle
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ - name: B
+ dependencies: [A, A]
+ template: echo
+`
+
+func TestDuplicateDependencies(t *testing.T) {
+ err := validate(duplicateDependencies)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "duplicate")
+ }
+}
+
+var dagUndefinedTemplate = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-undefined-
+spec:
+ entrypoint: undef
+ templates:
+ - name: undef
+ dag:
+ tasks:
+ - name: A
+ template: echo
+`
+
+func TestDAGUndefinedTemplate(t *testing.T) {
+ err := validate(dagUndefinedTemplate)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "undefined")
+ }
+}
+
+var dagUnresolvedVar = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-cycle-
+spec:
+ entrypoint: unresolved
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ outputs:
+ parameters:
+ - name: hosts
+ path: /etc/hosts
+ - name: unresolved
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: val
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: "{{dependencies.A.outputs.parameters.unresolvable}}"
+`
+
+var dagResolvedVar = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: dag-cycle-
+spec:
+ entrypoint: unresolved
+ templates:
+ - name: echo
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: alpine:3.7
+ command: [echo, "{{inputs.parameters.message}}"]
+ outputs:
+ parameters:
+ - name: hosts
+ path: /etc/hosts
+ - name: unresolved
+ dag:
+ tasks:
+ - name: A
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: val
+ - name: B
+ dependencies: [A]
+ template: echo
+ arguments:
+ parameters:
+ - name: message
+ value: "{{dependencies.A.outputs.parameters.hosts}}"
+`
+
+func TestDAGVariableResolution(t *testing.T) {
+ err := validate(dagUnresolvedVar)
+ if assert.NotNil(t, err) {
+ assert.Contains(t, err.Error(), "failed to resolve {{dependencies.A.outputs.parameters.unresolvable}}")
+ }
+ err = validate(dagResolvedVar)
+ assert.Nil(t, err)
+}
diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go
new file mode 100644
index 000000000000..0526e474ab7f
--- /dev/null
+++ b/workflow/controller/dag.go
@@ -0,0 +1,289 @@
+package controller
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ "github.com/argoproj/argo/errors"
+ wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
+ "github.com/argoproj/argo/workflow/common"
+ "github.com/valyala/fasttemplate"
+)
+
+// dagContext holds context information about the DAG being executed
+type dagContext struct {
+	// encompasser is the name of the node which invoked (encompasses) this DAG.
+	// It is used as a prefix for the node names of each of the DAG's tasks.
+	encompasser string
+
+ // tasks are all the tasks in the template
+ tasks []wfv1.DAGTask
+
+ // visited keeps track of tasks we have already visited during an invocation of executeDAG
+ // in order to avoid duplicating work
+ visited map[string]bool
+
+ // tmpl is the template spec. it is needed to resolve hard-wired artifacts
+ tmpl *wfv1.Template
+
+ // wf is stored to formulate nodeIDs
+ wf *wfv1.Workflow
+}
+
+func (d *dagContext) getTask(taskName string) *wfv1.DAGTask {
+ for _, task := range d.tasks {
+ if task.Name == taskName {
+ return &task
+ }
+ }
+ panic("target " + taskName + " does not exist")
+}
+
+// taskNodeName formulates the nodeName for a dag task
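+// e.g. for a DAG node named "my-wf" and a task named "A", the resulting node name is "my-wf.A"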
+func (d *dagContext) taskNodeName(taskName string) string {
+ return fmt.Sprintf("%s.%s", d.encompasser, taskName)
+}
+
+// taskNodeID formulates the node ID for a dag task
+func (d *dagContext) taskNodeID(taskName string) string {
+ nodeName := d.taskNodeName(taskName)
+ return d.wf.NodeID(nodeName)
+}
+
+func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus {
+ nodeID := d.taskNodeID(taskName)
+ node, ok := d.wf.Status.Nodes[nodeID]
+ if !ok {
+ return nil
+ }
+ return &node
+}
+
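+// executeDAG executes a DAG template. On the first invocation it initializes the DAG node and
+// connects its root tasks as children. On every pass it executes the target tasks (recursing
+// through their dependencies), returns while any target is still incomplete, and otherwise
+// records the targets' outbound nodes and marks the DAG node with the resulting phase.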
+func (woc *wfOperationCtx) executeDAG(nodeName string, tmpl *wfv1.Template) *wfv1.NodeStatus {
+ nodeID := woc.wf.NodeID(nodeName)
+ node, nodeInitialized := woc.wf.Status.Nodes[nodeID]
+ if nodeInitialized && node.Completed() {
+ return &node
+ }
+ dagCtx := &dagContext{
+ encompasser: nodeName,
+ tasks: tmpl.DAG.Tasks,
+ visited: make(map[string]bool),
+ tmpl: tmpl,
+ wf: woc.wf,
+ }
+ var targetTasks []string
+ if tmpl.DAG.Targets == "" {
+ targetTasks = findLeafTaskNames(tmpl.DAG.Tasks)
+ } else {
+ targetTasks = strings.Split(tmpl.DAG.Targets, " ")
+ }
+
+ if !nodeInitialized {
+ node = *woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ rootTasks := findRootTaskNames(dagCtx, targetTasks)
+ woc.log.Infof("Root tasks of %s identified as %s", nodeName, rootTasks)
+ for _, rootTaskName := range rootTasks {
+ woc.addChildNode(node.Name, dagCtx.taskNodeName(rootTaskName))
+ }
+ }
+ // kick off execution of each target task asynchronously
+	for _, taskName := range targetTasks {
+		woc.executeDAGTask(dagCtx, taskName)
+ }
+ // return early if we have yet to complete execution of any one of our dependencies
+ for _, depName := range targetTasks {
+ depNode := dagCtx.getTaskNode(depName)
+ if depNode == nil || !depNode.Completed() {
+ return &node
+ }
+ }
+ // all desired tasks completed. now it is time to assess state
+ for _, depName := range targetTasks {
+ depNode := dagCtx.getTaskNode(depName)
+ if !depNode.Successful() {
+ // TODO: consider creating a virtual fan-in node
+ return woc.markNodePhase(nodeName, depNode.Phase)
+ }
+ }
+
+ // set the outbound nodes from the target tasks
+ node = *woc.getNodeByName(nodeName)
+ outbound := make([]string, 0)
+ for _, depName := range targetTasks {
+ depNode := dagCtx.getTaskNode(depName)
+ outboundNodeIDs := woc.getOutboundNodes(depNode.ID)
+ for _, outNodeID := range outboundNodeIDs {
+ outbound = append(outbound, outNodeID)
+ }
+ }
+ woc.log.Infof("Outbound nodes of %s set to %s", node.ID, outbound)
+ node.OutboundNodes = outbound
+ woc.wf.Status.Nodes[node.ID] = node
+
+ return woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
+}
+
+// findRootTaskNames returns the names of all tasks, reachable from the target tasks,
+// which have no dependencies. The caller connects these roots as children of the
+// encompassing DAG node.
+func findRootTaskNames(dagCtx *dagContext, targetTasks []string) []string {
+	rootTaskNames := make([]string, 0)
+ visited := make(map[string]bool)
+ var findRootHelper func(s string)
+ findRootHelper = func(taskName string) {
+ if _, ok := visited[taskName]; ok {
+ return
+ }
+ visited[taskName] = true
+ task := dagCtx.getTask(taskName)
+ if len(task.Dependencies) == 0 {
+ rootTaskNames = append(rootTaskNames, taskName)
+ return
+ }
+ for _, depName := range task.Dependencies {
+ findRootHelper(depName)
+ }
+ }
+ for _, targetTaskName := range targetTasks {
+ findRootHelper(targetTaskName)
+ }
+ return rootTaskNames
+}
+
+// executeDAGTask traverses and executes the upward chain of dependencies of a task
+func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
+ if _, ok := dagCtx.visited[taskName]; ok {
+ return
+ }
+ dagCtx.visited[taskName] = true
+
+ node := dagCtx.getTaskNode(taskName)
+ if node != nil && node.Completed() {
+ return
+ }
+ // Check if our dependencies completed. If not, recurse our parents executing them if necessary
+ task := dagCtx.getTask(taskName)
+ dependenciesCompleted := true
+ dependenciesSuccessful := true
+ nodeName := dagCtx.taskNodeName(taskName)
+ for _, depName := range task.Dependencies {
+ depNode := dagCtx.getTaskNode(depName)
+ if depNode != nil {
+ if depNode.Completed() {
+ if !depNode.Successful() {
+ dependenciesSuccessful = false
+ }
+ continue
+ }
+ }
+ dependenciesCompleted = false
+ dependenciesSuccessful = false
+ // recurse our dependency
+ woc.executeDAGTask(dagCtx, depName)
+ }
+ if !dependenciesCompleted {
+ return
+ }
+
+ // All our dependencies completed. Now add the child relationship from our dependency's
+ // outbound nodes to this node.
+ node = dagCtx.getTaskNode(taskName)
+ if node == nil {
+ woc.log.Infof("All of node %s dependencies %s completed", nodeName, task.Dependencies)
+ // Add all outbound nodes of our dependencies as parents to this node
+ for _, depName := range task.Dependencies {
+ depNode := dagCtx.getTaskNode(depName)
+ woc.log.Infof("node %s outbound nodes: %s", depNode, depNode.OutboundNodes)
+ if depNode.IsPod {
+ woc.addChildNode(depNode.Name, nodeName)
+ } else {
+ for _, outNodeID := range depNode.OutboundNodes {
+ woc.addChildNode(woc.wf.Status.Nodes[outNodeID].Name, nodeName)
+ }
+ }
+ }
+ }
+
+ if !dependenciesSuccessful {
+ woc.log.Infof("Task %s being marked %s due to dependency failure", taskName, wfv1.NodeSkipped)
+ _ = woc.markNodePhase(nodeName, wfv1.NodeSkipped)
+ return
+ }
+
+ // All our dependencies were satisfied and successful. It's our turn to run
+ // Substitute params/artifacts from our dependencies and execute the template
+ newTask, err := woc.resolveDependencyReferences(dagCtx, task)
+ if err != nil {
+ woc.markNodeError(nodeName, err)
+ return
+ }
+ _ = woc.executeTemplate(newTask.Template, newTask.Arguments, nodeName)
+}
+
+// resolveDependencyReferences replaces any references to outputs of task dependencies, or artifacts in the inputs
+// NOTE: by now, input parameters should have been substituted throughout the template
+func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task *wfv1.DAGTask) (*wfv1.DAGTask, error) {
+ // build up the scope
+ scope := wfScope{
+ tmpl: dagCtx.tmpl,
+ scope: make(map[string]interface{}),
+ }
+ for _, depName := range task.Dependencies {
+ depNode := dagCtx.getTaskNode(depName)
+ prefix := fmt.Sprintf("dependencies.%s", depName)
+ scope.addNodeOutputsToScope(prefix, depNode)
+ }
+
+ // Perform replacement
+ taskBytes, err := json.Marshal(task)
+ if err != nil {
+ return nil, errors.InternalWrapError(err)
+ }
+ fstTmpl := fasttemplate.New(string(taskBytes), "{{", "}}")
+ newTaskStr, err := common.Replace(fstTmpl, scope.replaceMap(), false, "")
+ if err != nil {
+ return nil, err
+ }
+ var newTask wfv1.DAGTask
+ err = json.Unmarshal([]byte(newTaskStr), &newTask)
+ if err != nil {
+ return nil, errors.InternalWrapError(err)
+ }
+
+ // replace all artifact references
+ for j, art := range newTask.Arguments.Artifacts {
+ if art.From == "" {
+ continue
+ }
+ resolvedArt, err := scope.resolveArtifact(art.From)
+ if err != nil {
+ return nil, err
+ }
+ resolvedArt.Name = art.Name
+ newTask.Arguments.Artifacts[j] = *resolvedArt
+ }
+ return &newTask, nil
+}
+
+// findLeafTaskNames finds the names of all tasks which no other task depends on.
+// This list of tasks is used as the default list of targets when dag.target is omitted.
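+// e.g. in a diamond DAG A -> (B, C) -> D, the only leaf task is D.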
+func findLeafTaskNames(tasks []wfv1.DAGTask) []string {
+ taskIsLeaf := make(map[string]bool)
+ for _, task := range tasks {
+ if _, ok := taskIsLeaf[task.Name]; !ok {
+ taskIsLeaf[task.Name] = true
+ }
+ for _, dependency := range task.Dependencies {
+ taskIsLeaf[dependency] = false
+ }
+ }
+ leafTaskNames := make([]string, 0)
+ for taskName, isLeaf := range taskIsLeaf {
+ if isLeaf {
+ leafTaskNames = append(leafTaskNames, taskName)
+ }
+ }
+ return leafTaskNames
+}
diff --git a/workflow/controller/dag_test.go b/workflow/controller/dag_test.go
new file mode 100644
index 000000000000..b0b429f89997
--- /dev/null
+++ b/workflow/controller/dag_test.go
@@ -0,0 +1 @@
+package controller
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index b33fcece1161..7d2b06569888 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -133,6 +133,8 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
woc.markWorkflowError(err, true)
return
}
+ var workflowStatus wfv1.NodePhase
+ var workflowMessage string
err = woc.executeTemplate(wf.Spec.Entrypoint, wf.Spec.Arguments, wf.ObjectMeta.Name)
if err != nil {
if errors.IsCode(errors.CodeTimeout, err) {
@@ -144,18 +146,20 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
}
woc.log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
}
- node := woc.wf.Status.Nodes[woc.wf.NodeID(wf.ObjectMeta.Name)]
+ node := woc.getNodeByName(wf.ObjectMeta.Name)
if !node.Completed() {
return
}
+ workflowStatus = node.Phase
+ workflowMessage = node.Message
var onExitNode *wfv1.NodeStatus
if wf.Spec.OnExit != "" {
- if node.Phase == wfv1.NodeSkipped {
+ if workflowStatus == wfv1.NodeSkipped {
// treat skipped the same as Succeeded for workflow.status
woc.globalParams[common.GlobalVarWorkflowStatus] = string(wfv1.NodeSucceeded)
} else {
- woc.globalParams[common.GlobalVarWorkflowStatus] = string(node.Phase)
+ woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus)
}
woc.log.Infof("Running OnExit handler: %s", wf.Spec.OnExit)
onExitNodeName := wf.ObjectMeta.Name + ".onExit"
@@ -187,7 +191,7 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
// If we get here, the workflow completed, all PVCs were deleted successfully, and
// exit handlers were executed. We now need to infer the workflow phase from the
// node phase.
- switch node.Phase {
+ switch workflowStatus {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if onExitNode != nil && !onExitNode.Successful() {
// if main workflow succeeded, but the exit node was unsuccessful
@@ -197,9 +201,9 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
woc.markWorkflowSuccess()
}
case wfv1.NodeFailed:
- woc.markWorkflowFailed(node.Message)
+ woc.markWorkflowFailed(workflowMessage)
case wfv1.NodeError:
- woc.markWorkflowPhase(wfv1.NodeError, true, node.Message)
+ woc.markWorkflowPhase(wfv1.NodeError, true, workflowMessage)
default:
// NOTE: we should never make it here because if the the node was 'Running'
// we should have returned earlier.
@@ -208,7 +212,16 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
}
}
-// persistUpdates will update a workflow with any updates made during workflow operation.
+func (woc *wfOperationCtx) getNodeByName(nodeName string) *wfv1.NodeStatus {
+ nodeID := woc.wf.NodeID(nodeName)
+ node, ok := woc.wf.Status.Nodes[nodeID]
+ if !ok {
+ return nil
+ }
+ return &node
+}
+
+// persistUpdates will PATCH a workflow with any updates made during workflow operation.
// It also labels any pods as completed if we have extracted everything we need from it.
func (woc *wfOperationCtx) persistUpdates() {
if !woc.updated {
@@ -372,7 +385,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
// is now impossible to infer status. The only thing we can do at this point is
// to mark the node with Error.
for nodeID, node := range woc.wf.Status.Nodes {
- if len(node.Children) > 0 || node.Completed() {
+ if !node.IsPod || node.Completed() {
// node is not a pod, or it is already complete
continue
}
@@ -728,21 +741,10 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt
return &lastChildNode, nil
}
-func (woc *wfOperationCtx) getNode(nodeName string) wfv1.NodeStatus {
- nodeID := woc.wf.NodeID(nodeName)
- node, ok := woc.wf.Status.Nodes[nodeID]
- if !ok {
- panic("Failed to find node " + nodeName)
- }
-
- return node
-}
-
func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error {
woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName)
- nodeID := woc.wf.NodeID(nodeName)
- node, ok := woc.wf.Status.Nodes[nodeID]
- if ok && node.Completed() {
+ node := woc.getNodeByName(nodeName)
+ if node != nil && node.Completed() {
woc.log.Debugf("Node %s already completed", nodeName)
return nil
}
@@ -761,19 +763,18 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume
switch tmpl.GetType() {
case wfv1.TemplateTypeContainer:
- if ok {
+ if node != nil {
if node.RetryStrategy != nil {
- if err = woc.processNodeRetries(&node); err != nil {
+ if err = woc.processNodeRetries(node); err != nil {
return err
}
-
// The updated node status could've changed. Get the latest copy of the node.
- node = woc.getNode(node.Name)
- log.Infof("Node %s: Status: %s", node.Name, node.Phase)
+ node = woc.getNodeByName(node.Name)
+				woc.log.Infof("Node %s: Status: %s", node.Name, node.Phase)
if node.Completed() {
return nil
}
- lastChildNode, err := woc.getLastChildNode(&node)
+ lastChildNode, err := woc.getLastChildNode(node)
if err != nil {
return err
}
@@ -803,7 +804,7 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume
retries := wfv1.RetryStrategy{}
node.RetryStrategy = &retries
node.RetryStrategy.Limit = tmpl.RetryStrategy.Limit
- woc.wf.Status.Nodes[nodeID] = *node
+ woc.wf.Status.Nodes[node.ID] = *node
// Create new node as child of 'node'
newContainerName := fmt.Sprintf("%s(%d)", nodeName, len(node.Children))
@@ -815,21 +816,19 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume
// We have not yet created the pod
err = woc.executeContainer(nodeToExecute, tmpl)
case wfv1.TemplateTypeSteps:
- if !ok {
- node = *woc.markNodePhase(nodeName, wfv1.NodeRunning)
- woc.log.Infof("Initialized workflow node %v", node)
- }
err = woc.executeSteps(nodeName, tmpl)
case wfv1.TemplateTypeScript:
- if !ok {
+ if node == nil {
err = woc.executeScript(nodeName, tmpl)
}
case wfv1.TemplateTypeResource:
- if !ok {
+ if node == nil {
err = woc.executeResource(nodeName, tmpl)
}
+ case wfv1.TemplateTypeDAG:
+ _ = woc.executeDAG(nodeName, tmpl)
default:
- err = errors.Errorf("Template '%s' missing specification", tmpl.Name)
+ err = errors.Errorf(errors.CodeBadRequest, "Template '%s' missing specification", tmpl.Name)
woc.markNodeError(nodeName, err)
}
if err != nil {
@@ -905,18 +904,28 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
Phase: phase,
StartedAt: metav1.Time{Time: time.Now().UTC()},
}
+ woc.log.Infof("node %s initialized %s", node, node.Phase)
+ woc.updated = true
} else {
- node.Phase = phase
+ if node.Phase != phase {
+ woc.log.Infof("node %s phase %s -> %s", node, node.Phase, phase)
+ node.Phase = phase
+ woc.updated = true
+ }
}
if len(message) > 0 {
- node.Message = message[0]
+ if message[0] != node.Message {
+ woc.log.Infof("node %s message: %s", node, message[0])
+ node.Message = message[0]
+ woc.updated = true
+ }
}
if node.Completed() && node.FinishedAt.IsZero() {
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
+ woc.log.Infof("node %s finished: %s", node, node.FinishedAt)
+ woc.updated = true
}
woc.wf.Status.Nodes[nodeID] = node
- woc.updated = true
- woc.log.Debugf("Marked node %s %s", nodeName, phase)
return &node
}
@@ -926,17 +935,44 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
}
func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template) error {
+ node := woc.getNodeByName(nodeName)
+ if node != nil && node.Phase == wfv1.NodeRunning {
+ // we already marked the node running. pod should have already been created
+ return nil
+ }
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
if err != nil {
woc.markNodeError(nodeName, err)
return err
}
- node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node = woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node.IsPod = true
+ woc.wf.Status.Nodes[node.ID] = *node
woc.log.Infof("Initialized container node %v", node)
return nil
}
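+// getOutboundNodes returns the IDs of the "outbound" pod nodes reachable from the given node.
+// A pod node is its own outbound node; steps and DAG nodes are expanded recursively through
+// their OutboundNodes until pod nodes are reached.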
+func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
+ node := woc.wf.Status.Nodes[nodeID]
+ if node.IsPod {
+ return []string{node.ID}
+ }
+ outbound := make([]string, 0)
+ for _, outboundNodeID := range node.OutboundNodes {
+ outNode := woc.wf.Status.Nodes[outboundNodeID]
+ if outNode.IsPod {
+ outbound = append(outbound, outboundNodeID)
+ } else {
+ subOutIDs := woc.getOutboundNodes(outboundNodeID)
+ for _, subOutID := range subOutIDs {
+ outbound = append(outbound, subOutID)
+ }
+ }
+ }
+ return outbound
+}
+
// getTemplateOutputsFromScope resolves a template's outputs from the scope of the template
func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Outputs, error) {
if !tmpl.Outputs.HasOutputs() {
@@ -981,10 +1017,46 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template) e
return err
}
node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node.IsPod = true
+ woc.wf.Status.Nodes[node.ID] = *node
woc.log.Infof("Initialized script node %v", node)
return nil
}
+// addNodeOutputsToScope adds all of a node's outputs to the scope under the given prefix
+func (wfs *wfScope) addNodeOutputsToScope(prefix string, node *wfv1.NodeStatus) {
+ if node.PodIP != "" {
+ key := fmt.Sprintf("%s.ip", prefix)
+ wfs.addParamToScope(key, node.PodIP)
+ }
+ if node.Outputs != nil {
+ if node.Outputs.Result != nil {
+ key := fmt.Sprintf("%s.outputs.result", prefix)
+ wfs.addParamToScope(key, *node.Outputs.Result)
+ }
+ for _, outParam := range node.Outputs.Parameters {
+ key := fmt.Sprintf("%s.outputs.parameters.%s", prefix, outParam.Name)
+ wfs.addParamToScope(key, *outParam.Value)
+ }
+ for _, outArt := range node.Outputs.Artifacts {
+ key := fmt.Sprintf("%s.outputs.artifacts.%s", prefix, outArt.Name)
+ wfs.addArtifactToScope(key, outArt)
+ }
+ }
+}
+
+// replaceMap returns a map of string replacements intended to be used for simple string substitution
+func (wfs *wfScope) replaceMap() map[string]string {
+ replaceMap := make(map[string]string)
+ for key, val := range wfs.scope {
+ valStr, ok := val.(string)
+ if ok {
+ replaceMap[key] = valStr
+ }
+ }
+ return replaceMap
+}
+
func (wfs *wfScope) addParamToScope(key, val string) {
wfs.scope[key] = val
}
@@ -1037,6 +1109,7 @@ func (wfs *wfScope) resolveArtifact(v string) (*wfv1.Artifact, error) {
}
// addChildNode adds a nodeID as a child to a parent
+// parent and child are both node names
func (woc *wfOperationCtx) addChildNode(parent string, child string) {
parentID := woc.wf.NodeID(parent)
childID := woc.wf.NodeID(child)
@@ -1075,6 +1148,8 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template)
return err
}
node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ node.IsPod = true
+ woc.wf.Status.Nodes[node.ID] = *node
woc.log.Infof("Initialized resource node %v", node)
return nil
}
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index 35321033c7cc..028b980494fe 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -47,40 +47,40 @@ func TestProcessNodesWithRetries(t *testing.T) {
woc.addChildNode(nodeName, childNode)
}
- n := woc.getNode(nodeName)
- lastChild, err = woc.getLastChildNode(&n)
+ n := woc.getNodeByName(nodeName)
+ lastChild, err = woc.getLastChildNode(n)
assert.Nil(t, err)
assert.NotNil(t, lastChild)
// Last child is still running. processNodesWithRetries() should return false since
// there should be no retries at this point.
- err = woc.processNodeRetries(&n)
+ err = woc.processNodeRetries(n)
assert.Nil(t, err)
- n = woc.getNode(nodeName)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeRunning)
// Mark lastChild as successful.
woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded)
- err = woc.processNodeRetries(&n)
+ err = woc.processNodeRetries(n)
assert.Nil(t, err)
// The parent node also gets marked as Succeeded.
- n = woc.getNode(nodeName)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeSucceeded)
// Mark the parent node as running again and the lastChild as failed.
woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeFailed)
- woc.processNodeRetries(&n)
- n = woc.getNode(nodeName)
+ woc.processNodeRetries(n)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeRunning)
// Add a third node that has failed.
childNode := "child-node-3"
woc.markNodePhase(childNode, wfv1.NodeFailed)
woc.addChildNode(nodeName, childNode)
- n = woc.getNode(nodeName)
- err = woc.processNodeRetries(&n)
+ n = woc.getNodeByName(nodeName)
+ err = woc.processNodeRetries(n)
assert.Nil(t, err)
- n = woc.getNode(nodeName)
+ n = woc.getNodeByName(nodeName)
assert.Equal(t, n.Phase, wfv1.NodeFailed)
}
diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go
index 60abff4021be..be41e4c9cb44 100644
--- a/workflow/controller/steps.go
+++ b/workflow/controller/steps.go
@@ -16,7 +16,11 @@ import (
)
func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) error {
+ node := woc.getNodeByName(nodeName)
nodeID := woc.wf.NodeID(nodeName)
+ if node == nil {
+ node = woc.markNodePhase(nodeName, wfv1.NodeRunning)
+ }
defer func() {
if woc.wf.Status.Nodes[nodeID].Completed() {
_ = woc.killDeamonedChildren(nodeID)
@@ -28,29 +32,45 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) er
}
for i, stepGroup := range tmpl.Steps {
sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i)
- woc.addChildNode(nodeName, sgNodeName)
+ sgNode := woc.getNodeByName(sgNodeName)
+ if sgNode == nil {
+ // initialize the step group
+ sgNode = woc.markNodePhase(sgNodeName, wfv1.NodeRunning)
+ if i == 0 {
+ woc.addChildNode(nodeName, sgNodeName)
+ } else {
+ // This logic will connect all the outbound nodes of the previous
+ // step group as parents to the current step group node
+ prevStepGroupName := fmt.Sprintf("%s[%d]", nodeName, i-1)
+ prevStepGroupNode := woc.getNodeByName(prevStepGroupName)
+ for _, childID := range prevStepGroupNode.Children {
+ outboundNodeIDs := woc.getOutboundNodes(childID)
+ woc.log.Infof("SG Outbound nodes of %s are %s", childID, outboundNodeIDs)
+ for _, outNodeID := range outboundNodeIDs {
+ woc.addChildNode(woc.wf.Status.Nodes[outNodeID].Name, sgNodeName)
+ }
+ }
+ }
+ }
err := woc.executeStepGroup(stepGroup, sgNodeName, &scope)
if err != nil {
- if errors.IsCode(errors.CodeTimeout, err) {
- return err
+ if !errors.IsCode(errors.CodeTimeout, err) {
+ woc.markNodeError(nodeName, err)
}
- woc.markNodeError(nodeName, err)
return err
}
- sgNodeID := woc.wf.NodeID(sgNodeName)
- if !woc.wf.Status.Nodes[sgNodeID].Completed() {
- woc.log.Infof("Workflow step group node %v not yet completed", woc.wf.Status.Nodes[sgNodeID])
+ if !sgNode.Completed() {
+ woc.log.Infof("Workflow step group node %v not yet completed", sgNode)
return nil
}
- if !woc.wf.Status.Nodes[sgNodeID].Successful() {
- failMessage := fmt.Sprintf("step group %s was unsuccessful", sgNodeName)
+ if !sgNode.Successful() {
+ failMessage := fmt.Sprintf("step group %s was unsuccessful", sgNode)
woc.log.Info(failMessage)
woc.markNodePhase(nodeName, wfv1.NodeFailed, failMessage)
return nil
}
- // HACK: need better way to add children to scope
for _, step := range stepGroup {
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
childNodeID := woc.wf.NodeID(childNodeName)
@@ -61,26 +81,11 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) er
// are not easily referenceable by user.
continue
}
- if childNode.PodIP != "" {
- key := fmt.Sprintf("steps.%s.ip", step.Name)
- scope.addParamToScope(key, childNode.PodIP)
- }
- if childNode.Outputs != nil {
- if childNode.Outputs.Result != nil {
- key := fmt.Sprintf("steps.%s.outputs.result", step.Name)
- scope.addParamToScope(key, *childNode.Outputs.Result)
- }
- for _, outParam := range childNode.Outputs.Parameters {
- key := fmt.Sprintf("steps.%s.outputs.parameters.%s", step.Name, outParam.Name)
- scope.addParamToScope(key, *outParam.Value)
- }
- for _, outArt := range childNode.Outputs.Artifacts {
- key := fmt.Sprintf("steps.%s.outputs.artifacts.%s", step.Name, outArt.Name)
- scope.addArtifactToScope(key, outArt)
- }
- }
+ prefix := fmt.Sprintf("steps.%s", step.Name)
+ scope.addNodeOutputsToScope(prefix, &childNode)
}
}
+ // If this template has outputs from any of its steps, copy them to this node here
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
woc.markNodeError(nodeName, err)
@@ -91,6 +96,21 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) er
node.Outputs = outputs
woc.wf.Status.Nodes[nodeID] = node
}
+ // Now that we have completed, set the outbound nodes from the last step group
+ outbound := make([]string, 0)
+ lastSGNode := woc.getNodeByName(fmt.Sprintf("%s[%d]", nodeName, len(tmpl.Steps)-1))
+ for _, childID := range lastSGNode.Children {
+ outboundNodeIDs := woc.getOutboundNodes(childID)
+ woc.log.Infof("Outbound nodes of %s is %s", childID, outboundNodeIDs)
+ for _, outNodeID := range outboundNodeIDs {
+ outbound = append(outbound, outNodeID)
+ }
+ }
+ node = woc.getNodeByName(nodeName)
+ woc.log.Infof("Outbound nodes of %s is %s", node.ID, outbound)
+ node.OutboundNodes = outbound
+ woc.wf.Status.Nodes[node.ID] = *node
+
woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
}