From 970d0afbe5704de0a3e696713e96b09e3a89d8b3 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Wed, 30 Dec 2020 22:50:30 -0600 Subject: [PATCH] Update future types to be Send Update future types used by `AsyncReadResponseExt` to be `Send` whenever `T` is `Send` as appropriate. Consolidate the newtype futures with a macro that handles conditional Send properly to reduce boilerplate and chance of mistakes. Fixes #283. --- src/macros.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ src/response.rs | 33 ++++++++++++++------------------- src/text.rs | 34 ++++------------------------------ 3 files changed, 62 insertions(+), 49 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index 1ecb2159..0cdfe727 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -21,3 +21,47 @@ macro_rules! match_type { } }}; } + +macro_rules! decl_future { + ( + $( + $(#[$meta:meta])* + $vis:vis type $ident:ident$(<$($T:ident),*>)? = impl Future $(+ SendIf<$($S:ident),+>)?; + )* + ) => { + $( + $(#[$meta])* + #[allow(missing_debug_implementations, non_snake_case)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + $vis struct $ident<'a $($(, $T)*)*> { + inner: ::std::pin::Pin + 'a>>, + $($($T: ::std::marker::PhantomData<$T>,)*)* + } + + impl<'a, $($($T)*)*> $ident<'a, $($($T)*)*> { + pub(crate) fn new(future: F) -> Self + where + F: ::std::future::Future + 'a, + { + Self { + inner: Box::pin(future), + $($($T: ::std::marker::PhantomData,)*)* + } + } + } + + impl<$($($T: Unpin)*)*> ::std::future::Future for $ident<'_, $($($T)*)*> { + type Output = $output; + + fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll { + self.as_mut().inner.as_mut().poll(cx) + } + } + + $( + #[allow(unsafe_code)] + unsafe impl<$($S: Send),*> Send for $ident<'_, $($S)*> {} + )* + )* + }; +} diff --git a/src/response.rs b/src/response.rs index 53aba9b7..4e217ffd 100644 --- a/src/response.rs +++ b/src/response.rs @@ -3,12 +3,9 @@ use futures_lite::io::{AsyncRead, AsyncWrite}; use http::{Response, Uri}; use std::{ fs::File, - future::Future, io::{self, Read, Write}, net::SocketAddr, path::Path, - pin::Pin, - task::{Context, Poll}, }; /// Provides extension methods for working with HTTP responses. @@ -229,7 +226,7 @@ pub trait AsyncReadResponseExt { /// println!("Read {} bytes", buf.len()); /// # Ok(()) } /// ``` - fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a> + fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, T> where W: AsyncWrite + Unpin + 'a; @@ -260,13 +257,11 @@ pub trait AsyncReadResponseExt { } impl AsyncReadResponseExt for Response { - fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a> + fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, T> where W: AsyncWrite + Unpin + 'a, { - CopyFuture(Box::pin(async move { - futures_lite::io::copy(self.body_mut(), writer).await - })) + CopyFuture::new(async move { futures_lite::io::copy(self.body_mut(), writer).await }) } #[cfg(feature = "text-decoding")] @@ -275,19 +270,19 @@ impl AsyncReadResponseExt for Response { } } -/// A future which copies all the response body bytes into a sink. -#[allow(missing_debug_implementations)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyFuture<'a>(Pin> + 'a>>); - -impl Future for CopyFuture<'_> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().poll(cx) - } +decl_future! { + /// A future which copies all the response body bytes into a sink. + pub type CopyFuture = impl Future> + SendIf; } pub(crate) struct LocalAddr(pub(crate) SocketAddr); pub(crate) struct RemoteAddr(pub(crate) SocketAddr); + +#[cfg(test)] +mod tests { + use super::*; + + static_assertions::assert_impl_all!(CopyFuture<'static, Vec>: Send); + static_assertions::assert_not_impl_any!(CopyFuture<'static, *mut Vec>: Send); +} diff --git a/src/text.rs b/src/text.rs index 39f67540..8e573f71 100644 --- a/src/text.rs +++ b/src/text.rs @@ -7,11 +7,7 @@ use encoding_rs::{CoderResult, Encoding}; use futures_lite::io::{AsyncRead, AsyncReadExt}; use http::Response; use std::{ - future::Future, io, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, }; // This macro abstracts over async and sync decoding, since the implementation @@ -38,30 +34,11 @@ macro_rules! decode_reader { }}; } -/// A future returning a response body decoded as text. -#[allow(missing_debug_implementations)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TextFuture<'a, R> { - inner: Pin> + 'a>>, - _phantom: PhantomData, +decl_future! { + /// A future returning a response body decoded as text. + pub type TextFuture = impl Future> + SendIf; } -impl<'a, R: Unpin> Future for TextFuture<'a, R> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().inner.as_mut().poll(cx) - } -} - -// Since we are boxing our future, we can't conditionally implement `Send` based -// on whether the original future is `Send`. However, we know after inspection -// that everything inside our implementation is `Send` except for the reader, -// which may or may not be. We then put the reader in our wrapper future type -// and conditionally implement `Send` if the reader is also `Send`. -#[allow(unsafe_code)] -unsafe impl<'r, R: Send> Send for TextFuture<'r, R> {} - /// A streaming text decoder that supports multiple encodings. pub(crate) struct Decoder { /// Inner decoder implementation. @@ -110,10 +87,7 @@ impl Decoder { where R: AsyncRead + Unpin + 'r, { - TextFuture { - inner: Box::pin(async move { decode_reader!(self, buf, reader.read(buf).await) }), - _phantom: PhantomData, - } + TextFuture::new(async move { decode_reader!(self, buf, reader.read(buf).await) }) } /// Push additional bytes into the decoder, returning any trailing bytes