Skip to content

Commit

Permalink
Change copy_into/copy_buf_into to free functions for consistency with…
Browse files Browse the repository at this point in the history
… the standard library
  • Loading branch information
taiki-e committed Nov 5, 2019
1 parent 0c165a3 commit cf2003b
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 124 deletions.
63 changes: 63 additions & 0 deletions futures-util/src/io/copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, AsyncWrite};
use std::io;
use std::pin::Pin;
use super::{BufReader, copy_buf, CopyBuf};
use pin_utils::unsafe_pinned;

/// Creates a future which copies all the bytes from one object to another.
///
/// The returned future will copy all the bytes read from this `AsyncRead` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{self, AsyncWriteExt, Cursor};
///
/// let reader = Cursor::new([1, 2, 3, 4]);
/// let mut writer = Cursor::new(vec![0u8; 5]);
///
/// let bytes = io::copy(reader, &mut writer).await?;
/// writer.close().await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn copy<R, W>(reader: R, writer: &mut W) -> Copy<'_, R, W>
where
R: AsyncRead,
W: AsyncWrite + Unpin + ?Sized,
{
Copy {
inner: copy_buf(BufReader::new(reader), writer),
}
}

/// Future for the [`copy()`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Copy<'a, R, W: ?Sized> {
inner: CopyBuf<'a, BufReader<R>, W>,
}

impl<'a, R: AsyncRead, W: ?Sized> Unpin for Copy<'a, R, W> where CopyBuf<'a, BufReader<R>, W>: Unpin {}

impl<'a, R: AsyncRead, W: ?Sized> Copy<'a, R, W> {
unsafe_pinned!(inner: CopyBuf<'a, BufReader<R>, W>);
}

impl<R: AsyncRead, W: AsyncWrite + Unpin + ?Sized> Future for Copy<'_, R, W> {
type Output = io::Result<u64>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner().poll(cx)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,55 @@ use futures_io::{AsyncBufRead, AsyncWrite};
use std::io;
use std::pin::Pin;

/// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method.
/// Creates a future which copies all the bytes from one object to another.
///
/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{self, AsyncWriteExt, Cursor};
///
/// let reader = Cursor::new([1, 2, 3, 4]);
/// let mut writer = Cursor::new(vec![0u8; 5]);
///
/// let bytes = io::copy_buf(reader, &mut writer).await?;
/// writer.close().await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn copy_buf<R, W>(reader: R, writer: &mut W) -> CopyBuf<'_, R, W>
where
R: AsyncBufRead,
W: AsyncWrite + Unpin + ?Sized,
{
CopyBuf {
reader,
writer,
amt: 0,
}
}

/// Future for the [`copy_buf()`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyBufInto<'a, R, W: ?Sized> {
pub struct CopyBuf<'a, R, W: ?Sized> {
reader: R,
writer: &'a mut W,
amt: u64,
}

impl<R: Unpin, W: ?Sized> Unpin for CopyBufInto<'_, R, W> {}

impl<R, W: ?Sized> CopyBufInto<'_, R, W> {
pub(super) fn new(reader: R, writer: &mut W) -> CopyBufInto<'_, R, W> {
CopyBufInto {
reader,
writer,
amt: 0,
}
}
}
impl<R: Unpin, W: ?Sized> Unpin for CopyBuf<'_, R, W> {}

impl<R, W: Unpin + ?Sized> CopyBufInto<'_, R, W> {
impl<R, W: Unpin + ?Sized> CopyBuf<'_, R, W> {
fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, Pin<&mut W>, &mut u64) {
unsafe {
let this = self.get_unchecked_mut();
Expand All @@ -34,7 +61,7 @@ impl<R, W: Unpin + ?Sized> CopyBufInto<'_, R, W> {
}
}

impl<R, W> Future for CopyBufInto<'_, R, W>
impl<R, W> Future for CopyBuf<'_, R, W>
where R: AsyncBufRead,
W: AsyncWrite + Unpin + ?Sized,
{
Expand Down
34 changes: 0 additions & 34 deletions futures-util/src/io/copy_into.rs

This file was deleted.

80 changes: 7 additions & 73 deletions futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ pub use self::chain::Chain;
mod close;
pub use self::close::Close;

mod copy_into;
pub use self::copy_into::CopyInto;
mod copy;
pub use self::copy::{copy, Copy};

mod copy_buf_into;
pub use self::copy_buf_into::CopyBufInto;
mod copy_buf;
pub use self::copy_buf::{copy_buf, CopyBuf};

mod cursor;
pub use self::cursor::Cursor;
Expand Down Expand Up @@ -157,39 +157,6 @@ pub trait AsyncReadExt: AsyncRead {
Chain::new(self, next)
}

/// Creates a future which copies all the bytes from one object to another.
///
/// The returned future will copy all the bytes read from this `AsyncRead` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{AsyncReadExt, AsyncWriteExt, Cursor};
///
/// let reader = Cursor::new([1, 2, 3, 4]);
/// let mut writer = Cursor::new(vec![0u8; 5]);
///
/// let bytes = reader.copy_into(&mut writer).await?;
/// writer.close().await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn copy_into<W>(self, writer: &mut W) -> CopyInto<'_, Self, W>
where
Self: Sized,
W: AsyncWrite + Unpin + ?Sized,
{
CopyInto::new(self, writer)
}

/// Tries to read some bytes directly into the given `buf` in asynchronous
/// manner, returning a future type.
///
Expand Down Expand Up @@ -342,7 +309,7 @@ pub trait AsyncReadExt: AsyncRead {
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{AsyncReadExt, Cursor};
/// use futures::io::{self, AsyncReadExt, Cursor};
///
/// // Note that for `Cursor` the read and write halves share a single
/// // seek position. This may or may not be true for other types that
Expand All @@ -354,8 +321,8 @@ pub trait AsyncReadExt: AsyncRead {
///
/// {
/// let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
/// reader.copy_into(&mut buffer_writer).await?;
/// buffer_reader.copy_into(&mut writer).await?;
/// io::copy(reader, &mut buffer_writer).await?;
/// io::copy(buffer_reader, &mut writer).await?;
/// }
///
/// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
Expand Down Expand Up @@ -558,39 +525,6 @@ impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {
/// Creates a future which copies all the bytes from one object to another.
///
/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{AsyncBufReadExt, AsyncWriteExt, Cursor};
///
/// let reader = Cursor::new([1, 2, 3, 4]);
/// let mut writer = Cursor::new(vec![0u8; 5]);
///
/// let bytes = reader.copy_buf_into(&mut writer).await?;
/// writer.close().await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn copy_buf_into<W>(self, writer: &mut W) -> CopyBufInto<'_, Self, W>
where
Self: Sized,
W: AsyncWrite + Unpin + ?Sized,
{
CopyBufInto::new(self, writer)
}

/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until the delimiter `byte` or EOF is reached.
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
Expand Down
4 changes: 2 additions & 2 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub mod executor {
//! than threads). Tasks spawned onto the pool with the
//! [`spawn_ok()`](crate::executor::ThreadPool::spawn_ok)
//! function will run on ambiently on the created threads.
//!
//!
//! # Spawning additional tasks
//!
//! Tasks can be spawned onto a spawner by calling its
Expand Down Expand Up @@ -271,7 +271,7 @@ pub mod io {

pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
BufReader, BufWriter, Cursor, Chain, Close, CopyInto, CopyBufInto,
BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf,
empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf,
ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat,
Repeat, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
Expand Down

0 comments on commit cf2003b

Please sign in to comment.