Skip to content

Commit

Permalink
Workaround for updates delivered in first WFT (#1026)
Browse files Browse the repository at this point in the history
Workaround for updates delivered in first WFT

If the first WFT includes an update we can have a problem because the
workflow function itself hasn't run yet to register update handlers and
the update will be rejected. Adding this yield allows the scheduler to
execute the workflow function up to the first blocking point, at which
point the handler(s) will be registered by user code.

Co-authored-by: Chad Retz <chad.retz@gmail.com>
  • Loading branch information
Matt McShane and cretz authored Feb 1, 2023
1 parent 18824b6 commit f037c9d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 24 deletions.
62 changes: 41 additions & 21 deletions internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ type (
Complete(success interface{}, err error)
}

// UpdateScheduluer allows an update state machine to spawn coroutines and
// yield itself as necessary.
UpdateScheduler interface {
// Spawn starts a new named coroutine, executing the given function f.
Spawn(ctx Context, name string, f func(ctx Context)) Context

// Yield returns control to the scheduler.
Yield(ctx Context, status string)
}

// updateEnv encapsulates the utility functions needed by update protocol
// instance in order to implement the UpdateCallbacks interface. This
// interface is conveniently implemented by
Expand Down Expand Up @@ -207,37 +217,47 @@ func defaultUpdateHandler(
serializedArgs *commonpb.Payloads,
header *commonpb.Header,
callbacks UpdateCallbacks,
spawn func(Context, string, func(Context)) Context,
scheduler UpdateScheduler,
) {
env := getWorkflowEnvironment(rootCtx)
ctx, err := workflowContextWithHeaderPropagated(rootCtx, header, env.GetContextPropagators())
if err != nil {
callbacks.Reject(err)
return
}
eo := getWorkflowEnvOptions(ctx)
handler, ok := eo.updateHandlers[name]
if !ok {
keys := make([]string, 0, len(eo.updateHandlers))
for k := range eo.updateHandlers {
keys = append(keys, k)
scheduler.Spawn(ctx, name, func(ctx Context) {
eo := getWorkflowEnvOptions(ctx)

// If we suspect that handler registration has not occurred (e.g.
// because this update is part of the first workflow task and is being
// delivered before the workflow function itself has run and had a
// chance to register update handlers) then we yield control back to the
// scheduler to allow handler registration to occur. The scheduler will
// resume this coroutine after others have run to a blocking point.
if len(eo.updateHandlers) == 0 {
scheduler.Yield(ctx, "yielding for initial handler registration")
}
handler, ok := eo.updateHandlers[name]
if !ok {
keys := make([]string, 0, len(eo.updateHandlers))
for k := range eo.updateHandlers {
keys = append(keys, k)
}
callbacks.Reject(fmt.Errorf("unknown update %v. KnownUpdates=%v", name, keys))
return
}
callbacks.Reject(fmt.Errorf("unknown update %v. KnownUpdates=%v", name, keys))
return
}

args, err := decodeArgsToRawValues(
env.GetDataConverter(),
reflect.TypeOf(handler.fn),
serializedArgs,
)
if err != nil {
callbacks.Reject(fmt.Errorf("unable to decode the input for update %q: %w", name, err))
return
}
input := UpdateInput{Name: name, Args: args}
args, err := decodeArgsToRawValues(
env.GetDataConverter(),
reflect.TypeOf(handler.fn),
serializedArgs,
)
if err != nil {
callbacks.Reject(fmt.Errorf("unable to decode the input for update %q: %w", name, err))
return
}
input := UpdateInput{Name: name, Args: args}

spawn(ctx, name, func(ctx Context) {
envInterceptor := getWorkflowEnvironmentInterceptor(ctx)
if !IsReplaying(ctx) {
// we don't execute update validation during replay so that
Expand Down
73 changes: 71 additions & 2 deletions internal/internal_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func mustSetUpdateHandler(
require.NoError(t, SetUpdateHandler(ctx, name, handler, opts))
}

type testUpdateScheduler struct {
SpawnImpl func(Context, string, func(Context)) Context
YieldImpl func(Context, string)
}

type testUpdateCallbacks struct {
AcceptImpl func()
RejectImpl func(err error)
Expand All @@ -60,6 +65,22 @@ func (tuc *testUpdateCallbacks) Complete(success interface{}, err error) {
tuc.CompleteImpl(success, err)
}

func (tus *testUpdateScheduler) Spawn(ctx Context, name string, f func(Context)) Context {
return tus.SpawnImpl(ctx, name, f)
}

func (tus *testUpdateScheduler) Yield(ctx Context, status string) {
tus.YieldImpl(ctx, status)
}

var runOnCallingThread = &testUpdateScheduler{
SpawnImpl: func(ctx Context, _ string, f func(Context)) Context {
f(ctx)
return ctx
},
YieldImpl: func(Context, string) {},
}

func TestUpdateHandlerPanicSafety(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -151,8 +172,6 @@ func TestDefaultUpdateHandler(t *testing.T) {
args, err := dc.ToPayloads(argStr)
require.NoError(t, err)

runOnCallingThread := func(ctx Context, _ string, f func(Context)) Context { f(ctx); return ctx }

t.Run("no handler registered", func(t *testing.T) {
mustSetUpdateHandler(
t,
Expand Down Expand Up @@ -244,6 +263,56 @@ func TestDefaultUpdateHandler(t *testing.T) {
expectedResult, _ := updateFunc(ctx, argStr)
require.Equal(t, expectedResult, result)
})

t.Run("update before handlers registered", func(t *testing.T) {
// same test as above except that we don't set the update handler for
// t.Name() until the first Yield. This emulates the situation where
// there is an update in the first WFT of a workflow so the SDK needs to
// wait for the workflow function to execute up to the point where it
// has registered some update handlers. If the SDK attempts to deliver
// the update before the first run of the workflow function, no handlers
// will be registered yet.

// don't reuse the context that has all the other update handlers
// registered because the code under test will think the handler
// registration at workflow start time has already occurred
_, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err)

updateFunc := func(ctx Context, s string) (string, error) { return s + " success!", nil }
var (
resultErr error
rejectErr error
accepted bool
result interface{}
)
sched := &testUpdateScheduler{
SpawnImpl: func(ctx Context, _ string, f func(Context)) Context {
f(ctx)
return ctx
},
YieldImpl: func(ctx Context, _ string) {
// set the handler in place here
mustSetUpdateHandler(t, ctx, t.Name(), updateFunc, UpdateHandlerOptions{})
},
}
defaultUpdateHandler(ctx, t.Name(), args, hdr, &testUpdateCallbacks{
RejectImpl: func(err error) { rejectErr = err },
AcceptImpl: func() { accepted = true },
CompleteImpl: func(success interface{}, err error) {
resultErr = err
result = success
},
}, sched)

require.True(t, accepted)
require.Nil(t, resultErr)
require.Nil(t, rejectErr)

expectedResult, _ := updateFunc(ctx, argStr)
require.Equal(t, expectedResult, result)
})

}

func TestInvalidUpdateStateTransitions(t *testing.T) {
Expand Down
18 changes: 17 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ type (
queryType string
dataConverter converter.DataConverter
}

// coroScheduler adapts the coro dispatcher to the UpdateScheduler interface
coroScheduler struct {
dispatcher dispatcher
}
)

const (
Expand Down Expand Up @@ -533,7 +538,7 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common

getWorkflowEnvironment(d.rootCtx).RegisterUpdateHandler(
func(name string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) {
defaultUpdateHandler(d.rootCtx, name, serializedArgs, header, callbacks, d.dispatcher.NewCoroutine)
defaultUpdateHandler(d.rootCtx, name, serializedArgs, header, callbacks, coroScheduler{d.dispatcher})
})

getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler(
Expand Down Expand Up @@ -1579,3 +1584,14 @@ func (wg *waitGroupImpl) Wait(ctx Context) {
}
wg.future, wg.settable = NewFuture(ctx)
}

// Spawn starts a new coroutine with Dispatcher.NewCoroutine
func (cs coroScheduler) Spawn(ctx Context, name string, f func(Context)) Context {
return cs.dispatcher.NewCoroutine(ctx, name, f)
}

// Yield calls the yield function on the coroutineState associated with the
// supplied workflow context.
func (cs coroScheduler) Yield(ctx Context, reason string) {
getState(ctx).yield(reason)
}

0 comments on commit f037c9d

Please sign in to comment.