Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed parent level memoization broken. Fixes #11612 #11623

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,30 +1812,15 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, err), err
}

// Check if this is a fulfilled node for synchronization.
// If so, release synchronization and return this node. No more logic will be executed.
if node != nil {
if node.Fulfilled() {
fulfilledNode := woc.handleNodeFulfilled(nodeName, node, processedTmpl)
if fulfilledNode != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)

woc.log.Debugf("Node %s already completed", nodeName)
if processedTmpl.Metrics != nil {
// Check if this node completed between executions. If it did, emit metrics. If a node completes within
// the same execution, its metrics are emitted below.
// We can infer that this node completed during the current operation, emit metrics
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
}
return node, nil
return fulfilledNode, nil
}
woc.log.Debugf("Executing node %s of %s is %s", nodeName, node.Type, node.Phase)
// Memoized nodes don't have StartedAt.
if node.StartedAt.IsZero() {
node.StartedAt = metav1.Time{Time: time.Now().UTC()}
node.EstimatedDuration = woc.estimateNodeDuration(node.Name)
woc.wf.Status.Nodes.Set(node.ID, *node)
woc.updated = true
}
}

// Check if we took too long operating on this workflow and immediately return if we did
Expand Down Expand Up @@ -1952,6 +1937,22 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}
}

// Check if this is a fulfilled node for memoization.
// If so, just return this node. No more logic will be executed.
if node != nil {
fulfilledNode := woc.handleNodeFulfilled(nodeName, node, processedTmpl)
if fulfilledNode != nil {
return fulfilledNode, nil
}
// Memoized nodes don't have StartedAt.
if node.StartedAt.IsZero() {
node.StartedAt = metav1.Time{Time: time.Now().UTC()}
node.EstimatedDuration = woc.estimateNodeDuration(node.Name)
woc.wf.Status.Nodes.Set(node.ID, *node)
woc.updated = true
}
}

// If the user has specified retries, node becomes a special retry node.
// This node acts as a parent of all retries that will be done for
// the container. The status of this node should be "Success" if any
Expand Down Expand Up @@ -2143,6 +2144,24 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return node, nil
}

func (woc *wfOperationCtx) handleNodeFulfilled(nodeName string, node *wfv1.NodeStatus, processedTmpl *wfv1.Template) *wfv1.NodeStatus {
if node == nil || !node.Fulfilled() {
return nil
}

woc.log.Debugf("Node %s already completed", nodeName)

if processedTmpl.Metrics != nil {
// Check if this node completed between executions. If it did, emit metrics.
// We can infer that this node completed during the current operation, emit metrics
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
}
return node
}

// Checks if the template has exceeded its deadline
func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.NodeStatus) (*time.Time, error) {
if node == nil {
Expand Down
229 changes: 229 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9288,3 +9288,232 @@ spec:
assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowFailed)
assert.Contains(t, woc.wf.Status.Message, "invalid spec")
}

var workflowWithTemplateLevelMemoizationAndChildStep = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
namespace: default
generateName: memoized-entrypoint-
spec:
entrypoint: entrypoint
templates:
- name: entrypoint
memoize:
key: "entrypoint-key-1"
cache:
configMap:
name: cache-top-entrypoint
outputs:
parameters:
- name: url
valueFrom:
expression: |
'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'
steps:
- - name: whalesay
template: whalesay

- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hello_world $(date) > /tmp/hello_world.txt"]
outputs:
parameters:
- name: hello
valueFrom:
path: /tmp/hello_world.txt
`

func TestMemoizationTemplateLevelCacheWithStepWithoutCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildStep)

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Expect both workflowTemplate and the step to be executed
for _, node := range woc.wf.Status.Nodes {
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.False(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.True(t, true, "Whalesay step does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
}
}
}

func TestMemoizationTemplateLevelCacheWithStepWithCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildStep)

// Assume cache is already set
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"entrypoint-key-1": `{"ExpiresAt":"2020-06-18T17:11:05Z","NodeID":"memoize-abx4124-123129321123","Outputs":{}}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "cache-top-entrypoint",
ResourceVersion: "1630732",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Only parent node should exist and it should be a memoization cache hit
for _, node := range woc.wf.Status.Nodes {
t.Log(node)
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.True(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.False(t, true, "Whalesay step should not have been executed")
}
}
}

var workflowWithTemplateLevelMemoizationAndChildDag = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
namespace: default
generateName: memoized-entrypoint-
spec:
entrypoint: entrypoint
templates:
- name: entrypoint
dag:
tasks:
- name: whalesay-task
template: whalesay
memoize:
key: "entrypoint-key-1"
cache:
configMap:
name: cache-top-entrypoint
outputs:
parameters:
- name: url
valueFrom:
expression: |
'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'

- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hello_world $(date) > /tmp/hello_world.txt"]
outputs:
parameters:
- name: hello
valueFrom:
path: /tmp/hello_world.txt
`

func TestMemoizationTemplateLevelCacheWithDagWithoutCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildDag)

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Expect both workflowTemplate and the dag to be executed
for _, node := range woc.wf.Status.Nodes {
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.False(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.True(t, true, "Whalesay dag does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
}
}
}

func TestMemoizationTemplateLevelCacheWithDagWithCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildDag)

// Assume cache is already set
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"entrypoint-key-1": `{"ExpiresAt":"2020-06-18T17:11:05Z","NodeID":"memoize-abx4124-123129321123","Outputs":{}}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "cache-top-entrypoint",
ResourceVersion: "1630732",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Only parent node should exist and it should be a memoization cache hit
for _, node := range woc.wf.Status.Nodes {
t.Log(node)
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.True(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.False(t, true, "Whalesay dag should not have been executed")
}
}
}