From 340663ac3288cdaa49940de7146c9b55918a92e7 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Wed, 14 Aug 2019 19:32:09 +0900 Subject: [PATCH] io: add async BufReader/BufWriter --- tokio-io/Cargo.toml | 3 +- tokio-io/src/io/buf_reader.rs | 168 ++++++++++++++++++++++++++++++++++ tokio-io/src/io/buf_writer.rs | 159 ++++++++++++++++++++++++++++++++ tokio-io/src/io/mod.rs | 10 ++ tokio-io/src/lib.rs | 2 +- tokio/src/io.rs | 3 +- 6 files changed, 342 insertions(+), 3 deletions(-) create mode 100644 tokio-io/src/io/buf_reader.rs create mode 100644 tokio-io/src/io/buf_writer.rs diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml index 8a350a906fa..521ca2b7a05 100644 --- a/tokio-io/Cargo.toml +++ b/tokio-io/Cargo.toml @@ -20,13 +20,14 @@ Core I/O primitives for asynchronous I/O in Rust. categories = ["asynchronous"] [features] -util = ["memchr"] +util = ["memchr", "pin-utils"] [dependencies] bytes = "0.4.7" log = "0.4" futures-core-preview = "=0.3.0-alpha.18" memchr = { version = "2.2", optional = true } +pin-utils = { version = "=0.1.0-alpha.4", optional = true } [dev-dependencies] tokio = { version = "=0.2.0-alpha.1", path = "../tokio" } diff --git a/tokio-io/src/io/buf_reader.rs b/tokio-io/src/io/buf_reader.rs new file mode 100644 index 00000000000..b8347d09c0a --- /dev/null +++ b/tokio-io/src/io/buf_reader.rs @@ -0,0 +1,168 @@ +use super::DEFAULT_BUF_SIZE; +use crate::{AsyncBufRead, AsyncRead}; +use futures_core::ready; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::io::{self, Read}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{cmp, fmt}; + +/// The `BufReader` struct adds buffering to any reader. +/// +/// It can be excessively inefficient to work directly with a [`AsyncRead`] +/// instance. A `BufReader` performs large, infrequent reads on the underlying +/// [`AsyncRead`] and maintains an in-memory buffer of the results. 
+/// +/// `BufReader` can improve the speed of programs that make *small* and +/// *repeated* read calls to the same file or network socket. It does not +/// help when reading very large amounts at once, or reading just one or a few +/// times. It also provides no advantage when reading from a source that is +/// already in memory, like a `Vec`. +/// +/// When the `BufReader` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufReader` on the same +/// stream can cause data loss. +/// +/// [`AsyncRead`]: tokio_io::AsyncRead +/// +// TODO: Examples +pub struct BufReader { + inner: R, + buf: Box<[u8]>, + pos: usize, + cap: usize, +} + +impl BufReader { + unsafe_pinned!(inner: R); + unsafe_unpinned!(pos: usize); + unsafe_unpinned!(cap: usize); + + /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: R) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufReader` with the specified buffer capacity. + pub fn with_capacity(capacity: usize, inner: R) -> Self { + unsafe { + let mut buffer = Vec::with_capacity(capacity); + buffer.set_len(capacity); + inner.prepare_uninitialized_buffer(&mut buffer); + Self { + inner, + buf: buffer.into_boxed_slice(), + pos: 0, + cap: 0, + } + } + } + + /// Gets a reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. 
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.inner() + } + + /// Consumes this `BufReader`, returning the underlying reader. + /// + /// Note that any leftover data in the internal buffer is lost. + pub fn into_inner(self) -> R { + self.inner + } + + /// Returns a reference to the internally buffered data. + /// + /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. + pub fn buffer(&self) -> &[u8] { + &self.buf[self.pos..self.cap] + } + + /// Invalidates all data in the internal buffer. + #[inline] + fn discard_buffer(mut self: Pin<&mut Self>) { + *self.as_mut().pos() = 0; + *self.cap() = 0; + } +} + +impl AsyncRead for BufReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.pos == self.cap && buf.len() >= self.buf.len() { + let res = ready!(self.as_mut().inner().poll_read(cx, buf)); + self.discard_buffer(); + return Poll::Ready(res); + } + let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; + let nread = rem.read(buf)?; + self.consume(nread); + Poll::Ready(Ok(nread)) + } + + // we can't skip unconditionally because of the large buffer case in read. + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} + +impl AsyncBufRead for BufReader { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + inner, + buf, + cap, + pos, + } = unsafe { self.get_unchecked_mut() }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; + + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. 
+ if *pos >= *cap { + debug_assert!(*pos == *cap); + *cap = ready!(inner.as_mut().poll_read(cx, buf))?; + *pos = 0; + } + Poll::Ready(Ok(&buf[*pos..*cap])) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + *self.as_mut().pos() = cmp::min(self.pos + amt, self.cap); + } +} + +impl fmt::Debug for BufReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufReader") + .field("reader", &self.inner) + .field( + "buffer", + &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), + ) + .finish() + } +} diff --git a/tokio-io/src/io/buf_writer.rs b/tokio-io/src/io/buf_writer.rs new file mode 100644 index 00000000000..c91cb6fd5ed --- /dev/null +++ b/tokio-io/src/io/buf_writer.rs @@ -0,0 +1,159 @@ +use super::DEFAULT_BUF_SIZE; +use crate::AsyncWrite; +use futures_core::ready; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::fmt; +use std::io::{self, Write}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and +/// writes it to an underlying writer in large, infrequent batches. +/// +/// `BufWriter` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a `Vec`. +/// +/// When the `BufWriter` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufWriter` on the same +/// stream can cause data loss. If you need to write out the contents of its +/// buffer, you must manually call [`flush`] before the writer is dropped. 
+/// +/// [`AsyncWrite`]: tokio_io::AsyncWrite +/// [`flush`]: super::AsyncWriteExt::flush +/// +// TODO: Examples +pub struct BufWriter { + inner: W, + buf: Vec, + written: usize, +} + +impl BufWriter { + unsafe_pinned!(inner: W); + unsafe_unpinned!(buf: Vec); + + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: W) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufWriter` with the specified buffer capacity. + pub fn with_capacity(cap: usize, inner: W) -> Self { + Self { + inner, + buf: Vec::with_capacity(cap), + written: 0, + } + } + + fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + inner, + buf, + written, + } = unsafe { self.get_unchecked_mut() }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; + + let len = buf.len(); + let mut ret = Ok(()); + while *written < len { + match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) { + Ok(0) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write the buffered data", + )); + break; + } + Ok(n) => *written += n, + Err(e) => { + ret = Err(e); + break; + } + } + } + if *written > 0 { + buf.drain(..*written); + } + *written = 0; + Poll::Ready(ret) + } + + /// Gets a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + &self.inner + } + + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_mut(&mut self) -> &mut W { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.inner() + } + + /// Consumes this `BufWriter`, returning the underlying writer. + /// + /// Note that any leftover data in the internal buffer is lost. 
+ pub fn into_inner(self) -> W { + self.inner + } + + /// Returns a reference to the internally buffered data. + pub fn buffer(&self) -> &[u8] { + &self.buf + } +} + +impl AsyncWrite for BufWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.buf.len() + buf.len() > self.buf.capacity() { + ready!(self.as_mut().flush_buf(cx))?; + } + if buf.len() >= self.buf.capacity() { + self.inner().poll_write(cx, buf) + } else { + Poll::Ready(self.buf().write(buf)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_shutdown(cx) + } +} + +impl fmt::Debug for BufWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufWriter") + .field("writer", &self.inner) + .field( + "buffer", + &format_args!("{}/{}", self.buf.len(), self.buf.capacity()), + ) + .field("written", &self.written) + .finish() + } +} diff --git a/tokio-io/src/io/mod.rs b/tokio-io/src/io/mod.rs index 71904c1fec4..8e05bb085c3 100644 --- a/tokio-io/src/io/mod.rs +++ b/tokio-io/src/io/mod.rs @@ -39,6 +39,8 @@ mod async_buf_read_ext; mod async_read_ext; mod async_write_ext; +mod buf_reader; +mod buf_writer; mod copy; mod flush; mod lines; @@ -58,3 +60,11 @@ pub use self::async_buf_read_ext::AsyncBufReadExt; pub use self::async_read_ext::AsyncReadExt; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::async_write_ext::AsyncWriteExt; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::buf_reader::BufReader; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::buf_writer::BufWriter; + +// used by `BufReader` and `BufWriter` +// 
https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 +const DEFAULT_BUF_SIZE: usize = 8 * 1024; diff --git a/tokio-io/src/lib.rs b/tokio-io/src/lib.rs index df7ffdef1a8..0747905fb7c 100644 --- a/tokio-io/src/lib.rs +++ b/tokio-io/src/lib.rs @@ -27,7 +27,7 @@ pub use self::async_read::AsyncRead; pub use self::async_write::AsyncWrite; #[cfg(feature = "util")] -pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; +pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; // Re-export `Buf` and `BufMut` since they are part of the API pub use bytes::{Buf, BufMut}; diff --git a/tokio/src/io.rs b/tokio/src/io.rs index 2dbec25ceb6..0183d28bf57 100644 --- a/tokio/src/io.rs +++ b/tokio/src/io.rs @@ -40,7 +40,8 @@ #[cfg(feature = "fs")] pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout}; pub use tokio_io::{ - AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, + BufWriter, }; // Re-export io::Error so that users don't have to deal