Skip to content

Commit

Permalink
Update future types to be Send
Browse files Browse the repository at this point in the history
Update future types used by `AsyncReadResponseExt` to be `Send` whenever `T` is `Send` as appropriate. Consolidate the newtype futures with a macro that handles conditional Send properly to reduce boilerplate and chance of mistakes.

Fixes #283.
  • Loading branch information
sagebind committed Dec 31, 2020
1 parent 48e4913 commit 970d0af
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 49 deletions.
44 changes: 44 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,47 @@ macro_rules! match_type {
}
}};
}

macro_rules! decl_future {
(
$(
$(#[$meta:meta])*
$vis:vis type $ident:ident$(<$($T:ident),*>)? = impl Future<Output = $output:ty> $(+ SendIf<$($S:ident),+>)?;
)*
) => {
$(
$(#[$meta])*
#[allow(missing_debug_implementations, non_snake_case)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
$vis struct $ident<'a $($(, $T)*)*> {
inner: ::std::pin::Pin<Box<dyn ::std::future::Future<Output = $output> + 'a>>,
$($($T: ::std::marker::PhantomData<$T>,)*)*
}

impl<'a, $($($T)*)*> $ident<'a, $($($T)*)*> {
pub(crate) fn new<F>(future: F) -> Self
where
F: ::std::future::Future<Output = $output> + 'a,
{
Self {
inner: Box::pin(future),
$($($T: ::std::marker::PhantomData,)*)*
}
}
}

impl<$($($T: Unpin)*)*> ::std::future::Future for $ident<'_, $($($T)*)*> {
type Output = $output;

fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll<Self::Output> {
self.as_mut().inner.as_mut().poll(cx)
}
}

$(
#[allow(unsafe_code)]
unsafe impl<$($S: Send),*> Send for $ident<'_, $($S)*> {}
)*
)*
};
}
33 changes: 14 additions & 19 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ use futures_lite::io::{AsyncRead, AsyncWrite};
use http::{Response, Uri};
use std::{
fs::File,
future::Future,
io::{self, Read, Write},
net::SocketAddr,
path::Path,
pin::Pin,
task::{Context, Poll},
};

/// Provides extension methods for working with HTTP responses.
Expand Down Expand Up @@ -229,7 +226,7 @@ pub trait AsyncReadResponseExt<T: AsyncRead + Unpin> {
/// println!("Read {} bytes", buf.len());
/// # Ok(()) }
/// ```
fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a>
fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, T>
where
W: AsyncWrite + Unpin + 'a;

Expand Down Expand Up @@ -260,13 +257,11 @@ pub trait AsyncReadResponseExt<T: AsyncRead + Unpin> {
}

impl<T: AsyncRead + Unpin> AsyncReadResponseExt<T> for Response<T> {
fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a>
fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, T>
where
W: AsyncWrite + Unpin + 'a,
{
CopyFuture(Box::pin(async move {
futures_lite::io::copy(self.body_mut(), writer).await
}))
CopyFuture::new(async move { futures_lite::io::copy(self.body_mut(), writer).await })
}

#[cfg(feature = "text-decoding")]
Expand All @@ -275,19 +270,19 @@ impl<T: AsyncRead + Unpin> AsyncReadResponseExt<T> for Response<T> {
}
}

/// A future which copies all the response body bytes into a sink.
#[allow(missing_debug_implementations)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyFuture<'a>(Pin<Box<dyn Future<Output = io::Result<u64>> + 'a>>);

impl Future for CopyFuture<'_> {
type Output = io::Result<u64>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().poll(cx)
}
decl_future! {
/// A future which copies all the response body bytes into a sink.
pub type CopyFuture<T> = impl Future<Output = io::Result<u64>> + SendIf<T>;
}

pub(crate) struct LocalAddr(pub(crate) SocketAddr);

pub(crate) struct RemoteAddr(pub(crate) SocketAddr);

#[cfg(test)]
mod tests {
use super::*;

static_assertions::assert_impl_all!(CopyFuture<'static, Vec<u8>>: Send);
static_assertions::assert_not_impl_any!(CopyFuture<'static, *mut Vec<u8>>: Send);
}
34 changes: 4 additions & 30 deletions src/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use encoding_rs::{CoderResult, Encoding};
use futures_lite::io::{AsyncRead, AsyncReadExt};
use http::Response;
use std::{
future::Future,
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

// This macro abstracts over async and sync decoding, since the implementation
Expand All @@ -38,30 +34,11 @@ macro_rules! decode_reader {
}};
}

/// A future returning a response body decoded as text.
#[allow(missing_debug_implementations)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TextFuture<'a, R> {
inner: Pin<Box<dyn Future<Output = io::Result<String>> + 'a>>,
_phantom: PhantomData<R>,
decl_future! {
/// A future returning a response body decoded as text.
pub type TextFuture<R> = impl Future<Output = io::Result<String>> + SendIf<R>;
}

impl<'a, R: Unpin> Future for TextFuture<'a, R> {
type Output = io::Result<String>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().inner.as_mut().poll(cx)
}
}

// Since we are boxing our future, we can't conditionally implement `Send` based
// on whether the original future is `Send`. However, we know after inspection
// that everything inside our implementation is `Send` except for the reader,
// which may or may not be. We then put the reader in our wrapper future type
// and conditionally implement `Send` if the reader is also `Send`.
#[allow(unsafe_code)]
unsafe impl<'r, R: Send> Send for TextFuture<'r, R> {}

/// A streaming text decoder that supports multiple encodings.
pub(crate) struct Decoder {
/// Inner decoder implementation.
Expand Down Expand Up @@ -110,10 +87,7 @@ impl Decoder {
where
R: AsyncRead + Unpin + 'r,
{
TextFuture {
inner: Box::pin(async move { decode_reader!(self, buf, reader.read(buf).await) }),
_phantom: PhantomData,
}
TextFuture::new(async move { decode_reader!(self, buf, reader.read(buf).await) })
}

/// Push additional bytes into the decoder, returning any trailing bytes
Expand Down

0 comments on commit 970d0af

Please sign in to comment.