Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow memoization without outputs #11379

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/memoization.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ Workflows often have outputs that are expensive to compute.
Memoization reduces cost and workflow execution time by recording the result of previously run steps:
it stores the outputs of a template into a specified cache with a variable key.

Memoization only works for steps which have outputs, if you attempt to use it on steps which do not it should not work (there are some cases where it does, but they shouldn't). It is designed for 'pure' steps, where the purpose of running the step is to calculate some outputs based upon the step's inputs, and only the inputs. Pure steps should not interact with the outside world, but workflows won't enforce this on you.
Prior to version 3.5 memoization only works for steps which have outputs, if you attempt to use it on steps which do not it should not work (there are some cases where it does, but they shouldn't). It was designed for 'pure' steps, where the purpose of running the step is to calculate some outputs based upon the step's inputs, and only the inputs. Pure steps should not interact with the outside world, but workflows won't enforce this on you.

If your steps are not there to create outputs, but you'd still like to skip running them, you should look at the [work avoidance](work-avoidance.md) technique instead of memoization.
If you are using workflows prior to version 3.5 you should look at the [work avoidance](work-avoidance.md) technique instead of memoization if your steps don't have outputs.

In version 3.5 or later all steps can be memoized, whether or not they have outputs.

## Cache Method

Expand Down
2 changes: 1 addition & 1 deletion docs/work-avoidance.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

You can make workflows faster and more robust by employing **work avoidance**. A workflow that utilizes this is simply a workflow containing steps that do not run if the work has already been done.

This technique is similar to [memoization](memoization.md) but they have distinct use cases. Work avoidance is totally in your control and you make the decisions as to have to skip the work. [Memoization](memoization.md) is a feature of Argo Workflows to automatically skip steps which generate outputs - it is designed for pure steps which only generate output based on their inputs.
This is a technique is similar to [memoization](memoization.md). Work avoidance is totally in your control and you make the decisions as to have to skip the work. [Memoization](memoization.md) is a feature of Argo Workflows to automatically skip steps which generate outputs. Prior to version 3.5 this required `outputs` to be specified, but you can use memoization for all steps and tasks in version 3.5 or later.

This simplest way to do this is to use **marker files**.

Expand Down
16 changes: 7 additions & 9 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,17 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
return node, err
}
if outputs != nil {
node = woc.wf.GetNodeByName(nodeName)
node.Outputs = outputs
woc.wf.Status.Nodes[node.ID] = *node
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}

woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil
Expand Down
71 changes: 71 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5128,6 +5128,77 @@ func TestConfigMapCacheLoadOperate(t *testing.T) {
}
}

var workflowCachedNoOutputs = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: memoized-workflow-test
namespace: default
spec:
entrypoint: whalesay
arguments:
parameters:
- name: message
value: hi-there-world
templates:
- name: whalesay
inputs:
parameters:
- name: message
memoize:
key: "{{inputs.parameters.message}}"
cache:
configMap:
name: whalesay-cache
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["sleep 10; cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"]
outputs:
parameters:
- name: hello
valueFrom:
path: /tmp/hello_world.txt
`

func TestConfigMapCacheLoadOperateNoOutputs(t *testing.T) {
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"hi-there-world": `{"nodeID":"memoized-simple-workflow-5wj2p","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z"}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "whalesay-cache",
ResourceVersion: "1630732",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}
wf := wfv1.MustUnmarshalWorkflow(workflowCachedNoOutputs)
cancel, controller := newController()
defer cancel()

ctx := context.Background()
_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

if assert.Len(t, woc.wf.Status.Nodes, 1) {
for _, node := range woc.wf.Status.Nodes {
assert.Nil(t, node.Outputs)
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
}
}
}

var workflowCachedMaxAge = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
16 changes: 7 additions & 9 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,18 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
return node, err
}
if outputs != nil {
node := woc.wf.GetNodeByName(nodeName)
node.Outputs = outputs
woc.addOutputsToGlobalScope(node.Outputs)
woc.wf.Status.Nodes[node.ID] = *node
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil
}

Expand Down