From 009636cf64ac05b85440212fae0053e369065898 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Thu, 16 May 2019 21:44:32 +0900 Subject: [PATCH] Add async BufWriter --- futures-util/src/io/buf_writer.rs | 182 ++++++++++++++++++++++ futures-util/src/io/mod.rs | 4 +- futures/src/lib.rs | 8 +- futures/tests/io_buf_writer.rs | 247 ++++++++++++++++++++++++++++++ 4 files changed, 435 insertions(+), 6 deletions(-) create mode 100644 futures-util/src/io/buf_writer.rs create mode 100644 futures/tests/io_buf_writer.rs diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs new file mode 100644 index 0000000000..32e316f8fa --- /dev/null +++ b/futures-util/src/io/buf_writer.rs @@ -0,0 +1,182 @@ +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::fmt; +use std::io::{self, Write}; +use std::pin::Pin; +use super::DEFAULT_BUF_SIZE; + +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and +/// writes it to an underlying writer in large, infrequent batches. +/// +/// `BufWriter` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a `Vec`. +/// +/// When the `BufWriter` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufWriter` on the same +/// stream can cause data loss. If you need to write out the contents of its +/// buffer, you must manually call flush before the writer is dropped. +/// +/// [`AsyncWrite`]: futures_io::AsyncWrite +/// [`flush`]: super::AsyncWriteExt::flush +/// +// TODO: Examples +pub struct BufWriter { + inner: W, + buf: Vec, + written: usize, +} + +impl BufWriter { + unsafe_pinned!(inner: W); + unsafe_unpinned!(buf: Vec); + + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: W) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufWriter` with the specified buffer capacity. + pub fn with_capacity(cap: usize, inner: W) -> Self { + Self { + inner, + buf: Vec::with_capacity(cap), + written: 0, + } + } + + fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { inner, buf, written } = unsafe { Pin::get_unchecked_mut(self) }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; + + let len = buf.len(); + let mut ret = Ok(()); + while *written < len { + match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) { + Ok(0) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write the buffered data", + )); + break; + } + Ok(n) => *written += n, + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => { + ret = Err(e); + break; + } + } + } + if *written > 0 { + buf.drain(..*written); + } + *written = 0; + Poll::Ready(ret) + } + + /// Gets a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + &self.inner + } + + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_mut(&mut self) -> &mut W { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut W> { + self.inner() + } + + /// Consumes this `BufWriter`, returning the underlying writer. + /// + /// Note that any leftover data in the internal buffer is lost. + pub fn into_inner(self) -> W { + self.inner + } + + /// Returns a reference to the internally buffered data. + pub fn buffer(&self) -> &[u8] { + &self.buf + } +} + +impl AsyncWrite for BufWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.buf.len() + buf.len() > self.buf.capacity() { + ready!(self.as_mut().flush_buf(cx))?; + } + if buf.len() >= self.buf.capacity() { + self.inner().poll_write(cx, buf) + } else { + Poll::Ready(self.buf().write(buf)) + } + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let total_len = bufs.iter().map(|b| b.len()).sum::(); + if self.buf.len() + total_len > self.buf.capacity() { + ready!(self.as_mut().flush_buf(cx))?; + } + if total_len >= self.buf.capacity() { + self.inner().poll_write_vectored(cx, bufs) + } else { + Poll::Ready(self.buf().write_vectored(bufs)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_close(cx) + } +} + +impl fmt::Debug for BufWriter { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BufWriter") + .field("writer", &self.inner) + .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) + .field("written", &self.written) + .finish() + } +} + +impl AsyncSeek for BufWriter { + /// Seek to the offset, in bytes, in the underlying writer. + /// + /// Seeking always writes out the internal buffer before seeking. + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_seek(cx, pos) + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index e8b7fd8b2d..4e15d2be00 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -25,8 +25,8 @@ pub use self::allow_std::AllowStdIo; mod buf_reader; pub use self::buf_reader::BufReader; -// mod buf_writer; -// pub use self::buf_writer::BufWriter; +mod buf_writer; +pub use self::buf_writer::BufWriter; mod copy_into; pub use self::copy_into::CopyInto; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 70c25eabdc..ac02a1d298 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -295,10 +295,10 @@ pub mod io { }; pub use futures_util::io::{ - AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader, - Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd, - ReadUntil, Seek, Window, WriteAll, WriteHalf, - Lines, + AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, + BufReader, BufWriter, Close, CopyInto, Flush, Lines, Read, ReadExact, + ReadHalf, ReadLine, ReadToEnd, ReadUntil, Seek, Window, WriteAll, + WriteHalf, }; } diff --git a/futures/tests/io_buf_writer.rs b/futures/tests/io_buf_writer.rs new file mode 100644 index 0000000000..6054b0280b --- /dev/null +++ b/futures/tests/io_buf_writer.rs @@ -0,0 +1,247 @@ +use futures::executor::block_on; +use futures::future::Future; +use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::io::{self, Cursor}; +use std::pin::Pin; + +macro_rules! run_write { + ($writer:expr, $buf:expr) => {{ + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = Pin::new(&mut $writer).poll_write(&mut cx, $buf) { + break x; + } + } + }}; +} + +#[test] +fn buf_writer() { + let mut writer = BufWriter::with_capacity(2, Vec::new()); + + run_write!(&mut writer, &[0, 1]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1]); + + run_write!(&mut writer, &[2]).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + run_write!(&mut writer, &[3]).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + run_write!(&mut writer, &[4]).unwrap(); + run_write!(&mut writer, &[5]).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + run_write!(&mut writer, &[6]).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + run_write!(&mut writer, &[7, 8]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + run_write!(&mut writer, &[9, 10, 11]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + block_on(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, Vec::new()); + run_write!(&mut w, &[0, 1]).unwrap(); + assert_eq!(*w.get_ref(), []); + block_on(w.flush()).unwrap(); + let w = w.into_inner(); + assert_eq!(w, [0, 1]); +} + +#[test] +fn buf_writer_seek() { + // FIXME: when https://github.com/rust-lang-nursery/futures-rs/issues/1510 fixed, + // use `Vec::new` instead of `vec![0; 8]`. + let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); + block_on(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); + block_on(w.write_all(&[6, 7])).unwrap(); + assert_eq!(block_on(w.seek(SeekFrom::Current(0))).ok(), Some(8)); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(block_on(w.seek(SeekFrom::Start(2))).ok(), Some(2)); + block_on(w.write_all(&[8, 9])).unwrap(); + block_on(w.flush()).unwrap(); + assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} + +struct MaybePending { + inner: Vec, + ready: bool, +} + +impl MaybePending { + fn new(inner: Vec) -> Self { + Self { inner, ready: false } + } +} + +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) { + return x; + } + } +} + +#[test] +fn maybe_pending_buf_writer() { + let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); + + run_write!(&mut writer, &[0, 1]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run_write!(&mut writer, &[2]).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run_write!(&mut writer, &[3]).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + run_write!(&mut writer, &[4]).unwrap(); + run_write!(&mut writer, &[5]).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + run_write!(&mut writer, &[6]).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); + + run_write!(&mut writer, &[7, 8]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); + + run_write!(&mut writer, &[9, 10, 11]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + run(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn maybe_pending_buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); + run_write!(&mut w, &[0, 1]).unwrap(); + assert_eq!(&w.get_ref().inner, &[]); + run(w.flush()).unwrap(); + let w = w.into_inner().inner; + assert_eq!(w, [0, 1]); +} + + +struct MaybePendingSeek { + inner: Cursor>, + ready_write: bool, + ready_seek: bool, +} + +impl MaybePendingSeek { + fn new(inner: Vec) -> Self { + Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false } + } +} + +impl AsyncWrite for MaybePendingSeek { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready_write { + self.ready_write = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready_write = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +impl AsyncSeek for MaybePendingSeek { + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + if self.ready_seek { + self.ready_seek = false; + Pin::new(&mut self.inner).poll_seek(cx, pos) + } else { + self.ready_seek = true; + Poll::Pending + } + } +} + +#[test] +fn maybe_pending_buf_writer_seek() { + // FIXME: when https://github.com/rust-lang-nursery/futures-rs/issues/1510 fixed, + // use `Vec::new` instead of `vec![0; 8]`. + let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8])); + run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); + run(w.write_all(&[6, 7])).unwrap(); + assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8)); + assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2)); + run(w.write_all(&[8, 9])).unwrap(); + run(w.flush()).unwrap(); + assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +}