Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): Add failFast flag to DAG and Step templates #5315

Merged
merged 12 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ GIT_TREE_STATE := $(shell if [ -z "`git status --porcelain`" ]; then echo
RELEASE_TAG := $(shell if [[ "$(GIT_TAG)" =~ ^v[0-9]+\.[0-9]+\.[0-9]+.*$$ ]]; then echo "true"; else echo "false"; fi)
DEV_BRANCH := $(shell [ $(GIT_BRANCH) = master ] || [ `echo $(GIT_BRANCH) | cut -c -8` = release- ] || [ $(RELEASE_TAG) = true ] && echo false || echo true)

GREP_LOGS := ""

Comment on lines +17 to +18
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adds a grep string for goreman output

# docker image publishing options
IMAGE_NAMESPACE ?= argoproj
DEV_IMAGE ?= $(shell [ `uname -s` = Darwin ] && echo true || echo false)
Expand Down Expand Up @@ -463,7 +465,7 @@ endif
./hack/port-forward.sh
ifeq ($(RUN_MODE),local)
killall goreman || true
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) IMAGE_NAMESPACE=$(IMAGE_NAMESPACE) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start controller argo-server $(shell [ $(START_UI) = false ]&& echo ui || echo)
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) IMAGE_NAMESPACE=$(IMAGE_NAMESPACE) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start controller argo-server $(shell [ $(START_UI) = false ]&& echo ui || echo) $(shell if [ -z $GREP_LOGS ]; then echo; else echo "| grep \"$(GREP_LOGS)\""; fi)
endif

$(GOPATH)/bin/stern:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4255,6 +4255,9 @@ spec:
type: object
type: array
type: object
maxFailed:
format: int64
type: integer
memoize:
properties:
cache:
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/full/argoproj.io_cronworkflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4276,6 +4276,9 @@ spec:
type: object
type: array
type: object
maxFailed:
format: int64
type: integer
memoize:
properties:
cache:
Expand Down
9 changes: 9 additions & 0 deletions manifests/base/crds/full/argoproj.io_workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4264,6 +4264,9 @@ spec:
type: object
type: array
type: object
maxFailed:
format: int64
type: integer
memoize:
properties:
cache:
Expand Down Expand Up @@ -12611,6 +12614,9 @@ spec:
type: object
type: array
type: object
maxFailed:
format: int64
type: integer
memoize:
properties:
cache:
Expand Down Expand Up @@ -19234,6 +19240,9 @@ spec:
type: object
type: array
type: object
maxFailed:
format: int64
type: integer
memoize:
properties:
cache:
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/full/argoproj.io_workflowtemplates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4254,6 +4254,9 @@ spec:
type: object
type: array
type: object
maxFailed:
format: int64
type: integer
memoize:
properties:
cache:
Expand Down
861 changes: 449 additions & 412 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,10 @@ type Template struct {
// pods created by those templates will not be counted towards this total.
Parallelism *int64 `json:"parallelism,omitempty" protobuf:"bytes,23,opt,name=parallelism"`

// FailFast, if specified, will fail this template if any of its child pods has failed. This is useful for when this
// template is expanded with `withItems`, etc.
FailFast *bool `json:"failFast,omitempty" protobuf:"varint,41,opt,name=failFast"`

// Tolerations to apply to workflow pods.
// +patchStrategy=merge
// +patchMergeKey=key
Expand Down
98 changes: 98 additions & 0 deletions workflow/controller/node_counters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package controller

import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// counter pairs a tally key with the predicate that decides whether a node
// contributes to that tally. countNodes evaluates ifNode against every node in
// the workflow status and increments the tally stored under key on each match.
type counter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refactoring is a good idea

// key is the counterType under which matching nodes are tallied.
key counterType
// ifNode reports whether the given node should be counted under key.
ifNode func(wfv1.NodeStatus) bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment these fields?

}

// counterType names a category of node tally; it is the key type of the count map.
type counterType string

// The known counter categories. They are declared with an explicit counterType
// so the compiler ties them to the key type instead of leaving them as untyped
// string constants that merely happen to be convertible to it.
const (
	// counterTypeActivePods keys the tally produced by getActivePodsCounter.
	counterTypeActivePods counterType = "activePods"
	// counterTypeActiveChildren keys the tally produced by getActiveChildrenCounter.
	counterTypeActiveChildren counterType = "activeChildren"
	// counterTypeFailedOrErroredChildren keys the tally produced by getFailedOrErroredChildrenCounter.
	counterTypeFailedOrErroredChildren counterType = "failedOrErroredChildren"
)

