From 53d6aa252a25a446aef305224df09ec8ebefa43b Mon Sep 17 00:00:00 2001
From: shmruin
Date: Sat, 19 Aug 2023 19:18:07 +0900
Subject: [PATCH] fix: Fixed parent level memoization broken. Fixes #11612

Signed-off-by: shmruin
---
Reviewer notes. They sit after the three-dash separator, so they are
commentary and not part of the commit message.

Purpose: move the "node is already fulfilled" short-circuit of
executeTemplate into a helper that is called at two points, and add
tests for template-level memoization on a steps template and on a DAG
template.

NOTE(review): line breaks and indentation in this copy were restored
from a whitespace-collapsed source. Hunk line counts were re-checked
against the hunk headers, but the indentation inside the YAML fixtures
is a reconstruction - compare with the upstream commit before applying.

workflow/controller/operator.go
  * First hunk (old line 1812): the inline node.Fulfilled() branch is
    replaced by a call to the new handleNodeFulfilled helper. When the
    helper returns a non-nil node, syncManager.Release is called and
    that node is returned. The block that back-filled StartedAt and
    EstimatedDuration is removed from this position.
  * Second hunk (old line 1952): a second handleNodeFulfilled check is
    added later in executeTemplate; its comment says it is for
    memoization. This path returns the node without calling
    syncManager.Release. The StartedAt / EstimatedDuration back-fill now
    lives here and runs only when the node is not fulfilled and its
    StartedAt is zero. Presumably the code between the two checks (not
    shown in this patch) can mark the node fulfilled - confirm.
  * Third hunk (old line 2143): new helper handleNodeFulfilled. It
    returns nil when node is nil or not Fulfilled(). Otherwise it logs
    at debug level and, when processedTmpl.Metrics is non-nil and
    preExecutionNodePhases holds an entry for node.ID that was not yet
    fulfilled, calls prepareMetricScope and computeMetrics; it then
    returns the same node pointer it was given.
  * NOTE(review): handleNodeFulfilled dereferences processedTmpl without
    a nil check; both call sites pass the caller's processedTmpl.
  * The removed comment sentence "If a node completes within the same
    execution, its metrics are emitted below." is not carried over into
    the helper.

workflow/controller/operator_test.go
  * Adds two workflow fixtures (a steps entrypoint and a DAG entrypoint,
    each memoized with key "entrypoint-key-1" in ConfigMap
    "cache-top-entrypoint") and four tests: with and without a
    pre-created cache ConfigMap for each fixture.
  * NOTE(review): assert.True(t, true, "...") asserts the literal true,
    so it can never fail; the string is only a failure message. These
    calls do not verify that a node exists, and when no node matches a
    condition the loop body asserts nothing for it.
  * NOTE(review): assert.False(t, true, "...") fails whenever it is
    reached, i.e. it acts as an unconditional failure if a node whose
    Name equals "whalesay" is present.
  * NOTE(review): the child checks compare node.Name with the bare
    string "whalesay". Presumably controller node names carry a
    workflow/step prefix, in which case these branches never run -
    confirm, and consider whether DisplayName was intended.
  * NOTE(review): node.MemoizationStatus.Hit is read without a guard; if
    MemoizationStatus is a pointer field this can panic when it is
    unset - confirm against the NodeStatus type.
  * NOTE(review): the seeded cache entries use ExpiresAt
    "2020-06-18T17:11:05Z", which is earlier than this commit's date,
    yet the WithCache tests expect a hit - presumably expiry is not
    enforced on load; confirm.
  * NOTE(review): the fixture URL contains "namepace" (likely a typo for
    "namespace"). It is fixture data and is left unchanged here.

 workflow/controller/operator.go      |  59 ++++---
 workflow/controller/operator_test.go | 229 +++++++++++++++++++++++++++
 2 files changed, 268 insertions(+), 20 deletions(-)

diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index ef3df0c29eda..1860fe753d4e 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -1812,30 +1812,15 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
 		return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, err), err
 	}
 
