fix(server): remove `MethodSinkPermit` to fix backpressure issue on concurrent subscriptions #1126
After a method call has been received, this PR no longer keeps the `Permit`; instead, each response awaits a slot in the mpsc buffer. This is especially important for subscriptions, which previously required two slots unintentionally: the server held one permit until the subscription was "accepted", and the subscription itself waited for another slot. If many subscriptions arrived concurrently and filled the buffer, no subscription could make progress.
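To illustrate the stall described above, here is a toy model in plain Rust. It is only a sketch: `Slots`, `accepted_with_held_permit`, and `accepted_without_held_permit` are hypothetical names that do not exist in this repository, the channel is reduced to a counter of free slots, and the model assumes the connection writer drains every message as soon as it is queued.

```rust
/// Toy model of a bounded channel that only tracks free slots (hypothetical type).
struct Slots {
    free: usize,
}

impl Slots {
    fn try_acquire(&mut self) -> bool {
        if self.free == 0 {
            return false;
        }
        self.free -= 1;
        true
    }

    fn release(&mut self) {
        self.free += 1;
    }
}

/// Old behaviour (modelled): a permit is held for every call that was read,
/// and accepting a subscription needs a second slot.
fn accepted_with_held_permit(capacity: usize, subscriptions: usize) -> usize {
    let mut slots = Slots { free: capacity };

    // The server reads a call only when it can reserve a permit for it.
    let mut holding = 0;
    for _ in 0..subscriptions {
        if slots.try_acquire() {
            holding += 1;
        }
    }

    // Each pending subscription now needs one more slot to be accepted.
    let mut accepted = 0;
    while holding > 0 && slots.try_acquire() {
        slots.release(); // the accept message is written out
        slots.release(); // the held permit is finally dropped
        holding -= 1;
        accepted += 1;
    }
    accepted
}

/// New behaviour (modelled): nothing is held; each response takes one slot
/// only while it is being sent.
fn accepted_without_held_permit(capacity: usize, subscriptions: usize) -> usize {
    let mut slots = Slots { free: capacity };
    let mut accepted = 0;
    for _ in 0..subscriptions {
        if slots.try_acquire() {
            slots.release(); // the accept message is written out
            accepted += 1;
        }
    }
    accepted
}

fn main() {
    // Four concurrent subscriptions on a buffer with four slots.
    println!("held permit: {}", accepted_with_held_permit(4, 4));
    println!("no held permit: {}", accepted_without_held_permit(4, 4));
}
```

In this model, four subscriptions on a four-slot buffer accept nothing while permits are held, and all four are accepted once the held permit is gone.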
@@ -310,7 +310,6 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
   max_log_length,
   max_response_body_size,
   sink: sink.clone(),
-  sink_permit,
I don't really understand the point of acquiring the permit above now? Might it be safer to not try to reserve a slot at all, and just to `.await` until there is an opening each time we want to send an actual message?
Without "the reserve", the server will keep reading the underlying socket, and the client is then not "forced" to read its end of the socket before it can send new messages.
That is the entire reason for the reserve here.
Otherwise, the backpressure will not be propagated all the way down to the TCP level, where the window size is adjusted, etc.
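A rough sketch of the idea behind the reserve, using plain queues instead of a real socket and channel. Everything here is an assumption made for illustration: `read_while_capacity` is a hypothetical function, `socket` stands in for the unread bytes on the connection, and `outgoing` stands in for the bounded response buffer.

```rust
use std::collections::VecDeque;

/// Reads requests from `socket` only while the bounded `outgoing` buffer has
/// room for the response. Returns how many requests were read.
fn read_while_capacity(
    socket: &mut VecDeque<String>,
    outgoing: &mut VecDeque<String>,
    capacity: usize,
) -> usize {
    let mut read = 0;
    while outgoing.len() < capacity {
        match socket.pop_front() {
            Some(request) => {
                outgoing.push_back(format!("response to {}", request));
                read += 1;
            }
            None => break,
        }
    }
    read
}

fn main() {
    let mut socket: VecDeque<String> = (1..=5).map(|i| format!("call {}", i)).collect();
    let mut outgoing: VecDeque<String> = VecDeque::new();

    // The client has not read anything yet: only `capacity` calls are read,
    // the rest stay unread on the socket.
    let read = read_while_capacity(&mut socket, &mut outgoing, 2);
    println!("read {}, left on socket: {}", read, socket.len());

    // The client reads one response, which frees one slot for one more call.
    outgoing.pop_front();
    let read = read_while_capacity(&mut socket, &mut outgoing, 2);
    println!("read {}, left on socket: {}", read, socket.len());
}
```

Because requests are left unread while the buffer is full, a slow client eventually cannot push more data, which is the backpressure the reserve is meant to preserve.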
I get your concern: this permit could interfere with the stuff trying to send, so we could do:
// Wait until there is a slot in the bounded channel.
//
// This will force the client to read socket on the other side
// otherwise the socket will not be read again.
let shutdown = loop {
    if stop_handle.shutdown_requested() {
        break true;
    }
    if sink.capacity() != 0 {
        break false;
    }
};
if shutdown {
    break Ok(Shutdown::Stopped);
}
but feels a bit annoying to waste cycles doing busy looping :)
@@ -505,7 +504,7 @@ async fn execute_unchecked_call<L: Logger>(params: ExecuteCallParams<L>) {
   CallOrSubscription::Call(r) => {
       logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
-      sink_permit.send_raw(r.result);
+      _ = sink.send(r.result).await;
This will only fail if the connection is closed.
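The same pattern can be seen with the standard library's bounded channel, used here only as a stand-in: the `sink` in this PR is a different type, and `delivered` is a hypothetical helper. A send on `std::sync::mpsc::sync_channel` waits for a free slot and returns an error only once the receiving side has been dropped.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

/// Sends one message and reports whether the receiving side was still open.
/// Callers that do not care can ignore the result, like `_ = sink.send(..)`.
fn delivered(tx: &SyncSender<String>, msg: &str) -> bool {
    tx.send(msg.to_string()).is_ok()
}

fn main() {
    let (tx, rx) = sync_channel::<String>(1);

    // The buffer has a free slot, so the send succeeds.
    println!("open: {}", delivered(&tx, "first"));

    // Dropping the receiver models a closed connection.
    drop(rx);
    println!("closed: {}", delivered(&tx, "second"));
}
```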
Thank you for your explanations; this now makes sense to me, and I get that the point of the permit is to stop messages from being read from the socket if the client is too slow to receive the responses back. The code looks like a definite improvement from before :)
Co-authored-by: James Wilson <james@jsdw.me>
LGTM! Amazing work here! 👍
After a method call has been received, this PR removes the `MethodSinkPermit` because it wasn't dropped properly and might "deadlock" if all slots in the mpsc channel were acquired by subscriptions which hadn't been "accepted", as `async fn accept` tries to acquire a slot in the mpsc buffer as well.
Refactored the code a little bit as well to make things more understandable.