// getActivePodsCounter builds the counter keyed by counterTypeActivePods.
//
// A node matches when it is a Pod, is Pending or Running, and is not waiting on
// a synchronization lock. When boundaryID is non-empty the node must also
// belong to that boundary; an empty boundaryID matches pods in every boundary.
func getActivePodsCounter(boundaryID string) counter {
	isActivePod := func(n wfv1.NodeStatus) bool {
		if n.Type != wfv1.NodeTypePod {
			return false
		}
		// An empty boundaryID acts as a wildcard.
		if boundaryID != "" && n.BoundaryID != boundaryID {
			return false
		}
		// Only Pending and Running pods are considered active.
		if n.Phase != wfv1.NodePending && n.Phase != wfv1.NodeRunning {
			return false
		}
		// Pods still waiting for a lock are excluded.
		return n.SynchronizationStatus == nil || n.SynchronizationStatus.Waiting == ""
	}
	return counter{key: counterTypeActivePods, ifNode: isActivePod}
}

// getActiveChildrenCounter builds the counter keyed by counterTypeActiveChildren.
//
// A node matches when its BoundaryID equals boundaryID exactly (an empty
// boundaryID is not a wildcard here), it is a Pod, Steps or DAG node, and it is
// Pending or Running.
func getActiveChildrenCounter(boundaryID string) counter {
	isActiveChild := func(n wfv1.NodeStatus) bool {
		if n.BoundaryID != boundaryID {
			return false
		}
		// Only Pods, Steps and DAGs count as children.
		switch n.Type {
		case wfv1.NodeTypePod, wfv1.NodeTypeSteps, wfv1.NodeTypeDAG:
		default:
			return false
		}
		// Only Pending and Running nodes are active.
		return n.Phase == wfv1.NodePending || n.Phase == wfv1.NodeRunning
	}
	return counter{key: counterTypeActiveChildren, ifNode: isActiveChild}
}

// getFailedOrErroredChildrenCounter builds the counter keyed by
// counterTypeFailedOrErroredChildren.
//
// A node matches when its BoundaryID equals boundaryID exactly, it is a Pod,
// Steps or DAG node, and its phase is Failed or Error.
func getFailedOrErroredChildrenCounter(boundaryID string) counter {
	hasFailedOrErrored := func(n wfv1.NodeStatus) bool {
		if n.BoundaryID != boundaryID {
			return false
		}
		// Only Pods, Steps and DAGs count as children.
		switch n.Type {
		case wfv1.NodeTypePod, wfv1.NodeTypeSteps, wfv1.NodeTypeDAG:
		default:
			return false
		}
		// Only Failed and Error phases are tallied.
		return n.Phase == wfv1.NodeFailed || n.Phase == wfv1.NodeError
	}
	return counter{key: counterTypeFailedOrErroredChildren, ifNode: hasFailedOrErrored}
}

// count holds one integer tally per counterType; countNodes builds and returns it.
type count map[counterType]int

// addKeyIfNotPresent makes sure key has an entry in c, inserting it with a
// tally of 0 when it is absent. An existing entry is left untouched.
func (c count) addKeyIfNotPresent(key counterType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’d merge this logic into “count”

if _, ok := c[key]; !ok {
c[key] = 0
}
}

// count increments the tally stored under key by one. A key that is not yet in
// the map starts from the zero value, so it ends up at 1.
func (c count) count(key counterType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call this “inc”

c[key]++
}

// getCount is a shortcut for a count that holds exactly one tally: it returns
// that single value without the caller naming its key. It panics when c holds
// any number of entries other than one.
func (c count) getCount() int {
	switch len(c) {
	case 1:
		// Exactly one entry: the first key yielded is the only one.
		for key := range c {
			return c[key]
		}
	default:
		panic("getCount applied to a count with multiple types")
	}
	panic("unreachable: we know count has exactly one element and it wasn't returned")
}

// getCountType returns the tally recorded under key, or 0 when key has no entry.
func (c count) getCountType(key counterType) int {
	if tally, found := c[key]; found {
		return tally
	}
	return 0
}