+	// Check if this is a fulfilled node for synchronization.
+	// If so, release synchronization and return this node. No more logic will be executed.
 	if node != nil {
-		if node.Fulfilled() {
+		fulfilledNode := woc.handleNodeFulfilled(nodeName, node, processedTmpl)
+		if fulfilledNode != nil {
 			woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
-
-			woc.log.Debugf("Node %s already completed", nodeName)
-			if processedTmpl.Metrics != nil {
-				// Check if this node completed between executions. If it did, emit metrics. If a node completes within
-				// the same execution, its metrics are emitted below.
-				// We can infer that this node completed during the current operation, emit metrics
-				if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
-					localScope, realTimeScope := woc.prepareMetricScope(node)
-					woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
-				}
-			}
-			return node, nil
+			return fulfilledNode, nil
 		}
 		woc.log.Debugf("Executing node %s of %s is %s", nodeName, node.Type, node.Phase)
-		// Memoized nodes don't have StartedAt.
-		if node.StartedAt.IsZero() {
-			node.StartedAt = metav1.Time{Time: time.Now().UTC()}
-			node.EstimatedDuration = woc.estimateNodeDuration(node.Name)
-			woc.wf.Status.Nodes.Set(node.ID, *node)
-			woc.updated = true
-		}
 	}
 
 	// Check if we took too long operating on this workflow and immediately return if we did
@@ -1952,6 +1937,22 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
 		}
 	}
 
+	// Check if this is a fulfilled node for memoization.
+	// If so, just return this node. No more logic will be executed.
+	if node != nil {
+		fulfilledNode := woc.handleNodeFulfilled(nodeName, node, processedTmpl)
+		if fulfilledNode != nil {
+			return fulfilledNode, nil
+		}
+		// Memoized nodes don't have StartedAt.
+		if node.StartedAt.IsZero() {
+			node.StartedAt = metav1.Time{Time: time.Now().UTC()}
+			node.EstimatedDuration = woc.estimateNodeDuration(node.Name)
+			woc.wf.Status.Nodes.Set(node.ID, *node)
+			woc.updated = true
+		}
+	}
+
 	// If the user has specified retries, node becomes a special retry node.
 	// This node acts as a parent of all retries that will be done for
 	// the container. The status of this node should be "Success" if any
@@ -2143,6 +2144,24 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
 	return node, nil
 }
 
+func (woc *wfOperationCtx) handleNodeFulfilled(nodeName string, node *wfv1.NodeStatus, processedTmpl *wfv1.Template) *wfv1.NodeStatus {
+	if node == nil || !node.Fulfilled() {
+		return nil
+	}
+
+	woc.log.Debugf("Node %s already completed", nodeName)
+
+	if processedTmpl.Metrics != nil {
+		// Check if this node completed between executions. If it did, emit metrics.
+		// We can infer that this node completed during the current operation, emit metrics
+		if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
+			localScope, realTimeScope := woc.prepareMetricScope(node)
+			woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
+		}
+	}
+	return node
+}
+
 // Checks if the template has exceeded its deadline
 func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.NodeStatus) (*time.Time, error) {
 	if node == nil {
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index ed5abc1a6978..5fe8e6cde8b6 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -9288,3 +9288,232 @@ spec:
 	assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowFailed)
 	assert.Contains(t, woc.wf.Status.Message, "invalid spec")
 }
