Skip to content

Commit

Permalink
Revert "fix: regression in memoization without outputs (#12130)"
Browse files Browse the repository at this point in the history
This reverts commit c751e66.
  • Loading branch information
terrytangyuan committed Nov 15, 2023
1 parent bdc1b25 commit e48b1f5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
19 changes: 12 additions & 7 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,15 +351,20 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
return node, err
}
if outputs != nil {
node, err = woc.wf.GetNodeByName(nodeName)
if err != nil {
woc.log.Errorf("unable to get node by name for %s", nodeName)
return nil, err
}
node.Outputs = outputs
woc.wf.Status.Nodes.Set(node.ID, *node)
}
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
}

Expand Down
20 changes: 11 additions & 9 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,21 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
if err != nil {
return node, err
}

if outputs != nil {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
return nil, err
}
node.Outputs = outputs
woc.addOutputsToGlobalScope(node.Outputs)
woc.wf.Status.Nodes.Set(node.ID, *node)
}

if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
}

Expand Down

0 comments on commit e48b1f5

Please sign in to comment.