Skip to content

Commit

Permalink
fix: render template vars in DAGTask before releasing lock.. Fixes #9395
Browse files Browse the repository at this point in the history
 (#9405)

fix: render template vars in DAGTask before releasing lock. Fixes #9395

Signed-off-by: jsvk <850037+jsvk@users.noreply.github.com>

Signed-off-by: jsvk <850037+jsvk@users.noreply.github.com>
  • Loading branch information
jsvk authored Sep 27, 2022
1 parent b214161 commit 84c19ea
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
7 changes: 6 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,14 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}
}

processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.configMapInformer)
if err != nil {
woc.markNodeError(node.Name, err)
}

// Release acquired lock completed task.
if tmpl != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, tmpl.Synchronization)
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}

task := dagCtx.GetTask(taskName)
Expand Down
73 changes: 73 additions & 0 deletions workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,79 @@ func TestMutexInDAG(t *testing.T) {
})
}

var DAGWithInterpolatedMutex = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-mutex
namespace: default
spec:
entrypoint: diamond
templates:
- name: diamond
dag:
tasks:
- name: A
template: mutex
arguments:
parameters:
- name: message
value: foo/bar
- name: B
depends: A
template: mutex
arguments:
parameters:
- name: message
value: foo/bar
- name: mutex
synchronization:
mutex:
name: '{{=sprig.replace("/", "-", inputs.parameters.message)}}'
inputs:
parameters:
- name: message
container:
image: alpine:3.7
command: [sh, -c, "echo {{inputs.parameters.message}}"]
`

func TestMutexInDAGWithInterpolation(t *testing.T) {
assert := assert.New(t)

cancel, controller := newController()
defer cancel()
ctx := context.Background()
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
}, workflowExistenceFunc)
t.Run("InterpolatedMutexWithDAG", func(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(DAGWithInterpolatedMutex)
wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
for _, node := range woc.wf.Status.Nodes {
if node.Name == "dag-mutex.A" {
assert.Equal(wfv1.NodePending, node.Phase)
}
}
assert.Equal(wfv1.WorkflowRunning, woc.wf.Status.Phase)
makePodsPhase(ctx, woc, v1.PodSucceeded)

woc1 := newWorkflowOperationCtx(woc.wf, controller)
woc1.operate(ctx)
for _, node := range woc1.wf.Status.Nodes {
assert.NotEqual(wfv1.NodeError, node.Phase)
if node.Name == "dag-mutex.B" {
assert.Nil(node.SynchronizationStatus)
assert.Equal(wfv1.NodePending, node.Phase)
}
}
})
}

const RetryWfWithSemaphore = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down

0 comments on commit 84c19ea

Please sign in to comment.