+
+var workflowWithTemplateLevelMemoizationAndChildStep = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  namespace: default
+  generateName: memoized-entrypoint-
+spec:
+  entrypoint: entrypoint
+  templates:
+  - name: entrypoint
+    memoize:
+      key: "entrypoint-key-1"
+      cache:
+        configMap:
+          name: cache-top-entrypoint
+    outputs:
+      parameters:
+        - name: url
+          valueFrom:
+            expression: |
+              'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'
+    steps:
+      - - name: whalesay
+          template: whalesay
+
+  - name: whalesay
+    container:
+      image: docker/whalesay:latest
+      command: [sh, -c]
+      args: ["cowsay hello_world $(date) > /tmp/hello_world.txt"]
+    outputs:
+      parameters:
+      - name: hello
+        valueFrom:
+          path: /tmp/hello_world.txt
+`
+
+func TestMemoizationTemplateLevelCacheWithStepWithoutCache(t *testing.T) {
+	wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildStep)
+
+	cancel, controller := newController(wf)
+	defer cancel()
+
+	ctx := context.Background()
+
+	woc := newWorkflowOperationCtx(wf, controller)
+
+	woc.operate(ctx)
+	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
+	woc.operate(ctx)
+
+	// Expect both workflowTemplate and the step to be executed
+	for _, node := range woc.wf.Status.Nodes {
+		if node.TemplateName == "entrypoint" {
+			assert.True(t, true, "Entrypoint node does not exist")
+			assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+			assert.False(t, node.MemoizationStatus.Hit)
+		}
+		if node.Name == "whalesay" {
+			assert.True(t, true, "Whalesay step does not exist")
+			assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+		}
+	}
+}
+
+func TestMemoizationTemplateLevelCacheWithStepWithCache(t *testing.T) {
+	wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildStep)
+
+	// Assume cache is already set
+	sampleConfigMapCacheEntry := apiv1.ConfigMap{
+		Data: map[string]string{
+			"entrypoint-key-1": `{"ExpiresAt":"2020-06-18T17:11:05Z","NodeID":"memoize-abx4124-123129321123","Outputs":{}}`,
+		},
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "ConfigMap",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "cache-top-entrypoint",
+			ResourceVersion: "1630732",
+			Labels: map[string]string{
+				common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
+			},
+		},
+	}
+
+	cancel, controller := newController(wf)
+	defer cancel()
+
+	ctx := context.Background()
+
+	_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
+	assert.NoError(t, err)
+
+	woc := newWorkflowOperationCtx(wf, controller)
+
+	woc.operate(ctx)
+	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
+	woc.operate(ctx)
+
+	// Only parent node should exist and it should be a memoization cache hit
+	for _, node := range woc.wf.Status.Nodes {
+		t.Log(node)
+		if node.TemplateName == "entrypoint" {
+			assert.True(t, true, "Entrypoint node does not exist")
+			assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+			assert.True(t, node.MemoizationStatus.Hit)
+		}
+		if node.Name == "whalesay" {
+			assert.False(t, true, "Whalesay step should not have been executed")
+		}
+	}
+}
+
+var workflowWithTemplateLevelMemoizationAndChildDag = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  namespace: default
+  generateName: memoized-entrypoint-
+spec:
+  entrypoint: entrypoint
+  templates:
+  - name: entrypoint
+    dag:
+      tasks:
+        - name: whalesay-task
+          template: whalesay
+    memoize:
+      key: "entrypoint-key-1"
+      cache:
+        configMap:
+          name: cache-top-entrypoint
+    outputs:
+      parameters:
+        - name: url
+          valueFrom:
+            expression: |
+              'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'
+
+  - name: whalesay
+    container:
+      image: docker/whalesay:latest
+      command: [sh, -c]
+      args: ["cowsay hello_world $(date) > /tmp/hello_world.txt"]
+    outputs:
+      parameters:
+      - name: hello
+        valueFrom:
+          path: /tmp/hello_world.txt
+`
+
+func TestMemoizationTemplateLevelCacheWithDagWithoutCache(t *testing.T) {
+	wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildDag)
+
+	cancel, controller := newController(wf)
+	defer cancel()
+
+	ctx := context.Background()
+
+	woc := newWorkflowOperationCtx(wf, controller)
+
+	woc.operate(ctx)
+	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
+	woc.operate(ctx)
+
+	// Expect both workflowTemplate and the dag to be executed
+	for _, node := range woc.wf.Status.Nodes {
+		if node.TemplateName == "entrypoint" {
+			assert.True(t, true, "Entrypoint node does not exist")
+			assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+			assert.False(t, node.MemoizationStatus.Hit)
+		}
+		if node.Name == "whalesay" {
+			assert.True(t, true, "Whalesay dag does not exist")
+			assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+		}
+	}
+}
+
+func TestMemoizationTemplateLevelCacheWithDagWithCache(t *testing.T) {
+	wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildDag)
+
+	// Assume cache is already set
+	sampleConfigMapCacheEntry := apiv1.ConfigMap{
+		Data: map[string]string{
+			"entrypoint-key-1": `{"ExpiresAt":"2020-06-18T17:11:05Z","NodeID":"memoize-abx4124-123129321123","Outputs":{}}`,
+		},
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "ConfigMap",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "cache-top-entrypoint",
+			ResourceVersion: "1630732",
+			Labels: map[string]string{
+				common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
+			},
+		},
+	}
+
+	cancel, controller := newController(wf)
+	defer cancel()
+
+	ctx := context.Background()
+
+	_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
+	assert.NoError(t, err)
+
+	woc := newWorkflowOperationCtx(wf, controller)
+
+	woc.operate(ctx)
+	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
+	woc.operate(ctx)
+
+	// Only parent node should exist and it should be a memoization cache hit
+	for _, node := range woc.wf.Status.Nodes {
+		t.Log(node)
+		if node.TemplateName == "entrypoint" {
+			assert.True(t, true, "Entrypoint node does not exist")
+			assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+			assert.True(t, node.MemoizationStatus.Hit)
+		}
+		if node.Name == "whalesay" {
+			assert.False(t, true, "Whalesay dag should not have been executed")
+		}
+	}
+}