Skip to content

Commit

Permalink
fix: regression in memoization without outputs (#12130)
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Clucas <alan@clucas.org>
  • Loading branch information
Joibel authored Nov 3, 2023
1 parent 4d062d1 commit c751e66
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 23 deletions.
19 changes: 7 additions & 12 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,20 +351,15 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
return node, err
}
if outputs != nil {
node, err = woc.wf.GetNodeByName(nodeName)
if err != nil {
woc.log.Errorf("unable to get node by name for %s", nodeName)
return nil, err
}
node.Outputs = outputs
woc.wf.Status.Nodes.Set(node.ID, *node)
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}

Expand Down
20 changes: 9 additions & 11 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,19 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
if err != nil {
return node, err
}

if outputs != nil {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
return nil, err
}
node.Outputs = outputs
woc.addOutputsToGlobalScope(node.Outputs)
woc.wf.Status.Nodes.Set(node.ID, *node)
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}

if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil
Expand Down

0 comments on commit c751e66

Please sign in to comment.