
refactor(ipc): use single buffer and remove manual wakers #69

Merged · @mattsse merged 3 commits into main from matt/work-on-ipc-reader on Dec 12, 2023

Conversation

@mattsse (Member) commented Dec 11, 2023

Refactors the `ReadJsonStream`:

  • yield decoded items immediately
  • no manual wakers
  • remove `BufReader` and use a single buffer

@Evalir Evalir changed the title use single buffer and remove manual wakers feat(ipc): use single buffer and remove manual wakers Dec 11, 2023
@Evalir Evalir changed the title feat(ipc): use single buffer and remove manual wakers refactor(ipc): use single buffer and remove manual wakers Dec 11, 2023
@Evalir (Member) left a comment


this simplifies things, so lgtm. deferring to @prestwich for merge

```diff
@@ -113,19 +112,17 @@ impl IpcBackend {
 pub struct ReadJsonStream<T> {
     /// The underlying reader.
     #[pin]
-    reader: BufReader<T>,
+    reader: tokio_util::compat::Compat<T>,
     /// A buffer of bytes read from the reader.
```
@Evalir (Member) commented Dec 11, 2023


TIL, although, what a name lol. I wasted some time looking for something like this 😅

Member commented:

!!! I was looking for that! I found the old 0.3 compat crate but couldn't find the new one
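For anyone else hunting for it: the adapter lives in tokio-util's compat module (behind its `compat` feature). A minimal sketch of the direction used here, assuming the underlying IPC socket implements futures-io's `AsyncRead`:

```rust
use futures::io::AsyncRead as FuturesAsyncRead;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};

// Adapt a futures-io reader (e.g. the IPC socket, assumed to implement
// futures::io::AsyncRead) so it implements tokio's AsyncRead and can be
// used with tokio-based I/O helpers.
fn adapt<T: FuturesAsyncRead>(reader: T) -> Compat<T> {
    reader.compat()
}
```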

@mattsse (Member, Author) commented:

I previously wasted a few hours on this myself -.-

Same with finding a properly maintained `tokio::codec` JSON implementation. Using serde_json's `StreamDeserializer` is an adequate solution, I think.
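For context, a minimal sketch of how `StreamDeserializer` pulls successive JSON documents out of a byte buffer (using `serde_json::Value` as a stand-in for the crate's item type; not the PR's exact code):

```rust
use serde_json::{Deserializer, Value};

fn drain_json(buf: &[u8]) -> (Vec<Value>, usize) {
    let mut items = Vec::new();
    // The iterator yields one deserialized value per JSON document in
    // the slice, tracking how many bytes it has consumed so far.
    let mut de = Deserializer::from_slice(buf).into_iter::<Value>();
    while let Some(item) = de.next() {
        match item {
            Ok(value) => items.push(value),
            // An EOF error just means the slice ends mid-document; the
            // remaining bytes stay buffered until more data arrives.
            Err(e) if e.is_eof() => break,
            Err(e) => panic!("invalid JSON on the socket: {e}"),
        }
    }
    // byte_offset() reports how many bytes were fully consumed, i.e.
    // how far the caller can advance its buffer.
    (items, de.byte_offset())
}
```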

```diff
 match item {
     Some(Ok(response)) => {
-        this.items.push(response);
+        return Ready(Some(response));
```
Member commented:

won't this now fail to wake up again until the socket has more data?

Member commented:

I guess the thing here is that you assume the read has EXACTLY ONE item in it. But that may not be the case if geth has sent multiple notifications.

Member commented:

oh, then I guess the follow-up would be that the underlying stream re-wakes the task as there's more data?

@mattsse (Member, Author) commented Dec 12, 2023

> won't this now fail to wake up again until the socket has more data?

this yields a new value; the caller is supposed to call poll_next again. streams should only register the waker if they are pending:

https://docs.rs/futures/latest/futures/stream/trait.Stream.html#tymethod.poll_next

> Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted.

the way this loop works is:

  • it drains the buffer by yielding deserialized values (advancing the buffer)
  • if there are no more items to deserialize, it fills the buffer by reading from the reader until pending

@prestwich (Member) commented Dec 12, 2023

> won't this now fail to wake up again until the socket has more data?
>
> this yields a new value; the caller is supposed to call poll_next again. streams should only register the waker if they are pending

this doesn't mesh with my understanding of wakers. AIUI the waker exists to notify the executor that there is additional work (i.e. that something that made us pending has become unblocked), and the executor may never poll a task that does not have a call to Waker::wake or Waker::wake_by_ref. E.g. the example executor would not re-poll this future until the waker stored in the underlying IO re-wakes the task because more data is available to read. So the observed behavior with that executor would be that any time 2 responses are in the buffer, the 2nd one is not read until the IPC socket has even more data available (and that data would not be read until more is available).

when they say "register for waking", I'm pretty sure what they mean is "store a copy of the waker in the local state". this is how, e.g., tokio::time::sleep works under the hood: it stores the waker from the context of any task that polls it.

as a side note, your code may work in prod because tokio has an optimization where it polls futures again immediately after they return ready (even if the waker is not triggered), in case there's more work. While it may work, I'm not sure it is correct
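To make the "store a copy of the waker" pattern concrete, here is a hand-rolled delay future in the spirit of the async-book timer example (a sketch, not tokio's actual implementation):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the future and the timer thread.
#[derive(Default)]
struct State {
    done: bool,
    waker: Option<Waker>,
}

struct Delay {
    shared: Arc<Mutex<State>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            Poll::Ready(())
        } else {
            // "Registering" the waker: store a copy in local state so the
            // timer thread can wake this task once the deadline elapses.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

fn delay(dur: Duration) -> Delay {
    let shared = Arc::new(Mutex::new(State::default()));
    let handle = shared.clone();
    thread::spawn(move || {
        thread::sleep(dur);
        let mut state = handle.lock().unwrap();
        state.done = true;
        // Without this wake, the executor has no reason to poll the task
        // again and the future would hang forever.
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    });
    Delay { shared }
}
```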

Member commented:

tokio does not poll the future directly; the user of the stream does, by calling poll_next. This happens in a Future or poll function.

the executor deals with the top-level task, which contains the stream state machine (and any futures, streams, or other structs that contain that stream), and schedules that task for execution. futures and streams use wakers to communicate to the executor that the task should be scheduled.

```rust
while let Some(next) = stream.next().await { /* ... */ }
```

this is a busy loop if the stream is always ready, yes. but as soon as the stream returns pending and the executor switches tasks, there is no guarantee that the executor returns to the task before the task's Waker is used to re-wake it.

the implicit "guarantee" that poll_next registers the waker only exists if the stream is pending, per docs.

For example: mpsc::UnboundedReceiver Stream:

https://docs.rs/futures-channel/0.3.29/src/futures_channel/mpsc/mod.rs.html#1084-1090

this is "registering" the waker by adding it to task state to be re-woken when the channel has more items, ensuring that the executor calls poll when items are available, as i described above. ctx.waker().wake_by_ref() is not registering the waker in this way, it is requesting that the task be polled again by the executor immediately

@mattsse (Member, Author) commented:

> ctx.waker().wake_by_ref() is not registering the waker in this way; it is requesting that the task be polled again by the executor immediately

right, that's why it isn't necessary to call it when returning an item

Member commented:

it is necessary if our buffer isn't empty, as we have work to do immediately, which won't get done if the task isn't re-woken
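In other words, something like this sketch (the `yield_item` helper and its signature are hypothetical, just to show the pattern):

```rust
use bytes::BytesMut;
use std::task::{Context, Poll};

// Sketch of the re-wake pattern: yield an item, but if the buffer still
// holds undecoded bytes, ask the executor to poll again immediately
// instead of waiting for the socket to become readable again.
fn yield_item<T>(buf: &BytesMut, item: T, cx: &mut Context<'_>) -> Poll<Option<T>> {
    if !buf.is_empty() {
        cx.waker().wake_by_ref();
    }
    Poll::Ready(Some(item))
}
```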

@mattsse (Member, Author) commented Dec 12, 2023

the new poll loop has 4 exits:

  1. item decoded -> return the item
  2. decoding failed -> terminate
  3. no item decoded, nothing read from the reader -> pending (this ensures we're getting called again as soon as there's more to read)
  4. reading failed -> terminate

I fail to see where we'd need to request another call. But after writing this, I think we can optimize this a bit more and skip the decoding attempt on the first poll after we previously returned pending (see the sketch below).
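For illustration, a self-contained sketch of a loop with those four exits, using `serde_json::Value` in place of the crate's `PubSubItem` and `tokio_util::io::poll_read_buf` for the fill step (an approximation, not the PR's exact code):

```rust
use bytes::{Buf, BytesMut};
use serde_json::Value;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::AsyncRead;
use tokio_util::io::poll_read_buf;

struct ReadJsonStream<R> {
    reader: R,
    buf: BytesMut,
}

impl<R: AsyncRead + Unpin> ReadJsonStream<R> {
    fn poll_next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<Value>> {
        loop {
            // Try to decode an item from the front of the buffer.
            let (item, read) = {
                let mut de = serde_json::Deserializer::from_slice(self.buf.as_ref())
                    .into_iter::<Value>();
                (de.next(), de.byte_offset())
            };
            self.buf.advance(read); // discard the bytes the decoder consumed
            match item {
                // Exit 1: item decoded -> return the item.
                Some(Ok(item)) => return Poll::Ready(Some(item)),
                // Exit 2: decoding failed (not just an incomplete item) -> terminate.
                Some(Err(e)) if !e.is_eof() => return Poll::Ready(None),
                // Incomplete or empty buffer: fall through and read more.
                _ => {}
            }

            // Exit 3: nothing to read right now -> pending. `ready!`
            // propagates Pending, and the inner read poll registers the
            // waker, so the task is re-woken when the socket is readable.
            match ready!(poll_read_buf(Pin::new(&mut self.reader), cx, &mut self.buf)) {
                // Exit 4: EOF or read error -> terminate.
                Ok(0) | Err(_) => return Poll::Ready(None),
                // New data arrived: loop and try decoding again.
                Ok(_) => continue,
            }
        }
    }
}
```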

@prestwich (Member) commented Dec 12, 2023

if we read every item as soon as it's written, then we don't have a problem

however, if the other end of the socket ever writes 2 items between polls of this stream, then the next time the stream is polled it will fill the buffer (getting the 2 items), and then deserialize and return an item. At that point, we have a serialized item left in the buffer. If we do not re-wake properly, then the next poll will occur when the AsyncRead is ready to give us more serialized items for our buffer. That wake will again result in only 1 item being read from the buffer.

we need to re-wake until the buffer is empty; otherwise we risk falling significantly behind and filling our buffer


```rust
loop {
    // try decoding from the buffer
    tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
    let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();
```
Member commented:

instantiating the deserializer performs a new alloc every time the poll is invoked. Instead, let's check if the buffer is empty before instantiating, and then, after instantiating, drain the whole buffer into an `items: Vec<PubSubItem>`. Will reduce total allocs significantly.

@mattsse (Member, Author) commented:

added a `drained` toggle that is set when the buffer has no more items to decode, so the next poll skips straight to reading more data
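The shape of that toggle, roughly (a sketch of the field, not the exact diff):

```rust
use bytes::BytesMut;

struct ReadJsonStream<T> {
    /// The underlying reader.
    reader: T,
    /// A buffer of bytes read from the reader.
    buf: BytesMut,
    /// Set when the buffer holds no complete item, so the next poll can
    /// skip the decode attempt and go straight to reading; cleared again
    /// whenever new bytes arrive from the reader.
    drained: bool,
}
```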

@mattsse merged commit 51ee830 into main on Dec 12, 2023. 17 checks passed.

@DaniPopes deleted the matt/work-on-ipc-reader branch on January 15, 2024.