// countNodes walks every node in the workflow status once and, for each
// supplied counter, tallies the nodes its ifNode predicate accepts.
//
// The returned count contains an entry for every counter key, even when no
// node matched or the workflow has no nodes at all. Keys are registered before
// the node loop so that a single-counter call followed by getCount() yields 0
// on an empty workflow instead of panicking on an empty map.
func (woc *wfOperationCtx) countNodes(counters ...counter) count {
	counts := make(count, len(counters))
	// Register every requested key up front, independent of how many nodes exist.
	for _, c := range counters {
		counts.addKeyIfNotPresent(c.key)
	}
	for _, node := range woc.wf.Status.Nodes {
		for _, c := range counters {
			if c.ifNode(node) {
				counts.count(c.key)
			}
		}
	}
	return counts
}
61 changes: 61 additions & 0 deletions workflow/controller/node_counters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package controller

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// getWfOperationCtx returns a wfOperationCtx whose workflow status carries a
// fixed set of nodes spread over boundaries "1" and "2", covering the node
// types and phases that the counters distinguish.
func getWfOperationCtx() *wfOperationCtx {
	nodes := map[string]v1alpha1.NodeStatus{
		// Boundary "1": finished nodes.
		"1": {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeSucceeded, BoundaryID: "1"},
		"2": {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeFailed, BoundaryID: "1"},
		"3": {Type: v1alpha1.NodeTypeSteps, Phase: v1alpha1.NodeFailed, BoundaryID: "1"},
		"4": {Type: v1alpha1.NodeTypeDAG, Phase: v1alpha1.NodeError, BoundaryID: "1"},
		// Boundary "1": nodes still Pending or Running ("5a" is waiting on a lock).
		"5":  {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeRunning, BoundaryID: "1"},
		"5a": {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeRunning, BoundaryID: "1", SynchronizationStatus: &v1alpha1.NodeSynchronizationStatus{Waiting: "yes"}},
		"6":  {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodePending, BoundaryID: "1"},
		"7":  {Type: v1alpha1.NodeTypeSteps, Phase: v1alpha1.NodeRunning, BoundaryID: "1"},
		"8":  {Type: v1alpha1.NodeTypeDAG, Phase: v1alpha1.NodePending, BoundaryID: "1"},

		// Boundary "2".
		"9":  {Type: v1alpha1.NodeTypeSteps, Phase: v1alpha1.NodeFailed, BoundaryID: "2"},
		"10": {Type: v1alpha1.NodeTypeDAG, Phase: v1alpha1.NodeError, BoundaryID: "2"},
		"11": {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeRunning, BoundaryID: "2"},
		"12": {Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodePending, BoundaryID: "2"},
	}

	wf := &v1alpha1.Workflow{}
	wf.Status.Nodes = nodes
	return &wfOperationCtx{wf: wf}
}

// TestCounters checks each node counter on its own and then all three combined
// in a single countNodes call, using the fixed node set from getWfOperationCtx.
func TestCounters(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good stuff

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m assuming you’ve checked your code coverage.

woc := getWfOperationCtx()

// Boundary "1": pods "5" and "6" are active; "5a" is excluded because it is waiting on a lock.
activePod := woc.countNodes(getActivePodsCounter("1")).getCount()
assert.Equal(t, 2, activePod)

// No BoundaryID requested
// Pods "11" and "12" from boundary "2" are counted as well.
activePod = woc.countNodes(getActivePodsCounter("")).getCount()
assert.Equal(t, 4, activePod)

// Boundary "1": "5", "5a", "6", "7" and "8" are Pending or Running; this counter has no lock check.
activeChild := woc.countNodes(getActiveChildrenCounter("1")).getCount()
assert.Equal(t, 5, activeChild)

// Boundary "1": "2", "3" and "4" are Failed or Error.
failedOrErroredChildren := woc.countNodes(getFailedOrErroredChildrenCounter("1")).getCount()
assert.Equal(t, 3, failedOrErroredChildren)

// All three counters evaluated in one pass over the nodes.
counts := woc.countNodes(getActivePodsCounter("1"), getActiveChildrenCounter("1"), getFailedOrErroredChildrenCounter("1"))
assert.Len(t, counts, 3)
assert.Panics(t, func() {
// counts has more than one element, the getCount shortcut shouldn't work
counts.getCount()
})
// Per-key lookups must agree with the single-counter results above.
assert.Equal(t, 2, counts.getCountType(counterTypeActivePods))
assert.Equal(t, 5, counts.getCountType(counterTypeActiveChildren))
assert.Equal(t, 3, counts.getCountType(counterTypeFailedOrErroredChildren))
}
Loading