Skip to content

Commit

Permalink
Remove TryStreamExt::into_async_read Unpin bound (#2599)
Browse files Browse the repository at this point in the history
  • Loading branch information
khollbach authored May 11, 2022
1 parent a5f4934 commit cc16821
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 61 deletions.
101 changes: 51 additions & 50 deletions futures-util/src/stream/try_stream/into_async_read.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use pin_project_lite::pin_project;
use std::cmp;
use std::io::{Error, Result};

/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
#[derive(Debug)]
#[must_use = "readers do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
pub struct IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St::Ok: AsRef<[u8]>,
{
stream: St,
state: ReadState<St::Ok>,
}

impl<St> Unpin for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St::Ok: AsRef<[u8]>,
{
pin_project! {
/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
#[derive(Debug)]
#[must_use = "readers do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
pub struct IntoAsyncRead<St>
where
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
#[pin]
stream: St,
state: ReadState<St::Ok>,
}
}

#[derive(Debug)]
Expand All @@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {

impl<St> IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
Expand All @@ -46,16 +42,18 @@ where

impl<St> AsyncRead for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
let mut this = self.project();

loop {
match &mut self.state {
match this.state {
ReadState::Ready { chunk, chunk_start } => {
let chunk = chunk.as_ref();
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
Expand All @@ -64,23 +62,23 @@ where
*chunk_start += len;

if chunk.len() == *chunk_start {
self.state = ReadState::PendingChunk;
*this.state = ReadState::PendingChunk;
}

return Poll::Ready(Ok(len));
}
ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready { chunk, chunk_start: 0 };
*this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Ok(0));
}
},
Expand All @@ -94,51 +92,52 @@ where

impl<St> AsyncWrite for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + AsyncWrite + Unpin,
St: TryStream<Error = Error> + AsyncWrite,
St::Ok: AsRef<[u8]>,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
let this = self.project();
this.stream.poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.stream).poll_flush(cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
this.stream.poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.stream).poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
this.stream.poll_close(cx)
}
}

impl<St> AsyncBufRead for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
while let ReadState::PendingChunk = self.state {
match ready!(self.stream.try_poll_next_unpin(cx)) {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
let mut this = self.project();

while let ReadState::PendingChunk = this.state {
match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready { chunk, chunk_start: 0 };
*this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}

if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}
Expand All @@ -147,16 +146,18 @@ where
Poll::Ready(Ok(&[]))
}

fn consume(mut self: Pin<&mut Self>, amount: usize) {
fn consume(self: Pin<&mut Self>, amount: usize) {
let this = self.project();

// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
if amount == 0 {
return;
}
if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
if let ReadState::Ready { chunk, chunk_start } = this.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
self.state = ReadState::PendingChunk;
*this.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
Expand Down
17 changes: 6 additions & 11 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,12 +985,7 @@ pub trait TryStreamExt: TryStream {
Compat::new(self)
}

/// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
///
/// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
/// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
/// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
/// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
/// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
Expand All @@ -1002,20 +997,20 @@ pub trait TryStreamExt: TryStream {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
/// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
/// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
/// let mut buf = Vec::new();
///
/// assert!(reader.read_to_end(&mut buf).await.is_ok());
/// assert_eq!(buf, &[1, 2, 3, 4, 5]);
/// let mut buf = Vec::new();
/// reader.read_to_end(&mut buf).await.unwrap();
/// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
Self: Sized + TryStreamExt<Error = std::io::Error>,
Self::Ok: AsRef<[u8]>,
{
crate::io::assert_read(IntoAsyncRead::new(self))
Expand Down

0 comments on commit cc16821

Please sign in to comment.