buffer: wake tasks waiting for channel capacity when terminating #480
Merged
Conversation
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
LucioFranco approved these changes on Oct 28, 2020
teor2345 added a commit to teor2345/zebra that referenced this pull request on Feb 17, 2021:
When other tower-batch tasks drop, wake any tasks that are waiting for a semaphore permit. Otherwise, tower-batch can hang.

We currently pin tower in our workspace to:
d4d1c67 hedge: use auto-resizing histograms (tower-rs/tower#484)

Copy tower/src/semaphore.rs from that commit, to pick up tower-rs/tower#480.
dconnolly pushed a commit to ZcashFoundation/zebra that referenced this pull request on Feb 17, 2021.
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request on Feb 17, 2021:
…922) The proxy currently has its own implementation of a `tower` `Service` that makes an inner service `Clone`able by driving it in a spawned task and buffering requests on a channel. This also exists upstream, as `tower::buffer`.

We implemented our own version for a couple of reasons: to avoid an upstream issue where memory was leaked when a buffered request was cancelled, and to implement an idle timeout when the buffered service has been unready for too long. However, it's no longer necessary to reimplement our own buffer service for these reasons: the upstream bug was fixed in `tower` 0.4 (see tower-rs/tower#476, tower-rs/tower#480, and tower-rs/tower#556); and we no longer actually use the buffer idle timeout (instead, we idle out unresponsive services with the separate `Failfast` middleware, note that `push_spawn_buffer_with_idle_timeout` is never actually used).

Therefore, we can remove our _sui generis_ implementation in favour of `tower::buffer` from upstream. This eliminates dead code for the idle timeout, which we never actually use, and reduces duplication (since `tonic` uses `tower::buffer` internally, its code is already compiled into the proxy). It also reduces the amount of code I'm personally responsible for maintaining in two separate places ;)

Since the `linkerd-buffer` crate erases the type of the buffered service, while `tower::buffer` does not, I've changed the `push_spawn_buffer`/`spawn_buffer` helpers to also include a `BoxService` layer. This required adding a `BoxServiceLayer` type, since `BoxService::layer` returns a `LayerFn` with an unnameable type.

Also, this change ran into issues due to a compiler bug where generators (async blocks) sometimes forget concrete lifetimes, rust-lang/rust#64552. In order to resolve this, I had to remove the outermost `async` blocks from the OpenCensus and identity daemon tasks. These async blocks were used only for emitting a tracing event when the task is started, so it wasn't a big deal to remove them; I moved the trace events into the actual daemon task functions, and used a `tracing` span to propagate the remote addresses which aren't known inside the daemon task functions.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Depends on #479
#476 introduces one regression in `buffer`: tasks waiting for a `buffer::Service` to become ready (in `poll_ready`) are no longer notified when the buffer worker task terminates or the inner service fails. This means that if a buffer is over its channel capacity, and tasks are waiting for it to become ready again, and the buffered service fails before sufficient channel capacity is released to wake those tasks, those tasks are effectively leaked. They will never become ready.
This is because we are no longer using Tokio's bounded MPSC channel, and are instead implementing our own bounded channel on top of the unbounded MPSC using a semaphore. While the bounded MPSC sender's `poll_ready` method would notify waiting tasks when the receiver is dropped, that method no longer exists in Tokio 0.3, and we have to use an external semaphore to provide a polling-based API for waiting for channel capacity. The semaphore does not wake pending tasks because it must live in an `Arc` in order for the `Semaphore::acquire_owned` method to be usable, and thus it will never be dropped while there are waiting tasks. Not dropping the semaphore is necessary to uphold the safety invariants of the intrusive wait list, but it means that the tasks will not be woken by the semaphore being dropped.
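To make the ownership issue concrete, here is a minimal sketch, assuming tokio 1.x, of this kind of bounded channel: an unbounded MPSC plus a shared `Arc<Semaphore>` for capacity. The `BoundedSender` type and its async `send` method are illustrative names only, not tower's actual `semaphore.rs` (which additionally needs a poll-based front end for `poll_ready`); the point is that the receiver half only holds another clone of the `Arc`, so dropping it cannot wake waiters.

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

/// Hypothetical bounded sender: capacity is enforced by the semaphore, not by
/// the (unbounded) MPSC channel itself.
struct BoundedSender<T> {
    tx: mpsc::UnboundedSender<(T, OwnedSemaphorePermit)>,
    semaphore: Arc<Semaphore>,
}

impl<T> BoundedSender<T> {
    /// Wait for channel capacity, then send. The permit travels with the
    /// message and is released (restoring capacity) when the receiver side
    /// finishes with the message and drops it.
    async fn send(&self, value: T) -> Result<(), T> {
        // `acquire_owned` requires the semaphore to live in an `Arc`. Because
        // the receiver only holds another clone of that `Arc`, dropping the
        // receiver never drops the semaphore, so a task parked here is not
        // woken when the other side goes away -- the leak described above.
        let permit = match self.semaphore.clone().acquire_owned().await {
            Ok(permit) => permit,
            Err(_closed) => return Err(value),
        };
        self.tx.send((value, permit)).map_err(|e| (e.0).0)
    }
}
```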
This branch fixes this issue by adding the ability to wake all tasks waiting on a semaphore: we add (close to) the maximum allowed number of permits to it. This should be sufficient to wake all waiters, but it's a little bit janky. The ideal solution would be for Tokio to expose the `close` method on the internal semaphore implementation in a public API. However, this works now without requiring an upstream change.
I've also added tests for this behavior.
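For illustration, a hedged sketch of the wake-on-terminate workaround, written against tokio 1.x's public `Semaphore` API; the helper name and the exact permit arithmetic are assumptions, not necessarily what tower's `semaphore.rs` does:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Wake every task waiting for channel capacity when the buffer worker
/// terminates or the inner service fails (hypothetical helper).
fn wake_all_waiters(semaphore: &Arc<Semaphore>) {
    // The semaphore lives in an `Arc` and is never dropped while tasks are
    // waiting, so without an upstream `close` API the only way to wake those
    // tasks is to flood it with permits: raise the available permits to
    // (close to) the maximum tokio allows, so every queued waiter can acquire
    // one. A real implementation also has to account for permits still held
    // by in-flight requests, which is part of why this is "janky".
    let to_add = Semaphore::MAX_PERMITS - semaphore.available_permits();
    semaphore.add_permits(to_add);
}
```

Later tokio releases expose `Semaphore::close()` publicly (with `acquire` returning an error once the semaphore is closed), which is essentially the upstream API described above as the ideal solution.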