-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle updates immediately when registered #1306
Handle updates immediately when registered #1306
Conversation
inflightUpdates++ | ||
updatesRan++ | ||
err := workflow.Sleep(ctx, time.Second) | ||
inflightUpdates-- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoiding using defer here because #1235
internal/internal_event_handlers.go
Outdated
for _, u := range us { | ||
u() | ||
} | ||
rerun = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it definitely true that us
was non-empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but i'll admit that isn't obvious. We could move the line into the range block to cover that case.
// Keep executing until at least one goroutine made some progress | ||
for !allBlocked { | ||
for !allBlocked || d.allBlockedCallback() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this reads a bit oddly ("If not allblocked or allblocked"). I'm trying to think of a better name for the callback. Maybe something like tryAdvance()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense to me overall. Just a few comments but nothing major.
internal/internal_event_handlers.go
Outdated
// Check if any blocked updates remain when we have no more coroutines to run and let them run so they are rejected. | ||
// Generally iterating a map in workflow code is bad because it is non deterministic | ||
// this case is fine since all these update handles will be rejected and not recorded in history. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wasn't immediately clear to me that rejection would happen because without the context of what the functions in blockedUpdates
really are (wrappers around the update that will also reject if no one registered, not just the user handler) it seems like this might be executing handlers directly.
I'm not sure there's really any great way to deal with that, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also not immediately obvious that it's guaranteed that all updates will be rejected. I guess this is the case because, if the updates were registered, then they would have had their handlers run before all the coroutines block.
That being the case - perhaps it would make more sense for this function to be called RejectUnhandledUpdates
.
Maybe it could even call a function that explicitly rejects the update, rather than the entire handler. I don't feel strongly about that though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I can document more clearly blockedUpdates
doesn't contain the handlers directly, but spawns a coroutine to handle them.
It's also not immediately obvious that it's guaranteed that all updates will be rejected. I guess this is the case because, if the updates were registered, then they would have had their handlers run before all the coroutines block.
Yeah it is up to the handler to reject, I'd rather keep the rejection logic all in the handler rather then sprinkle it around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That being the case - perhaps it would make more sense for this function to be called RejectUnhandledUpdates
I didn't name it that because I thought if I added a dynamic update handle then they would be handled here, we don't support dynamic anything in Go yet so maybe that isn't a good reason.
@@ -1123,8 +1134,10 @@ func (d *dispatcherImpl) ExecuteUntilAllBlocked(deadlockDetectionTimeout time.Du | |||
} | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Typescript, the Update handler preempts immediately the code that was executing the setHandler
call, whereas here we're going to let all outstanding coroutines run until they're blocked/exit, and only then turn to the queued eager Update handler. I think that's going to result in (subtly) different semantics, e.g. related to whether or not an Update handler will see changes made by other concurrent routines. I'm not necessarily suggesting a change; that may fall well within expected inter-SDK variability. But, it would be technically possible I think to execute the pending eager coroutines inside the inner for loop, rather than waiting for that loop to exit (and I suspect it would bring Typescript and Go semantics closer together, but I haven't done the work to come up with an example.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes no coroutine in the Go SDK executes immediately when created. The same is true if users create there own coroutine or resolve a future. The runtimes schedule work differently. I agree it is probably technically possible to make the updates more "eager", but it isn't required to get the behavior WRT updates running before the main workflow function and allowing updates to run after being registered before the main function is finished.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't required to get the behavior WRT updates running before the main workflow function and allowing updates to run after being registered before the main function is finished.
Hm, I think that's not true. Our requirements for signal and update hold that pending signals and updates must get an opportunity to influence the workflow return value, and the params passed to CAN. But the workflow return value may be computed by one of the pending coroutines in the inner loop. Similarly, one of the pending coroutines in the inner loop could be a signal or update handler that CANs.
I'm thinking that we can meet that requirement by ensuring that, when we yield back to the executor loop in setUpdateHandler
, the new handler coroutine is the very next coroutine to be handled.
Here's a failing test that demonstrates what I'm saying. I think that because your test workflows are using workflow.Await
before the end, they are passing, but that we actually have a stronger requirement.
func (w *Workflows) UpdateSetHandlerOnly(ctx workflow.Context) (int, error) {
updatesRan := 0
updateHandle := func(ctx workflow.Context) error {
updatesRan++
return nil
}
workflow.SetUpdateHandler(ctx, "update", updateHandle)
return updatesRan, nil
}
func (ts *IntegrationTestSuite) TestUpdateAlwaysHandled() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-always-handled")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSetHandlerOnly)
ts.NoError(err)
// Send an update before the first workflow task
_, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
var result int
ts.NoError(run.Get(ctx, &result))
ts.Equal(1, result)
}
The equivalent test in Typescript passes: https://github.com/dandavison/temporalio-sdk-typescript/blob/125241bbd3a1b3cf29e59d47eaa0e979ad3d7e69/packages/test/src/test-integration-update.ts#L217-L243
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test passes with my changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this test as part of the PR since I think it is a good thing to test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry, moving too fast, I thought that failed. OK, so I I'd have to think more about whether one can construct a situation where a coroutine in the inner loop can CAN with a value that should be influenced by a pending eager coroutine. Can you see that that is impossible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries, I don't see how CAN is different then returning any normal value from the workflow function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great and it's been teaching about sdk-go. I'm currently thinking that we do need to implement maximally eager scheduling in order to uphold the requirements we've agreed upon (explained in comment).
@@ -1123,8 +1134,10 @@ func (d *dispatcherImpl) ExecuteUntilAllBlocked(deadlockDetectionTimeout time.Du | |||
} | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't required to get the behavior WRT updates running before the main workflow function and allowing updates to run after being registered before the main function is finished.
Hm, I think that's not true. Our requirements for signal and update hold that pending signals and updates must get an opportunity to influence the workflow return value, and the params passed to CAN. But the workflow return value may be computed by one of the pending coroutines in the inner loop. Similarly, one of the pending coroutines in the inner loop could be a signal or update handler that CANs.
I'm thinking that we can meet that requirement by ensuring that, when we yield back to the executor loop in setUpdateHandler
, the new handler coroutine is the very next coroutine to be handled.
Here's a failing test that demonstrates what I'm saying. I think that because your test workflows are using workflow.Await
before the end, they are passing, but that we actually have a stronger requirement.
func (w *Workflows) UpdateSetHandlerOnly(ctx workflow.Context) (int, error) {
updatesRan := 0
updateHandle := func(ctx workflow.Context) error {
updatesRan++
return nil
}
workflow.SetUpdateHandler(ctx, "update", updateHandle)
return updatesRan, nil
}
func (ts *IntegrationTestSuite) TestUpdateAlwaysHandled() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-always-handled")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSetHandlerOnly)
ts.NoError(err)
// Send an update before the first workflow task
_, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
var result int
ts.NoError(run.Get(ctx, &result))
ts.Equal(1, result)
}
The equivalent test in Typescript passes: https://github.com/dandavison/temporalio-sdk-typescript/blob/125241bbd3a1b3cf29e59d47eaa0e979ad3d7e69/packages/test/src/test-integration-update.ts#L217-L243
a5e6c73
to
c43c47f
Compare
7c50575
to
3f57f53
Compare
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) | ||
require.NoError(t, err) | ||
require.True(t, d.IsDone()) | ||
require.Equal(t, []string{"root", "root yield start", "outer eager coroutine", "inner eager coroutine", "coroutine 1", "coroutine 2", "root yield finish"}, history) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. Just for the record, as a faithless reviewer, I have double-checked that this fails without the latest updates to the executor loop.
Great, that second modification to the loop was pretty clean. So my only remaining question here is about backwards compatibility: suppose that we decide that we are interested in guaranteeing that updates always execute in the order supplied in the WFT, rather than sometimes executing in the order of handler registration. If we were to release this now, would that make such work more difficult than it would be without having released this change? |
No issue with backwards compatibility. The reason is that history will record the messages in the order the SDK processed the updates not the order supplied in the WFT. So when replaying the order processed is the order supplied. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Handle updates immediately when registered. Handle updates before the root coroutine.
2d3382d
to
6c6ab19
Compare
Handle updates immediately when registered and give priority to update request over the main workflow coroutine if a handler is registered. Now when no update handlers have been registered instead of yielding the update coroutine we queue the update request and only create the coroutine when the handler is registered or there is no more coroutines to run. All update coroutines are now created at the front of the scheduler so they run before the root coroutine.
This change is a major change to how the Go SDK handles updates, it should be backwards compatible in the sense old workflows should continue to run, but new workflows will behave observably different to users.
closes #1297