From 52e3682e4fca9d8b4143fce2cab02a5b94410e77 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Fri, 14 Jun 2019 20:39:40 +0200 Subject: [PATCH] Redefine AsyncReadExt::copy_into on top of AsyncBufReadExt::copy_buf_into --- futures-util/src/io/copy_into.rs | 69 ++++++-------------------------- futures-util/src/io/mod.rs | 7 +--- 2 files changed, 14 insertions(+), 62 deletions(-) diff --git a/futures-util/src/io/copy_into.rs b/futures-util/src/io/copy_into.rs index 629a7783c3..552b5fdecc 100644 --- a/futures-util/src/io/copy_into.rs +++ b/futures-util/src/io/copy_into.rs @@ -3,75 +3,32 @@ use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite}; use std::io; use std::pin::Pin; +use super::{BufReader, CopyBufInto}; +use pin_utils::unsafe_pinned; /// Future for the [`copy_into`](super::AsyncReadExt::copy_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyInto<'a, R: ?Sized + Unpin, W: ?Sized + Unpin> { - reader: &'a mut R, - read_done: bool, - writer: &'a mut W, - pos: usize, - cap: usize, - amt: u64, - buf: Box<[u8]>, +pub struct CopyInto { + inner: CopyBufInto, W>, } -impl Unpin for CopyInto<'_, R, W> {} +impl Unpin for CopyInto where CopyBufInto, W>: Unpin {} -impl<'a, R: ?Sized + Unpin, W: ?Sized + Unpin> CopyInto<'a, R, W> { - pub(super) fn new(reader: &'a mut R, writer: &'a mut W) -> Self { +impl CopyInto { + unsafe_pinned!(inner: CopyBufInto, W>); + + pub(super) fn new(reader: R, writer: W) -> Self { CopyInto { - reader, - read_done: false, - writer, - amt: 0, - pos: 0, - cap: 0, - buf: Box::new([0; 2048]), + inner: CopyBufInto::new(BufReader::with_capacity(2048, reader), writer), } } } -impl Future for CopyInto<'_, R, W> - where R: AsyncRead + ?Sized + Unpin, - W: AsyncWrite + ?Sized + Unpin, -{ +impl Future for CopyInto { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - loop { - // If our buffer is empty, then we need to read some data to - // continue. - if this.pos == this.cap && !this.read_done { - let n = ready!(Pin::new(&mut this.reader).poll_read(cx, &mut this.buf))?; - if n == 0 { - this.read_done = true; - } else { - this.pos = 0; - this.cap = n; - } - } - - // If our buffer has some data, let's write it out! - while this.pos < this.cap { - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buf[this.pos..this.cap]))?; - if i == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } else { - this.pos += i; - this.amt += i as u64; - } - } - - // If we've written al the data and we've seen EOF, flush out the - // data and finish the transfer. - // done with the entire transfer. - if this.pos == this.cap && this.read_done { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; - return Poll::Ready(Ok(this.amt)); - } - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner().poll(cx) } } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 43bb4f4da9..caf2d2f37f 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -107,12 +107,7 @@ pub trait AsyncReadExt: AsyncRead { /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_into<'a, W>( - &'a mut self, - writer: &'a mut W, - ) -> CopyInto<'a, Self, W> - where Self: Unpin, W: AsyncWrite + Unpin, - { + fn copy_into(self, writer: W) -> CopyInto where Self: Sized { CopyInto::new(self, writer) }