Skip to content

Commit

Permalink
fix: shouldn't fail to run cronworkflow because previous got shutdown…
Browse files Browse the repository at this point in the history
… on its own (race condition) (#11845)

Signed-off-by: Julie Vogelman <julievogelman0@gmail.com>
  • Loading branch information
juliev0 authored Sep 21, 2023
1 parent c04873e commit fbe9375
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
7 changes: 6 additions & 1 deletion workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ func (woc *cronWfOperationCtx) terminateOutstandingWorkflows(ctx context.Context
err := util.TerminateWorkflow(ctx, woc.wfClient, wfObjectRef.Name)
if err != nil {
if errors.IsNotFound(err) {
woc.log.Warnf("workflow '%s' not found when trying to terminate outstanding workflows", wfObjectRef.Name)
woc.log.Warnf("workflow %q not found when trying to terminate outstanding workflows", wfObjectRef.Name)
continue
}
alreadyShutdownErr, ok := err.(util.AlreadyShutdownError)
if ok {
woc.log.Warn(alreadyShutdownErr.Error())
continue
}
return fmt.Errorf("error stopping workflow %s: %e", wfObjectRef.Name, err)
Expand Down
11 changes: 10 additions & 1 deletion workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,15 @@ func StopWorkflow(ctx context.Context, wfClient v1alpha1.WorkflowInterface, hydr
return patchShutdownStrategy(ctx, wfClient, name, wfv1.ShutdownStrategyStop)
}

// AlreadyShutdownError is an error value that identifies a workflow by name
// and namespace. Its message states that a completed workflow cannot be shut
// down.
type AlreadyShutdownError struct {
	// The workflow's name comes first and its namespace second; composite
	// literals without field names depend on this order.
	workflowName, namespace string
}

// Error implements the error interface. The workflow name and the namespace
// are both rendered as quoted strings, so an empty value shows up as "".
func (err AlreadyShutdownError) Error() string {
	const format = "cannot shutdown a completed workflow: workflow: %q, namespace: %q"
	return fmt.Sprintf(format, err.workflowName, err.namespace)
}

// patchShutdownStrategy patches the shutdown strategy to a workflow.
func patchShutdownStrategy(ctx context.Context, wfClient v1alpha1.WorkflowInterface, name string, strategy wfv1.ShutdownStrategy) error {
patchObj := map[string]interface{}{
Expand All @@ -1128,7 +1137,7 @@ func patchShutdownStrategy(ctx context.Context, wfClient v1alpha1.WorkflowInterf
return !errorsutil.IsTransientErr(err), err
}
if wf.Status.Fulfilled() {
return true, fmt.Errorf("cannot shutdown a completed workflow")
return true, AlreadyShutdownError{wf.Name, wf.Namespace}
}
_, err = wfClient.Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
if apierr.IsConflict(err) {
Expand Down
2 changes: 1 addition & 1 deletion workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func TestStopWorkflowByNodeName(t *testing.T) {
_, err = wfIf.Create(ctx, origWf, metav1.CreateOptions{})
assert.NoError(t, err)
err = StopWorkflow(ctx, wfIf, hydratorfake.Noop, "succeeded-wf", "", "")
assert.EqualError(t, err, "cannot shutdown a completed workflow")
assert.EqualError(t, err, "cannot shutdown a completed workflow: workflow: \"succeeded-wf\", namespace: \"\"")
}

// Regression test for #6478
Expand Down

0 comments on commit fbe9375

Please sign in to comment.