-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Call the ShutdownWorker API as part of workflow worker cleanup #1645
Conversation
internal/internal_worker.go
Outdated
// Best-effort cleanup, used to signal to the server that the sticky queue will | ||
// no longer be polled. Errors are ignored. | ||
defer func() { | ||
_ = ww.poller.Cleanup() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop
should not complete until this call is finished. The contract for stopping a worker is once a worker is stopped all resources associated with the worker should be cleaned up and the worker will no longer use the connection/client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defer
is synchronous, Stop
won't complete until ww.poller.Cleanup()
does
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll probably want to call this after all the pollers have stopped as well, otherwise couldn't a sticky poll request come in after you call Shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why this call is in a defer
; is the implication that workflowTaskPoller
is still running after workflowWorker
? It shouldn't be, since:
baseWorker.Stop()
is synchronous, and responsible for new calls toPollTask
, andclose(bw.stopCh)
occurs withinbaseWorker.Stop()
,- All polling requests are
select
'd against their results, as well asbw.stopCh
, to immediately cancel the gRPC. Additionally, before a task is actually processed,isStopping()
is checked, to account for the interleaving between polling and processing.
There's a slight interleaving where the gRPC call completes service side, right as a gRPC context cancel
occurs client-side, but I'm not worried about that as an edge case, given that this is a best-effort optimization anyways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the implication that workflowTaskPoller is still running after workflowWorker?
Sorry, I should have clarified a task worker could still respond to a sticky poll request, since your running this regardless of if the worker stop actually succeeded. It is probably pointless to make this call if the worker didn't stop right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't see any issue with it being in the base worker , that was what I did in my POC unless I am missing something?
Note: My branch is incorrect because it is not waiting for the pollers to shutdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I didn't realize you could access the task worker from bw.options
. Sure, I'll update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I noticed after running the tests is that the default WorkerStopTimeout
is 0
; that means that a timer.After
on it will return immediately, preventing our cleanup goroutine from getting a chance to run. Should we set a default WorkerStopTimeout
(or even enforce a tiny minimum, like 1-3s) for users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we set a default WorkerStopTimeout (or even enforce a tiny minimum, like 1-3s) for users?
Hm we can't start enforcing a minimum now, that would break uses who aren't setting one. We could add a default let me double check what other SDKs do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline I think your current approach is fine , apologize for wasting time on another path
grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(wtp.metricsHandler)) | ||
defer cancel() | ||
|
||
_, err := wtp.service.ShutdownWorker(grpcCtx, &workflowservice.ShutdownWorkerRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any way for us to test in our integration tests that this works and has effect? Even if not now, can we open an issue for when it is available server side to confirm it has effect (e.g. that sticky was reset across two workers)? My fear is that we may do something wrong on the request building or something and this gRPC call always errors and we would just never know that it never worked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any way for us to test in our integration tests that this works and has effect? Even if not now, can we open an issue for when it is available server side to confirm it has effect (e.g. that sticky was reset across two workers)?
Not exactly. The call signals Matching to route tasks away from a shutdown worker's sticky queue (where the total timeout is 10 seconds). However, the caller that would actually see the effect from the rerouting isn't the end client, it's History service, which handles the rerouting. The end result is that any future tasks previously destined for the sticky queue will have a lower schedule-to-start latency.
The important takeaway is that this isn't very observable from a testing point of view this far away in the stack - the signals between success and failure are fuzzy, it's just latency. If the call fails, the worst case scenario is that schedule-to-start is more latent (by up to 10s). If the call succeeds, we know schedule-to-start is hopefully less latent, but that isn't guaranteed by any means. ShutdownWorker
could work perfectly, and the task's ScheduleToStart
may be slow from an entirely unrelated reason in History or Matching services.
My fear is that we may do something wrong on the request building or something and this gRPC call always errors and we would just never know that it never worked.
Because the effect of this call takes place entirely within the service, asynchronously, IMO the testing needs to verify two independent facts:
- Does the worker call
ShutdownWorker
and receive positive acknowledgement from the service? The call doesn't have an effect directly visible to the worker, so here we instead need to verify that the server will validate and accept the request. - Does the service process
ShutdownWorker
properly/does the call have an effect? Because we're testing deeper within the service, we get a strong discrete signal for if the call had an effect; I can directly callMatchingService
and assert that it returnsStickyWorkerUnavailable
for a shut down queue. This is a much better signal for test success than merely trying to guess based on downstream latency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I wonder if we can demonstrate a user-visible effect, e.g. such as having two workers, and the worker that got a sticky workflow is able to be shutdown and have the next workflow task immediately go to the next worker without penalty (meaning confirm it happens in <= 9s). I think that kind of <= 9s leeway is ok to assert in the tests, but agreed we don't want to assert timing to any detail because of the other moving parts.
Regardless, I understand we cannot assert this behavior in integration tests yet anyways, so this would be a future issue (or a features repo test).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lina-temporal did you add any test to verify ShutdownWoker
is only treated as best effort? I was expecting to see a unit test that mocked the service stub returned an error for ShutdownWoker
and verify that from the users perspective the shutdown still succeeded. If I missed it please just point me at the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, added a test to assert that Cleanup
is best effort (TestCleanupIsBestEffort
).
If at all possible, would like this to wait on temporalio/temporal#6511 to be merged just to confirm we don't need any backwards-incompatible API changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marking approved because I made a comment or two on this PR and don't want there to be confusion that any of it is blocking. Please wait for @Quinn-With-Two-Ns's approval before merging.
FYI, server implementation of the API has been merged. |
…ralio#1645) Call the ShutdownWorker API as part of workflow worker cleanup
What was changed
ShutdownWorker API
is now called from the Go SDK as part of workflow worker shutdown.Why?
ShutdownWorker
is primarily used to signal to the Temporal Matching service that a given sticky task queue will no longer be polled. This lets Matching immediately direct tasks to the regular task queue when a workflow worker shuts down, rather than waiting for the sticky queue's timeout to elapse.Checklist
go test ./...
cd internal/cmd/build && go run . check