Skip to content

Commit

Permalink
Spawn update coro directly through dispatcher (#885)
Browse files Browse the repository at this point in the history
Calling Go/GoNamed with a context from outside the workflow will fail
because the context doesn't have a handle to dispatcher state.
  • Loading branch information
Matt McShane authored Aug 19, 2022
1 parent 8d5582c commit 3d111e3
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
6 changes: 3 additions & 3 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func defaultUpdateHandler(
serializedArgs *commonpb.Payloads,
header *commonpb.Header,
callbacks UpdateCallbacks,
goNamed func(Context, string, func(Context)),
spawn func(Context, string, func(Context)) Context,
) {
env := getWorkflowEnvironment(rootCtx)
ctx, err := workflowContextWithHeaderPropagated(rootCtx, header, env.GetContextPropagators())
Expand All @@ -520,7 +520,7 @@ func defaultUpdateHandler(
}
input := UpdateInput{Name: name, Args: args}

goNamed(ctx, name, func(ctx Context) {
spawn(ctx, name, func(ctx Context) {
envInterceptor := getWorkflowEnvironmentInterceptor(ctx)
if err := envInterceptor.inboundInterceptor.ValidateUpdate(ctx, &input); err != nil {
callbacks.Reject(err)
Expand Down Expand Up @@ -584,7 +584,7 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common

getWorkflowEnvironment(d.rootCtx).RegisterUpdateHandler(
func(name string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) {
defaultUpdateHandler(d.rootCtx, name, serializedArgs, header, callbacks, GoNamed)
defaultUpdateHandler(d.rootCtx, name, serializedArgs, header, callbacks, d.dispatcher.NewCoroutine)
})

getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler(
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,7 +1460,7 @@ func TestDefaultUpdateHandler(t *testing.T) {
args, err := dc.ToPayloads(argStr)
require.NoError(t, err)

runOnCallingThread := func(ctx Context, _ string, f func(Context)) { f(ctx) }
runOnCallingThread := func(ctx Context, _ string, f func(Context)) Context { f(ctx); return ctx }

t.Run("no handler registered", func(t *testing.T) {
MustSetUpdateHandler(t, ctx, "unused_handler", func() error { panic("not called") }, UpdateOptions{})
Expand Down

0 comments on commit 3d111e3

Please sign in to comment.