Skip to content

Commit

Permalink
Add AsyncBufRead implementation for IntoAsyncRead
Browse files Browse the repository at this point in the history
  • Loading branch information
Nemo157 committed May 1, 2019
1 parent 50f3f71 commit 9b99079
Showing 1 changed file with 79 additions and 1 deletion.
80 changes: 79 additions & 1 deletion futures-util/src/try_stream/into_async_read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::pin::Pin;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use futures_io::{AsyncRead, AsyncBufRead};
use std::cmp;
use std::io::{Error, Result};

Expand Down Expand Up @@ -98,6 +98,59 @@ where
}
}

impl<St> AsyncBufRead for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St::Ok: AsRef<[u8]>,
{
fn poll_fill_buf<'a>(
mut self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<&'a [u8]>> {
if let ReadState::PendingChunk = self.state {
match ready!(Pin::new(&mut self.stream).try_poll_next(cx)) {
Some(Ok(chunk)) => {
self.state = ReadState::Ready {
chunk,
chunk_start: 0,
};
}
Some(Err(err)) => {
self.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
self.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}

if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}

// To get to this point we must be in ReadState::Eof
return Poll::Ready(Ok(&[]));
}

fn consume(
mut self: Pin<&mut Self>,
amount: usize,
) {
if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
self.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -148,4 +201,29 @@ mod tests {

assert_read!(reader, &mut buf, 0);
}

#[test]
fn test_into_async_bufread() -> std::io::Result<()> {
let stream = stream::iter(1..=2).map(|_| Ok(vec![1, 2, 3, 4, 5]));
let mut reader = stream.into_async_read();

let mut cx = noop_context();
let mut reader = Pin::new(&mut reader);

assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3, 4, 5][..]));
reader.as_mut().consume(3);

assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[4, 5][..]));
reader.as_mut().consume(2);

assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3, 4, 5][..]));
reader.as_mut().consume(2);

assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[3, 4, 5][..]));
reader.as_mut().consume(3);

assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[][..]));

Ok(())
}
}

0 comments on commit 9b99079

Please sign in to comment.