Skip to content

Commit

Permalink
fix: Memozie for Step and DAG level (#7028)
Browse files Browse the repository at this point in the history
* fix: Memozie for Step and DAG level

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
sarabala1979 authored and alexec committed Nov 17, 2021
1 parent 7256dac commit f43d8b0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 0 deletions.
131 changes: 131 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,134 @@ spec:
func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}

func (s *FunctionalSuite) TestStepLevelMemozie() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-memozie-
spec:
entrypoint: hello-hello-hello
templates:
- name: hello-hello-hello
steps:
- - name: hello1
template: memostep
arguments:
parameters: [{name: message, value: "hello1"}]
- - name: hello2a
template: memostep
arguments:
parameters: [{name: message, value: "hello1"}]
- name: memostep
inputs:
parameters:
- name: message
memoize:
key: "{{inputs.parameters.message}}"
maxAge: "10s"
cache:
configMap:
name: my-config-memo-step
steps:
- - name: cache
template: whalesay
arguments:
parameters: [{name: message, value: "{{inputs.parameters.message}}"}]
outputs:
parameters:
- name: output
valueFrom:
Parameter: "{{steps.cache.outputs.result}}"
- name: whalesay
inputs:
parameters:
- name: message
container:
image: argoproj/argosay:v2
command: [echo]
args: ["{{inputs.parameters.message}}"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
memoHit := false
for _, node := range status.Nodes {
if node.MemoizationStatus != nil && node.MemoizationStatus.Hit {
memoHit = true
}
}
assert.True(t, memoHit)

})

}

func (s *FunctionalSuite) TestDAGLevelMemozie() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-memozie-
spec:
entrypoint: hello-hello-hello
templates:
- name: hello-hello-hello
steps:
- - name: hello1
template: memostep
arguments:
parameters: [{name: message, value: "hello1"}]
- - name: hello2a
template: memostep
arguments:
parameters: [{name: message, value: "hello1"}]
- name: memostep
inputs:
parameters:
- name: message
memoize:
key: "{{inputs.parameters.message}}"
maxAge: "10s"
cache:
configMap:
name: my-config-memo-dag
dag:
tasks:
- name: cache
template: whalesay
arguments:
parameters: [{name: message, value: "{{inputs.parameters.message}}"}]
outputs:
parameters:
- name: output
valueFrom:
Parameter: "{{tasks.cache.outputs.result}}"
- name: whalesay
inputs:
parameters:
- name: message
container:
image: argoproj/argosay:v2
command: [echo]
args: ["{{inputs.parameters.message}}"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
memoHit := false
for _, node := range status.Nodes {
if node.MemoizationStatus != nil && node.MemoizationStatus.Hit {
memoHit = true
}
}
assert.True(t, memoHit)

})

}
10 changes: 10 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"time"

"github.com/antonmedv/expr"
log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/template"
"github.com/argoproj/argo-workflows/v3/workflow/common"
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

Expand Down Expand Up @@ -299,6 +301,14 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
node = woc.wf.GetNodeByName(nodeName)
node.Outputs = outputs
woc.wf.Status.Nodes[node.ID] = *node
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
}

woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)
Expand Down
11 changes: 11 additions & 0 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"time"

"github.com/Knetic/govaluate"
log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/template"
"github.com/argoproj/argo-workflows/v3/workflow/common"
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

Expand Down Expand Up @@ -152,7 +154,16 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
node.Outputs = outputs
woc.addOutputsToGlobalScope(node.Outputs)
woc.wf.Status.Nodes[node.ID] = *node
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
}

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil
}

Expand Down

0 comments on commit f43d8b0

Please sign in to comment.