refactor(ipc): use single buffer and remove manual wakers #69
Conversation
this simplifies things, so lgtm. deferring to @prestwich for merge
@@ -113,19 +112,17 @@ impl IpcBackend {
 pub struct ReadJsonStream<T> {
     /// The underlying reader.
     #[pin]
-    reader: BufReader<T>,
+    reader: tokio_util::compat::Compat<T>,
     /// A buffer of bytes read from the reader.
TIL, although, what a name lol. I wasted some time looking for something like this 😅
!!! I was looking for that! i found the old 0.3 compat crate but couldn't find the new one
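For anyone else searching: a minimal sketch of the bridge, assuming the tokio-to-futures direction implied by the diff above:

```rust
use tokio_util::compat::TokioAsyncReadCompatExt;

fn main() {
    // Wraps a tokio::io::AsyncRead so it can be used where a
    // futures::io::AsyncRead is expected; `.compat()` returns
    // tokio_util::compat::Compat<T>, the type in the diff above.
    let _reader = tokio::io::empty().compat();
}
```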
I previously wasted a few hours on this myself -.-
same with finding a properly maintained tokio::codec Json implementation.
using SerializeStream is an adequate solution I think
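(Presumably serde_json's StreamDeserializer; a minimal usage sketch assuming that:)

```rust
fn main() {
    // serde_json's StreamDeserializer yields one value per complete JSON
    // document in the input, which is what lets a single byte buffer
    // hold several whitespace-separated messages.
    let data = br#"{"id":1} {"id":2}"#;
    let stream = serde_json::Deserializer::from_slice(data).into_iter::<serde_json::Value>();
    for value in stream {
        println!("{:?}", value.unwrap());
    }
}
```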
crates/transport-ipc/src/lib.rs
Outdated
match item {
    Some(Ok(response)) => {
        this.items.push(response);
        return Ready(Some(response));
won't this now fail to wake up again until the socket has more data?
I guess the thing here is that you assume the read has EXACTLY ONE item in it. but that may not be the case if geth has sent multiple notifications
oh then i guess the followup would be that the underlying stream re-wakes the task as there's more data?
> won't this now fail to wake up again until the socket has more data?
this yields a new value; the caller is supposed to call poll_next again. streams should only register the waker if they are pending
https://docs.rs/futures/latest/futures/stream/trait.Stream.html#tymethod.poll_next
> Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted.
the way this loop works is:
- it drains the buffer by yielding deserialized values (advancing the buffer)
- if there are no more items to deserialize, it fills the buffer by reading from the reader, until pending
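For reference, the required method from the linked Stream docs:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

// The contract under discussion, from futures::stream::Stream:
// Ready(Some(_)) hands a value to the caller; per the docs quoted
// above, the waker is registered when the value is not yet available.
pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
```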
> won't this now fail to wake up again until the socket has more data?

> this yields a new value; the caller is supposed to call poll_next again. streams should only register the waker if they are pending
this doesn't mesh with my understanding of wakers. AIUI the waker exists to notify the executor that there is additional work (i.e. that something that made us pending has become unblocked), and the executor may never poll a task that does not have a call to Waker::wake or Waker::wake_by_ref. E.g. the example executor would not re-poll this future until the waker stored in the underlying IO re-wakes the task because more data is available to read. So the observed behavior with that executor would be that any time 2 responses are in the buffer, the 2nd one is not read until the ipc socket has even more data available (and that data would not be read until more is available)
when they say "register for waking" I'm pretty sure what they mean is "store a copy of the waker in the local state". this is how, e.g., tokio::time::sleep works under the hood. it stores the waker from the context of any task passed into it.
as a side note, your code may work in prod because tokio has an optimization where it polls futures again immediately after they return ready (even if the waker is not triggered), in case there's more work. While it may work, I'm not sure it is correct
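To make the "store a copy of the waker" point concrete, here's a toy timer future in the style of the async-book example (illustrative only, not the PR code): the future stashes the waker, and the timer thread's wake() call is the only thing that gets the task polled again.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

struct Delay {
    shared: Arc<Mutex<Shared>>,
}

struct Shared {
    done: bool,
    waker: Option<Waker>,
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.shared.lock().unwrap();
        if shared.done {
            Poll::Ready(())
        } else {
            // "register": store the waker so the timer thread can re-wake us
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

fn delay(dur: Duration) -> Delay {
    let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
    let thread_shared = shared.clone();
    thread::spawn(move || {
        thread::sleep(dur);
        let mut s = thread_shared.lock().unwrap();
        s.done = true;
        // without this wake, the executor has no reason to poll the task again
        if let Some(w) = s.waker.take() {
            w.wake();
        }
    });
    Delay { shared }
}
```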
tokio does not poll the future directly; the user of the stream does, by calling poll_next. This happens in a Future or a poll function
the executor deals with the top-level task which contains the stream state machine, and any futures or other streams or other structs that contain that stream, and schedules that task for execution. futures and streams use the wakers to communicate to the executor that the task should be scheduled.
while let Some(next) = stream.next().await
this is a busy loop if the stream is always ready, yes. but as soon as the stream returns pending and the executor switches tasks, there is no guarantee that the executor returns to the task before the task's Waker is used to re-wake it.
the implicit "guarantee" that poll_next registers the waker only exists if the stream is pending, per docs. For example, the mpsc::UnboundedReceiver Stream:
https://docs.rs/futures-channel/0.3.29/src/futures_channel/mpsc/mod.rs.html#1084-1090
this is "registering" the waker by adding it to task state to be re-woken when the channel has more items, ensuring that the executor calls poll when items are available, as i described above. ctx.waker().wake_by_ref()
is not registering the waker in this way, it is requesting that the task be polled again by the executor immediately
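A toy receiver mirroring the linked mpsc pattern (illustrative, not the real mpsc code):

```rust
use std::{
    collections::VecDeque,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
};

use futures::Stream;

struct Receiver<T> {
    shared: Arc<Mutex<Inner<T>>>,
}

struct Inner<T> {
    queue: VecDeque<T>,
    waker: Option<Waker>,
}

impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let mut inner = self.shared.lock().unwrap();
        if let Some(item) = inner.queue.pop_front() {
            // an item is ready: no waker registration, the caller is
            // expected to call poll_next again for the next one
            return Poll::Ready(Some(item));
        }
        // "register": stash the waker in task state; send() calls wake()
        // on it later, which is what makes the executor poll this task again
        inner.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

fn send<T>(shared: &Arc<Mutex<Inner<T>>>, item: T) {
    let mut inner = shared.lock().unwrap();
    inner.queue.push_back(item);
    if let Some(waker) = inner.waker.take() {
        waker.wake();
    }
}
```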
> ctx.waker().wake_by_ref() is not registering the waker in this way, it is requesting that the task be polled again by the executor immediately
right, that's why it isn't necessary to call it when returning an item
it is necessary if our buffer isn't empty, as we have work to do immediately, which won't get done if the task isn't re-woken
the new poll loop has 4 exits:
- item decoded -> return the item
- decoding failed -> terminate
- no item decoded, nothing read from the reader -> pending (this ensures we're getting called again as soon as there's more to read)
- reading failed -> terminate
I fail to see where we'd need to request another call.
But after writing this I think we can optimize this a bit more and skip the decoding attempt one time when we previously returned pending
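A rough self-contained sketch of that loop shape (field names, the chunk size, and the Box<RawValue> item type are assumptions, not the exact PR code):

```rust
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures::Stream;
use serde_json::value::RawValue;
use tokio::io::{AsyncRead, ReadBuf};

struct ReadJsonStream<T> {
    reader: T,
    buf: Vec<u8>,
}

impl<T: AsyncRead + Unpin> Stream for ReadJsonStream<T> {
    type Item = Box<RawValue>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // try to decode one item from the front of the buffer
            let mut de =
                serde_json::Deserializer::from_slice(&this.buf).into_iter::<Box<RawValue>>();
            match de.next() {
                // exit 1: item decoded -> return the item
                Some(Ok(item)) => {
                    let consumed = de.byte_offset();
                    this.buf.drain(..consumed);
                    return Poll::Ready(Some(item));
                }
                // exit 2: decoding failed (not merely incomplete data) -> terminate
                Some(Err(e)) if !e.is_eof() => return Poll::Ready(None),
                // buffer empty or holds a partial item: fall through and read
                _ => {}
            }

            // exit 3: if the reader is pending it has registered our waker,
            // and `ready!` propagates Poll::Pending out of poll_next
            let mut chunk = [0u8; 4096];
            let mut read_buf = ReadBuf::new(&mut chunk);
            match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
                // exit 4: reading failed -> terminate (EOF also terminates)
                Err(_) => return Poll::Ready(None),
                Ok(()) if read_buf.filled().is_empty() => return Poll::Ready(None),
                Ok(()) => this.buf.extend_from_slice(read_buf.filled()),
            }
        }
    }
}
```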
if we read every item as soon as it's written, then we don't have a problem
however, if the other end of the socket ever writes 2 items between polls of this stream, then the next time the stream is polled it will fill the buffer (getting the 2 items), and then deser and return an item. At that point, we have a serialized item in the buffer. If we do not re-wake properly, then the next poll will occur when the asyncread is ready to give us more serialized items for our buffer. That wake will again result in only 1 item being read from the buffer
we need to re-wake until the buffer is empty, otherwise we risk falling significantly behind and filling our buffer
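In code, the re-wake being argued for is roughly this change to the "item decoded" exit of the sketch above:

```rust
// exit 1, with the re-wake being argued for: if bytes are still
// buffered, a second item may already be decodable, so schedule
// another poll instead of waiting for the socket to become readable
Some(Ok(item)) => {
    let consumed = de.byte_offset();
    this.buf.drain(..consumed);
    if !this.buf.is_empty() {
        cx.waker().wake_by_ref();
    }
    return Poll::Ready(Some(item));
}
```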
crates/transport-ipc/src/lib.rs
Outdated
loop {
    // try decoding from the buffer
    tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
    let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();
instantiating the deserializer performs a new alloc every time the poll is invoked. instead, let's check if the buffer is empty before instantiating, and then after instantiating, drain the whole buffer into an items: Vec<PubSubItem>. Will reduce total allocs significantly
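A sketch of the suggested shape (items and PubSubItem are the names from this comment; a VecDeque and the decode-then-drain structure are assumptions here so yielded order stays FIFO):

```rust
// decode everything currently buffered in one pass, then serve items
// from the queue on subsequent polls; only one StreamDeserializer is
// built per batch of buffered bytes instead of one per poll
if !this.buf.is_empty() {
    let mut de = serde_json::Deserializer::from_slice(&this.buf).into_iter::<PubSubItem>();
    // stops at the first error, which includes a partial trailing item
    while let Some(Ok(item)) = de.next() {
        this.items.push_back(item); // items: VecDeque<PubSubItem>
    }
    // byte_offset() is the end of the successfully decoded prefix
    let consumed = de.byte_offset();
    this.buf.drain(..consumed);
}
if let Some(item) = this.items.pop_front() {
    return Poll::Ready(Some(item));
}
```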
added a toggle drained that is set when we have more data/no more items
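Roughly (the drained field name is from this comment; try_decode and the surrounding structure are hypothetical stand-ins):

```rust
// `drained` records that the buffer held no complete item on the last
// pass, so the decode attempt is skipped until new bytes arrive
// (`try_decode` is a hypothetical stand-in for the decode step)
loop {
    if !this.drained {
        match try_decode(&mut this.buf) {
            Some(Ok(item)) => return Poll::Ready(Some(item)),
            Some(Err(_)) => return Poll::Ready(None),
            None => this.drained = true, // nothing decodable until we read more
        }
    }

    // fill: on pending, `ready!` propagates out of poll_next
    let mut chunk = [0u8; 4096];
    let mut read_buf = ReadBuf::new(&mut chunk);
    match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
        Err(_) => return Poll::Ready(None),
        Ok(()) if read_buf.filled().is_empty() => return Poll::Ready(None),
        Ok(()) => {
            this.buf.extend_from_slice(read_buf.filled());
            this.drained = false; // new data: worth attempting to decode again
        }
    }
}
```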
refactors the ReadJsonStream: