From 149072b9e36722a70619764ae4584abfc6cdd5bd Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 26 Jul 2021 11:41:59 +0200 Subject: [PATCH 1/5] io: add fill_buf and consume --- tokio/src/io/util/async_buf_read_ext.rs | 54 +++++++++++++++++++++++++ tokio/src/io/util/fill_buf.rs | 49 ++++++++++++++++++++++ tokio/src/io/util/mod.rs | 1 + 3 files changed, 104 insertions(+) create mode 100644 tokio/src/io/util/fill_buf.rs diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs index 233ac31c463..f96ac0a40f7 100644 --- a/tokio/src/io/util/async_buf_read_ext.rs +++ b/tokio/src/io/util/async_buf_read_ext.rs @@ -1,3 +1,4 @@ +use crate::io::util::fill_buf::{fill_buf, FillBuf}; use crate::io::util::lines::{lines, Lines}; use crate::io::util::read_line::{read_line, ReadLine}; use crate::io::util::read_until::{read_until, ReadUntil}; @@ -206,6 +207,59 @@ cfg_io_util! { split(self, byte) } + /// Returns the contents of the internal buffer, filling it with more + /// data from the inner reader if it is empty. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`consume`] method to function properly. When calling this method, + /// none of the contents will be "read" in the sense that later calling + /// `read` may return the same contents. As such, [`consume`] must be + /// called with the number of bytes that are consumed from this buffer + /// to ensure that the bytes are never returned twice. + /// + /// An empty buffer returned indicates that the stream has reached EOF. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fill_buf(&mut self) -> io::Result<&[u8]>; + /// ``` + /// + /// # Errors + /// + /// This function will return an I/O error if the underlying reader was + /// read, but returned an error. + /// + /// [`consume`]: crate::io::AsyncBufReadExt::consume + fn fill_buf<'a>(&'a mut self) -> FillBuf<'a, Self> + where + Self: Unpin, + { + fill_buf(self) + } + + /// Tells this buffer that `amt` bytes have been consumed from the + /// buffer, so they should no longer be returned in calls to [`read`]. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`fill_buf`] method to function properly. This function does not + /// perform any I/O, it simply informs this object that some amount of + /// its buffer, returned from [`fill_buf`], has been consumed and should + /// no longer be returned. As such, this function may do odd things if + /// [`fill_buf`] isn't called before calling it. + /// + /// The `amt` must be less than the number of bytes in the buffer + /// returned by [`fill_buf`]. + /// + /// [`read`]: crate::io::AsyncReadExt::read + /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf + fn consume(&mut self, amt: usize) + where + Self: Unpin, + { + std::pin::Pin::new(self).consume(amt) + } + /// Returns a stream over the lines of this reader. /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). /// diff --git a/tokio/src/io/util/fill_buf.rs b/tokio/src/io/util/fill_buf.rs new file mode 100644 index 00000000000..f7715f1c5d9 --- /dev/null +++ b/tokio/src/io/util/fill_buf.rs @@ -0,0 +1,49 @@ +use crate::io::AsyncBufRead; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Future for the [`fill_buf`](crate::io::AsyncBufReadExt::fill_buf) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct FillBuf<'a, R: ?Sized> { + reader: Option<&'a mut R>, + #[pin] + _pin: PhantomPinned, + } +} + +pub(crate) fn fill_buf<'a, R>(reader: &'a mut R) -> FillBuf<'a, R> +where + R: AsyncBufRead + ?Sized + Unpin, +{ + FillBuf { + reader: Some(reader), + _pin: PhantomPinned, + } +} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> { + type Output = io::Result<&'a [u8]>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + let reader = me.reader.take().expect("Polled after completion."); + match Pin::new(&mut *reader).poll_fill_buf(cx) { + Poll::Ready(_) => match Pin::new(reader).poll_fill_buf(cx) { + Poll::Ready(slice) => Poll::Ready(slice), + Poll::Pending => panic!("poll_fill_buf returned Pending while having data"), + }, + Poll::Pending => { + *me.reader = Some(reader); + Poll::Pending + } + } + } +} diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index fd3dd0dfc4c..21199d0be84 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -49,6 +49,7 @@ cfg_io_util! { mod read_exact; mod read_int; mod read_line; + mod fill_buf; mod read_to_end; mod vec_with_initialized; From dc9ca39b87b3e433de09268b30ebd1840cb96a73 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 26 Jul 2021 12:41:50 +0200 Subject: [PATCH 2/5] clippy --- tokio/src/io/util/async_buf_read_ext.rs | 2 +- tokio/src/io/util/fill_buf.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs index f96ac0a40f7..c7d764abf79 100644 --- a/tokio/src/io/util/async_buf_read_ext.rs +++ b/tokio/src/io/util/async_buf_read_ext.rs @@ -231,7 +231,7 @@ cfg_io_util! { /// read, but returned an error. /// /// [`consume`]: crate::io::AsyncBufReadExt::consume - fn fill_buf<'a>(&'a mut self) -> FillBuf<'a, Self> + fn fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin, { diff --git a/tokio/src/io/util/fill_buf.rs b/tokio/src/io/util/fill_buf.rs index f7715f1c5d9..19f432761e9 100644 --- a/tokio/src/io/util/fill_buf.rs +++ b/tokio/src/io/util/fill_buf.rs @@ -18,7 +18,7 @@ pin_project! { } } -pub(crate) fn fill_buf<'a, R>(reader: &'a mut R) -> FillBuf<'a, R> +pub(crate) fn fill_buf(reader: &mut R) -> FillBuf<'_, R> where R: AsyncBufRead + ?Sized + Unpin, { From 4968b6ddad63b0ec1790fd7cedef939ccf2782ec Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 26 Jul 2021 13:27:31 +0200 Subject: [PATCH 3/5] Add tests --- tokio/tests/io_buf_reader.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/tokio/tests/io_buf_reader.rs b/tokio/tests/io_buf_reader.rs index c72c058d73c..0d3f6bafc20 100644 --- a/tokio/tests/io_buf_reader.rs +++ b/tokio/tests/io_buf_reader.rs @@ -8,9 +8,11 @@ use std::cmp; use std::io::{self, Cursor}; use std::pin::Pin; use tokio::io::{ - AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, - ReadBuf, SeekFrom, + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, + BufReader, ReadBuf, SeekFrom, }; +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; macro_rules! run_fill_buf { ($reader:expr) => {{ @@ -348,3 +350,30 @@ async fn maybe_pending_seek() { Pin::new(&mut reader).consume(1); assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); } + +// This tests the AsyncBufReadExt::fill_buf wrapper. +#[tokio::test] +async fn test_fill_buf_wrapper() { + let (mut write, read) = tokio::io::duplex(16); + + let mut read = BufReader::new(read); + write.write_all(b"hello world").await.unwrap(); + + assert_eq!(read.fill_buf().await.unwrap(), b"hello world"); + read.consume(b"hello ".len()); + assert_eq!(read.fill_buf().await.unwrap(), b"world"); + assert_eq!(read.fill_buf().await.unwrap(), b"world"); + read.consume(b"world".len()); + + let mut fill = spawn(read.fill_buf()); + assert_pending!(fill.poll()); + + write.write_all(b"foo bar").await.unwrap(); + assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar"); + drop(fill); + + drop(write); + assert_eq!(read.fill_buf().await.unwrap(), b"foo bar"); + read.consume(b"foo bar".len()); + assert_eq!(read.fill_buf().await.unwrap(), b""); +} From 66ca121405bb6665c9ab79cee98d236484238882 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 26 Jul 2021 14:20:48 +0200 Subject: [PATCH 4/5] Add fill_buf to tests/async_send_sync --- tokio/tests/async_send_sync.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 97118ce66dc..a04d4b627a9 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -514,6 +514,7 @@ async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, & async_assert_fn!( tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): Send & Sync & !Unpin ); +async_assert_fn!(tokio::io::AsyncBufReadExt::fill_buf(&mut BoxAsyncRead): Send & Sync & !Unpin); async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): Send & Sync & !Unpin); async_assert_fn!(tokio::io::AsyncReadExt::read_buf(&mut BoxAsyncRead, &mut Vec): Send & Sync & !Unpin); async_assert_fn!( From ff04a3bf893b21452df6e8b70d85976f4dc15dd8 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 29 Jul 2021 12:46:36 +0200 Subject: [PATCH 5/5] Add comment --- tokio/src/io/util/fill_buf.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/io/util/fill_buf.rs b/tokio/src/io/util/fill_buf.rs index 19f432761e9..98ae2ea6da5 100644 --- a/tokio/src/io/util/fill_buf.rs +++ b/tokio/src/io/util/fill_buf.rs @@ -34,6 +34,9 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); + // Due to a limitation in the borrow-checker, we cannot return the value + // directly on Ready. Once Rust starts using the polonius borrow checker, + // this can be simplified. let reader = me.reader.take().expect("Polled after completion."); match Pin::new(&mut *reader).poll_fill_buf(cx) { Poll::Ready(_) => match Pin::new(reader).poll_fill_buf(cx) {