From b52c4b71afc06c6ba7f9f1c7237e8779f65b08ff Mon Sep 17 00:00:00 2001 From: Alan Clucas Date: Mon, 17 Jul 2023 21:38:58 +0100 Subject: [PATCH] feat: Allow memoization without outputs Please note: I am not totally in favour of this change, it makes memoization work without outputs and I'm not convinced this is actually healthy, but it's what people seem to expect. See the linked issues in the fixes list below. This change will cause memoization to work for all step and dag tasks even without outputs. Note: They were working semi-erroneously for some dag tasks due to implicit outputs before this change. Fixes #11280 (raised to cover this desire) Fixes #10769 (already closed by documentation) Partially addresses #10426: Dags will memoize now, but retries still won't Signed-off-by: Alan Clucas --- docs/memoization.md | 6 ++- docs/work-avoidance.md | 2 +- workflow/controller/dag.go | 16 +++---- workflow/controller/operator_test.go | 71 ++++++++++++++++++++++++++++ workflow/controller/steps.go | 16 +++---- 5 files changed, 90 insertions(+), 21 deletions(-) diff --git a/docs/memoization.md b/docs/memoization.md index fa871952145a..4c39cd3eab0b 100644 --- a/docs/memoization.md +++ b/docs/memoization.md @@ -8,9 +8,11 @@ Workflows often have outputs that are expensive to compute. Memoization reduces cost and workflow execution time by recording the result of previously run steps: it stores the outputs of a template into a specified cache with a variable key. -Memoization only works for steps which have outputs, if you attempt to use it on steps which do not it should not work (there are some cases where it does, but they shouldn't). It is designed for 'pure' steps, where the purpose of running the step is to calculate some outputs based upon the step's inputs, and only the inputs. Pure steps should not interact with the outside world, but workflows won't enforce this on you. 
+Prior to version 3.5, memoization only worked for steps which have outputs; if you attempt to use it on steps which do not, it should not work (there are some cases where it does, but they shouldn't). It was designed for 'pure' steps, where the purpose of running the step is to calculate some outputs based upon the step's inputs, and only the inputs. Pure steps should not interact with the outside world, but workflows won't enforce this on you. -If your steps are not there to create outputs, but you'd still like to skip running them, you should look at the [work avoidance](work-avoidance.md) technique instead of memoization. +If you are using workflows prior to version 3.5 you should look at the [work avoidance](work-avoidance.md) technique instead of memoization if your steps don't have outputs. + +In version 3.5 or later all steps can be memoized, whether or not they have outputs. ## Cache Method diff --git a/docs/work-avoidance.md b/docs/work-avoidance.md index 726087ae7097..6909cfa932ba 100644 --- a/docs/work-avoidance.md +++ b/docs/work-avoidance.md @@ -4,7 +4,7 @@ You can make workflows faster and more robust by employing **work avoidance**. A workflow that utilizes this is simply a workflow containing steps that do not run if the work has already been done. -This technique is similar to [memoization](memoization.md) but they have distinct use cases. Work avoidance is totally in your control and you make the decisions as to have to skip the work. [Memoization](memoization.md) is a feature of Argo Workflows to automatically skip steps which generate outputs - it is designed for pure steps which only generate output based on their inputs. +This technique is similar to [memoization](memoization.md). Work avoidance is totally in your control and you make the decisions as to how to skip the work. [Memoization](memoization.md) is a feature of Argo Workflows to automatically skip steps which generate outputs. 
Prior to version 3.5 this required `outputs` to be specified, but you can use memoization for all steps and tasks in version 3.5 or later. This simplest way to do this is to use **marker files**. diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 9668997ff2f9..431149059682 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -313,19 +313,17 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl return node, err } if outputs != nil { - node = woc.wf.GetNodeByName(nodeName) node.Outputs = outputs woc.wf.Status.Nodes[node.ID] = *node - if node.MemoizationStatus != nil { - c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) - err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs) - if err != nil { - woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") - node.Phase = wfv1.NodeError - } + } + if node.MemoizationStatus != nil { + c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) + err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs) + if err != nil { + woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") + node.Phase = wfv1.NodeError } } - woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName) return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 9cd873cb1874..2fcb49b89392 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -5128,6 +5128,77 @@ func TestConfigMapCacheLoadOperate(t *testing.T) { } } +var workflowCachedNoOutputs = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: memoized-workflow-test + namespace: default +spec: + entrypoint: whalesay + arguments: + 
parameters: + - name: message + value: hi-there-world + templates: + - name: whalesay + inputs: + parameters: + - name: message + memoize: + key: "{{inputs.parameters.message}}" + cache: + configMap: + name: whalesay-cache + container: + image: docker/whalesay:latest + command: [sh, -c] + args: ["sleep 10; cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"] + outputs: + parameters: + - name: hello + valueFrom: + path: /tmp/hello_world.txt +` + +func TestConfigMapCacheLoadOperateNoOutputs(t *testing.T) { + sampleConfigMapCacheEntry := apiv1.ConfigMap{ + Data: map[string]string{ + "hi-there-world": `{"nodeID":"memoized-simple-workflow-5wj2p","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z"}`, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "whalesay-cache", + ResourceVersion: "1630732", + Labels: map[string]string{ + common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache, + }, + }, + } + wf := wfv1.MustUnmarshalWorkflow(workflowCachedNoOutputs) + cancel, controller := newController() + defer cancel() + + ctx := context.Background() + _, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{}) + assert.NoError(t, err) + _, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{}) + assert.NoError(t, err) + + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + + if assert.Len(t, woc.wf.Status.Nodes, 1) { + for _, node := range woc.wf.Status.Nodes { + assert.Nil(t, node.Outputs) + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + } + } +} + var workflowCachedMaxAge = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index 5ca4ff65ead9..38d27f5af0ce 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -151,20 +151,18 @@ 
func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm return node, err } if outputs != nil { - node := woc.wf.GetNodeByName(nodeName) node.Outputs = outputs woc.addOutputsToGlobalScope(node.Outputs) woc.wf.Status.Nodes[node.ID] = *node - if node.MemoizationStatus != nil { - c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) - err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs) - if err != nil { - woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") - node.Phase = wfv1.NodeError - } + } + if node.MemoizationStatus != nil { + c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) + err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs) + if err != nil { + woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") + node.Phase = wfv1.NodeError } } - return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil }