[mplex] Refactoring with Patches #1769
Conversation
Thereby addressing the following issues:
* Send a `Reset` frame when open substreams get dropped (#313).
* Avoid stalls caused by a read operation on one substream reading (and buffering) frames for another substream without notifying the corresponding task. I.e. the tracked read-interest must be scoped to a substream.
* Remove dropped substreams from the tracked set of open substreams, to avoid artificially running into substream limits.
This is done by taking the substream state into account. The refined behaviour is modeled after that of Yamux.
muxers/mplex/src/io.rs
```rust
/// Sends pending frames, without flushing.
fn send_pending_frames(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    while let Some(frame) = self.pending_frames.pop() {
        if self.poll_send_frame(cx, || frame.clone())?.is_pending() {
```
Why not call `self.pending_frames.pop()` within the closure? As far as I understand, if the closure is called, the outcome cannot be `Pending`.
I don't think that's permissible due to borrowing. Even if `poll_send_frame` were made a standalone function and thus did not borrow all of `self`, it still calls `on_error` in case of an I/O error, which also wants a mutable borrow of `pending_frames` to drop/clear it on error. I agree that the pop/push as well as the frame cloning is suboptimal, but I didn't consider it that important or inefficient. The frames sent here are only close/reset frames containing only a stream ID. We could of course make `poll_send_frame` return the frame in the `Pending` case with a custom return enum.
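For illustration only, a minimal sketch of what such a custom return type could look like (the names `PendingSend`, `drain_pending` and `try_send` are hypothetical and not part of this PR; `String` stands in for the real `Frame` type):

```rust
use std::collections::VecDeque;

/// Hypothetical return type for a frame-send attempt: when the sink is
/// not ready, the frame is handed back instead of being cloned up front.
enum PendingSend<T> {
    Sent,
    NotReady(T),
}

/// Sketch of draining a FIFO of pending frames with such a return type.
fn drain_pending<F>(pending: &mut VecDeque<String>, mut try_send: F) -> bool
where
    F: FnMut(String) -> PendingSend<String>,
{
    while let Some(frame) = pending.pop_front() {
        match try_send(frame) {
            PendingSend::Sent => continue,
            PendingSend::NotReady(frame) => {
                // Put the frame back at the front; the caller would
                // return `Poll::Pending` at this point.
                pending.push_front(frame);
                return false;
            }
        }
    }
    true
}
```

This would avoid the up-front `clone()` at the cost of a slightly less conventional poll-style signature.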
```rust
struct NotifierRead {
    /// List of wakers to wake when read operations can proceed
    /// on a substream (or in general, for the key `None`).
    pending: Mutex<FnvHashMap<Option<LocalStreamId>, Waker>>,
```
If I understand correctly, wakers whose key (the `Option<LocalStreamId>`) is `Some` come from calling `poll_read_stream`, and wakers whose key is `None` come from calling `poll_next_stream`.
If that's accurate, then it is correct to have a `Waker` here rather than a `Vec<Waker>`.
I find it quite hard to follow the logic, and it doesn't seem very fool-proof, but I also don't know how to make it clearer.
> If that's accurate, then it is correct to have a `Waker` here rather than a `Vec<Waker>`.

Yes.

> I find it quite hard to follow the logic, and it doesn't seem very fool-proof, but I also don't know how to make it clearer.

Not sure what logic you mean; I guess in general the task wakeup logic for reading? Yes, that is, as is mostly the case, subtle. There are these two different "threads of control" for reading by design: 1) awaiting the next inbound stream and 2) awaiting new data on a specific stream. Thereby 1) and each instance of 2) potentially read and buffer frames for other streams (or for new inbound streams) that need to result in wakeup attempts.
Can one of you expand on why it is safe to only cache a single waker for `Option<LocalStreamId>::None`?
Say `Multiplexed::poll_next_stream` is polled by two different tasks in an alternating fashion. Wouldn't it be desirable if both tasks are woken up once a new `Open` frame arrives?
> Can one of you expand on why it is safe to only cache a single waker for `Option::None`?

Because the key `None` is only used by `poll_next_stream`, which is part of the implementation for `StreamMuxer::poll_event`, whose API contract explicitly states that "Only the latest task that was used to call this method may be notified." The API contract for `StreamMuxer::read_substream`, `StreamMuxer::write_substream` etc. is similar. In general, and as far as I know, the reason for `StreamMuxer`s to be `Sync` is to permit using different substreams from different threads, not to use the same substream from different threads. Only waking the last task that `poll`s is generally common for most such APIs, e.g. `Future::poll` has an analogous contract. In practice, I would think the reason for this contract is a matter of practicality due to ease and efficiency of implementation and the relatively rare usefulness of having to "remember" all tasks to wake, instead of just the last.
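To make that analogy concrete, here is a minimal, generic sketch (not code from this PR) of the usual single-waker pattern behind such contracts:

```rust
use std::task::{Context, Poll, Waker};

/// State behind a hand-rolled future/event source that follows the common
/// contract of only notifying the task that polled last.
struct Shared {
    ready: bool,
    /// Only the waker from the most recent poll is kept.
    waker: Option<Waker>,
}

impl Shared {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.ready {
            Poll::Ready(())
        } else {
            // Overwrites whatever an earlier task registered.
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    fn set_ready(&mut self) {
        self.ready = true;
        if let Some(w) = self.waker.take() {
            w.wake();
        }
    }
}
```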
Thanks for the in-depth explanation.
> I would think the reason for this contract is a matter of practicality due to ease and efficiency of implementation and the relatively rare usefulness of having to "remember" all tasks to wake, instead of just the last.

One big problem with remembering all tasks to wake up is that your container of wakers might grow a lot.
For instance, imagine that it takes 60 seconds before data is received on a substream and that, due to the nature of polling, we poll a substream 10 times per second. This results in 600 calls to `poll_read_substream` before the wakers actually get woken, which means we would insert 600 elements into the list of wakers.
The `will_wake` method allows reducing the number of duplicates, and also we're not really supposed to poll a substream 10 times per second. But both of these mechanisms are unreliable.
Enforcing one waker per "thing to wake" guarantees a bound on the number of wakers, which is not a bad idea.
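For illustration, a sketch of such bounded, per-key registration (using a std `HashMap` instead of `FnvHashMap` to keep it self-contained; the `ReadWakers`, `register` and `wake` names are hypothetical, not this PR's API):

```rust
use std::collections::HashMap;
use std::task::Waker;

/// Hypothetical stream identifier; stands in for `LocalStreamId`.
type StreamId = u64;

/// One waker per key: `Some(id)` for a substream read, `None` for awaiting
/// the next inbound substream. The map size is therefore bounded by the
/// number of open substreams plus one.
struct ReadWakers {
    pending: HashMap<Option<StreamId>, Waker>,
}

impl ReadWakers {
    /// Register the waker of the task that polled last for this key.
    fn register(&mut self, key: Option<StreamId>, waker: &Waker) {
        match self.pending.get(&key) {
            // `will_wake` lets us skip a clone if the same task re-polls.
            Some(existing) if existing.will_wake(waker) => {}
            _ => {
                self.pending.insert(key, waker.clone());
            }
        }
    }

    /// Wake the task (if any) currently interested in this key.
    fn wake(&mut self, key: &Option<StreamId>) {
        if let Some(waker) = self.pending.remove(key) {
            waker.wake();
        }
    }
}
```

With one entry per open substream plus one for `None`, the map stays bounded regardless of how often a task re-polls before data arrives.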
* Make the pending frames a FIFO queue.
* Take more care to avoid keeping read-wakers around and to notify them when streams close.
It is probably safer to always wake pending wakers.
Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Max Inden <mail@max-inden.de>
@mxinden I assume you're still taking your time to review. There is no rush from my side, I appreciate the time taken for the reviews.
I am fine merging this pull request as is. I have a couple of questions and suggestions, but none of them are important. For what my opinion is worth, I find this to be a well-conceived implementation.
Co-authored-by: Max Inden <mail@max-inden.de>
While seemingly duplicating some control flow between `poll_next_stream` and `poll_read_stream`, the individual control flow of each read operation is easier to follow.
These tests don't even use mplex, so this sudden error seems unrelated, but I can look into it separately, if it is somewhat reproducible.
```rust
/// The configuration.
config: MplexConfig,
/// Buffer of received frames that have not yet been consumed.
buffer: Vec<Frame<RemoteStreamId>>,
```
Just an idea, not sure it is worth pursuing, especially without a benchmark proving it to be an issue: does the ordering between different `StreamId`s matter? If not, wouldn't a `HashMap<Option<StreamId>, Vec<Frame<RemoteStreamId>>>` be more efficient, as `poll_next_stream` and `poll_read_stream` would not have to iterate over the entire (worst case 4096 items) buffer on each call, but only a small subset? Next to the additional complexity in the data structure itself, this would also make buffer length tracking a bit harder.
In principle, yes, I'd prefer to have a read buffer per stream, i.e. a map. Not just because of potentially suboptimal performance if just a single stream is slow to consume its frames, but also because it would allow just `Reset`ing a single stream if the buffer limit for that stream is hit, instead of failing the entire connection when the shared `buffer` is full with the default configuration `MaxBufferBehaviour::CloseAll`. Essentially, this would change `MaxBufferBehaviour::CloseAll` to `MaxBufferBehaviour::ResetStream`, as this is more along the lines of what the mplex spec prescribes in the implementation notes. The configured buffer limit would then apply to a single substream and, together with the substream limit, would still let one reason about resource usage bounds. But in any case, the shared buffer was pre-existing code and I'd really prefer to propose such a change in a separate PR. I didn't want to pile up too many semantic changes here.
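Purely to sketch the idea being deferred to a separate PR (this is not what the PR implements; `StreamId` and `Frame` are stand-ins for the real types):

```rust
use std::collections::{HashMap, VecDeque};

type StreamId = u64;
type Frame = Vec<u8>;

/// One bounded FIFO of buffered frames per substream, so that a slow
/// substream can be reset individually instead of closing the connection.
struct PerStreamBuffers {
    max_frames_per_stream: usize,
    buffers: HashMap<StreamId, VecDeque<Frame>>,
}

impl PerStreamBuffers {
    /// Buffer a frame for `id`. Returns `false` if the per-stream limit
    /// is hit, in which case the caller would reset just that stream.
    fn push(&mut self, id: StreamId, frame: Frame) -> bool {
        let buf = self.buffers.entry(id).or_default();
        if buf.len() >= self.max_frames_per_stream {
            return false;
        }
        buf.push_back(frame);
        true
    }

    /// Take the next buffered frame for `id`, if any.
    fn pop(&mut self, id: StreamId) -> Option<Frame> {
        self.buffers.get_mut(&id).and_then(|b| b.pop_front())
    }
}
```

A per-stream limit like this is what would make a `MaxBufferBehaviour::ResetStream` semantics possible, resetting only the offending substream when its buffer fills up.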
Still looks good to me. Thanks for all the comments.
Overview
This is a rather substantial refactoring of `libp2p-mplex`, though the control-flow skeleton remains the same, as dictated by the `StreamMuxer` API. In the context of looking into #1758, besides a bug in `libp2p-ping` that only occurs with mplex and for which I will open a separate PR, the following two problems in `libp2p-mplex` became apparent and are intended to be addressed here:

* Avoid stalls caused by a read operation on one substream reading (and buffering) frames for another substream without notifying the corresponding task.

  While testing #1758 (Ping protocol tests fail when mplex is used instead of yamux), even when the `ping_pong` tests ran through, there were occasional stalls in between. It turned out that, when a read operation for one substream buffers frames for another, the corresponding task that may be waiting on these buffered frames is not notified. This led to situations in the `ping_pong` test where both sides sent each other the outbound ping simultaneously, each on its own substream, and after the read operations for the inbound substreams returned `Pending`, the read operations for the outbound substreams waiting for the responses would read and buffer the inbound ping of the remote for the other substream and, not finding any frames for themselves yet, again return `Pending` without waking up the task(s) interested in the newly buffered frame(s). Only the ping timeout triggered the polling again. To avoid needless polling of the same task, it seemed necessary to me that read-interest for substreams is tracked per substream ID, instead of "globally".

* Remove dropped substreams from the tracked set of open substreams, to avoid artificially running into substream limits.
Related Context
Both of the above problems probably relate to #1629 and #1504 which report mplex symptoms that seem highly related (hitting unexpected substream limits and intermittent stalls). Though the latter is closed because panics were fixed, the problem with unexpectedly hitting the substream limit remained unsolved there, I think.
Other Changes
While I was at it, I also went ahead with #313, and thus another change here is:

* Send a `Reset` frame when an open substream gets dropped.

Furthermore, the semantics of the `max_substreams` configuration changed as follows: opening new outbound substreams beyond the limit is delayed (`Poll::Pending`) with a wakeup once an existing substream closes, i.e. the limit results in back-pressure for new outbound substreams. New inbound substreams beyond the limit are immediately answered with a `Reset`, again a form of back-pressure. If too many (by some internal threshold) pending `Reset` frames accumulate, e.g. as a result of an aggressive number of inbound substreams being opened beyond the configured limit, the connection is closed ("DoS protection").

Testing
While the multiplexers themselves still need more testing, also compatibility testing (#508), and ideally comparative performance testing, I have so far done the following (besides passing the libp2p test suite, where mplex is often used, of course):

* A separate PR for `libp2p-ping` that fixes a small bug and depends on this PR will randomise the choice of multiplexer used for the integration tests, expecting the same behaviour. This kind of testing essentially revealed all the issues mentioned here as a result of looking into #1758 (Ping protocol tests fail when mplex is used instead of yamux).