diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml index d5e5cb786c1..82c520097ec 100644 --- a/tokio-io/Cargo.toml +++ b/tokio-io/Cargo.toml @@ -28,6 +28,7 @@ log = "0.4" futures-core-preview = "=0.3.0-alpha.18" memchr = { version = "2.2", optional = true } pin-utils = { version = "=0.1.0-alpha.4", optional = true } +pin-project = "=0.4.0-alpha.11" [dev-dependencies] tokio = { version = "0.2.0-alpha.4", path = "../tokio" } diff --git a/tokio-io/src/io/buf_reader.rs b/tokio-io/src/io/buf_reader.rs index 7f0fdd7c7e8..738f5a5a2c2 100644 --- a/tokio-io/src/io/buf_reader.rs +++ b/tokio-io/src/io/buf_reader.rs @@ -1,5 +1,5 @@ use super::DEFAULT_BUF_SIZE; -use crate::{AsyncBufRead, AsyncRead}; +use crate::{AsyncBufRead, AsyncRead, AsyncWrite}; use futures_core::ready; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::io::{self, Read}; @@ -152,6 +152,24 @@ impl AsyncBufRead for BufReader { } } +impl AsyncWrite for BufReader { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_shutdown(cx) + } +} + impl fmt::Debug for BufReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") diff --git a/tokio-io/src/io/buf_stream.rs b/tokio-io/src/io/buf_stream.rs new file mode 100644 index 00000000000..f2abfb08ec3 --- /dev/null +++ b/tokio-io/src/io/buf_stream.rs @@ -0,0 +1,71 @@ +use crate::io::{BufReader, BufWriter}; +use crate::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project::pin_project; +use std::io::{self}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. +/// +/// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] +/// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall +/// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] +/// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps +/// one in the other so that both directions are buffered. See their documentation for details. +#[pin_project] +#[derive(Debug)] +pub struct BufStream(#[pin] BufReader>); + +impl BufStream { + /// Wrap a type in both [`BufWriter`] and [`BufReader`]. + /// + /// See the documentation for those types and [`BufStream`] for details. + pub fn new(stream: RW) -> BufStream { + BufStream(BufReader::new(BufWriter::new(stream))) + } +} + +impl AsyncWrite for BufStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().0.poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_shutdown(cx) + } +} + +impl AsyncRead for BufStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().0.poll_read(cx, buf) + } + + // we can't skip unconditionally because of the large buffer case in read. + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } +} + +impl AsyncBufRead for BufStream { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project_into().0.poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} diff --git a/tokio-io/src/io/buf_writer.rs b/tokio-io/src/io/buf_writer.rs index 438bc653be7..2e8613b6e9e 100644 --- a/tokio-io/src/io/buf_writer.rs +++ b/tokio-io/src/io/buf_writer.rs @@ -1,5 +1,5 @@ use super::DEFAULT_BUF_SIZE; -use crate::AsyncWrite; +use crate::{AsyncBufRead, AsyncRead, AsyncWrite}; use futures_core::ready; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::fmt; @@ -145,6 +145,31 @@ impl AsyncWrite for BufWriter { } } +impl AsyncRead for BufWriter { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + + // we can't skip unconditionally because of the large buffer case in read. + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl AsyncBufRead for BufWriter { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.get_pin_mut().consume(amt) + } +} + impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufWriter") diff --git a/tokio-io/src/io/mod.rs b/tokio-io/src/io/mod.rs index 36eae7a3d8b..ab3a2b42d89 100644 --- a/tokio-io/src/io/mod.rs +++ b/tokio-io/src/io/mod.rs @@ -2,6 +2,7 @@ mod async_buf_read_ext; mod async_read_ext; mod async_write_ext; mod buf_reader; +mod buf_stream; mod buf_writer; mod chain; mod copy; @@ -27,6 +28,8 @@ pub use self::async_write_ext::AsyncWriteExt; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::buf_reader::BufReader; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::buf_stream::BufStream; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::buf_writer::BufWriter; // used by `BufReader` and `BufWriter` diff --git a/tokio-io/src/lib.rs b/tokio-io/src/lib.rs index 0e9c5f7204c..f54fc065e36 100644 --- a/tokio-io/src/lib.rs +++ b/tokio-io/src/lib.rs @@ -31,7 +31,7 @@ pub use self::async_read::AsyncRead; pub use self::async_write::AsyncWrite; #[cfg(feature = "util")] -pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream, BufWriter}; // Re-export `Buf` and `BufMut` since they are part of the API pub use bytes::{Buf, BufMut};