From 8f066fec17c67b1ee537ee8158f0b08da8ff5b3f Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Wed, 30 Dec 2020 22:02:18 -0600 Subject: [PATCH 1/4] Add consume API Add `ReadResponseExt::consume` and `AsyncReadResponseExt::consume` methods for reading and discarding a response body as described in #257. Also update future types used by `AsyncReadResponseExt` to be `Send` whenever `T` is `Send` as appropriate. Fixes #257 and #283. --- src/macros.rs | 44 +++++++++++++++ src/response.rs | 139 +++++++++++++++++++++++++++++++++++++++++------- src/text.rs | 38 ++----------- 3 files changed, 170 insertions(+), 51 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index 1ecb2159..77b59262 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -21,3 +21,47 @@ macro_rules! match_type { } }}; } + +macro_rules! decl_future { + ( + $( + $(#[$meta:meta])* + $vis:vis type $ident:ident$(<$($T:ident),*>)? = impl Future $(+ SendIf<$($S:ident),+>)?; + )* + ) => { + $( + $(#[$meta])* + #[allow(missing_debug_implementations, non_snake_case)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $ident<'a $($(, $T)*)*> { + inner: ::std::pin::Pin + 'a>>, + $($($T: ::std::marker::PhantomData<$T>,)*)* + } + + impl<'a, $($($T)*)*> $ident<'a, $($($T)*)*> { + pub(crate) fn new(future: F) -> Self + where + F: ::std::future::Future + 'a, + { + Self { + inner: Box::pin(future), + $($($T: ::std::marker::PhantomData,)*)* + } + } + } + + impl<$($($T: Unpin)*)*> ::std::future::Future for $ident<'_, $($($T)*)*> { + type Output = $output; + + fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll { + self.as_mut().inner.as_mut().poll(cx) + } + } + + $( + #[allow(unsafe_code)] + unsafe impl<$($S: Send),*> Send for $ident<'_, $($S)*> {} + )* + )* + }; +} diff --git a/src/response.rs b/src/response.rs index 53aba9b7..b66a5344 100644 --- a/src/response.rs +++ b/src/response.rs @@ -3,12 +3,9 @@ use
futures_lite::io::{AsyncRead, AsyncWrite}; use http::{Response, Uri}; use std::{ fs::File, - future::Future, io::{self, Read, Write}, net::SocketAddr, path::Path, - pin::Pin, - task::{Context, Poll}, }; /// Provides extension methods for working with HTTP responses. @@ -95,6 +92,54 @@ impl ResponseExt for Response { /// Provides extension methods for consuming HTTP response streams. pub trait ReadResponseExt { + /// Read any remaining bytes from the response body stream and discard them + /// until the end of the stream is reached. It is usually a good idea to + /// call this method before dropping a response if you know you haven't read + /// the entire response body. + /// + /// # Background + /// + /// By default, if a response stream is dropped before it has been + /// completely read from, then that HTTP connection will be terminated. + /// Depending on which version of HTTP is being used, this may require + /// closing the network connection to the server entirely. This can result + /// in sub-optimal performance for making multiple requests, as it prevents + /// Isahc from keeping the connection alive to be reused for subsequent + /// requests. + /// + /// If you are downloading a file on behalf of a user and have been + /// requested to cancel the operation, then this is probably what you want. + /// But if you are making many small API calls to a known server, then you + /// may want to call `consume()` before dropping the response, as reading a + /// few megabytes off a socket is usually more efficient in the long run + /// than taking a hit on connection reuse, and opening new connections can + /// be expensive. + /// + /// Note that in HTTP/2 and newer, it is not necessary to close the network + /// connection in order to interrupt the transfer of a particular response. + /// If you know that you will be using only HTTP/2 or newer, then calling + /// this method is probably unnecessary. 
+ /// + /// # Examples + /// + /// ```no_run + /// use isahc::prelude::*; + /// + /// let mut response = isahc::get("https://example.org")?; + /// + /// println!("Status: {}", response.status()); + /// println!("Headers: {:#?}", response.headers()); + /// + /// // Read and discard the response body until the end. + /// response.consume()?; + /// # Ok::<(), isahc::Error>(()) + /// ``` + fn consume(&mut self) -> io::Result<()> { + self.copy_to(io::sink())?; + + Ok(()) + } + /// Copy the response body into a writer. /// /// Returns the number of bytes that were written. @@ -211,6 +256,51 @@ impl ReadResponseExt for Response { /// Provides extension methods for consuming asynchronous HTTP response streams. pub trait AsyncReadResponseExt { + /// Read any remaining bytes from the response body stream and discard them + /// until the end of the stream is reached. It is usually a good idea to + /// call this method before dropping a response if you know you haven't read + /// the entire response body. + /// + /// # Background + /// + /// By default, if a response stream is dropped before it has been + /// completely read from, then that HTTP connection will be terminated. + /// Depending on which version of HTTP is being used, this may require + /// closing the network connection to the server entirely. This can result + /// in sub-optimal performance for making multiple requests, as it prevents + /// Isahc from keeping the connection alive to be reused for subsequent + /// requests. + /// + /// If you are downloading a file on behalf of a user and have been + /// requested to cancel the operation, then this is probably what you want. + /// But if you are making many small API calls to a known server, then you + /// may want to call `consume()` before dropping the response, as reading a + /// few megabytes off a socket is usually more efficient in the long run + /// than taking a hit on connection reuse, and opening new connections can + /// be expensive. 
+ /// + /// Note that in HTTP/2 and newer, it is not necessary to close the network + /// connection in order to interrupt the transfer of a particular response. + /// If you know that you will be using only HTTP/2 or newer, then calling + /// this method is probably unnecessary. + /// + /// # Examples + /// + /// ```no_run + /// use isahc::prelude::*; + /// + /// # async fn run() -> Result<(), isahc::Error> { + /// let mut response = isahc::get_async("https://example.org").await?; + /// + /// println!("Status: {}", response.status()); + /// println!("Headers: {:#?}", response.headers()); + /// + /// // Read and discard the response body until the end. + /// response.consume().await?; + /// # Ok(()) } + /// ``` + fn consume(&mut self) -> ConsumeFuture<'_, T>; + /// Copy the response body into a writer asynchronously. /// /// Returns the number of bytes that were written. @@ -229,7 +319,7 @@ pub trait AsyncReadResponseExt { /// println!("Read {} bytes", buf.len()); /// # Ok(()) } /// ``` - fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a> + fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, T> where W: AsyncWrite + Unpin + 'a; @@ -260,13 +350,19 @@ pub trait AsyncReadResponseExt { } impl AsyncReadResponseExt for Response { - fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a> + fn consume(&mut self) -> ConsumeFuture<'_, T> { + ConsumeFuture::new(async move { + futures_lite::io::copy(self.body_mut(), futures_lite::io::sink()).await?; + + Ok(()) + }) + } + + fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, T> where W: AsyncWrite + Unpin + 'a, { - CopyFuture(Box::pin(async move { - futures_lite::io::copy(self.body_mut(), writer).await - })) + CopyFuture::new(async move { futures_lite::io::copy(self.body_mut(), writer).await }) } #[cfg(feature = "text-decoding")] @@ -275,19 +371,26 @@ impl AsyncReadResponseExt for Response { } } -/// A future which copies all the response body bytes into a sink. 
-#[allow(missing_debug_implementations)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyFuture<'a>(Pin> + 'a>>); +decl_future! { + /// A future which reads any remaining bytes from the response body stream + /// and discard them. + pub type ConsumeFuture = impl Future> + SendIf; -impl Future for CopyFuture<'_> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().poll(cx) - } + /// A future which copies all the response body bytes into a sink. + pub type CopyFuture = impl Future> + SendIf; } pub(crate) struct LocalAddr(pub(crate) SocketAddr); pub(crate) struct RemoteAddr(pub(crate) SocketAddr); + +#[cfg(test)] +mod tests { + use super::*; + + static_assertions::assert_impl_all!(ConsumeFuture<'static, Vec>: Send); + static_assertions::assert_not_impl_any!(ConsumeFuture<'static, *mut Vec>: Send); + + static_assertions::assert_impl_all!(CopyFuture<'static, Vec>: Send); + static_assertions::assert_not_impl_any!(CopyFuture<'static, *mut Vec>: Send); +} diff --git a/src/text.rs b/src/text.rs index 39f67540..c420163a 100644 --- a/src/text.rs +++ b/src/text.rs @@ -6,13 +6,7 @@ use crate::headers::HasHeaders; use encoding_rs::{CoderResult, Encoding}; use futures_lite::io::{AsyncRead, AsyncReadExt}; use http::Response; -use std::{ - future::Future, - io, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, -}; +use std::io; // This macro abstracts over async and sync decoding, since the implementation // of decoding a stream into text is the same. @@ -38,30 +32,11 @@ macro_rules! decode_reader { }}; } -/// A future returning a response body decoded as text. -#[allow(missing_debug_implementations)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TextFuture<'a, R> { - inner: Pin> + 'a>>, - _phantom: PhantomData, +decl_future! { + /// A future returning a response body decoded as text. 
+ pub type TextFuture = impl Future> + SendIf; } -impl<'a, R: Unpin> Future for TextFuture<'a, R> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().inner.as_mut().poll(cx) - } -} - -// Since we are boxing our future, we can't conditionally implement `Send` based -// on whether the original future is `Send`. However, we know after inspection -// that everything inside our implementation is `Send` except for the reader, -// which may or may not be. We then put the reader in our wrapper future type -// and conditionally implement `Send` if the reader is also `Send`. -#[allow(unsafe_code)] -unsafe impl<'r, R: Send> Send for TextFuture<'r, R> {} - /// A streaming text decoder that supports multiple encodings. pub(crate) struct Decoder { /// Inner decoder implementation. @@ -110,10 +85,7 @@ impl Decoder { where R: AsyncRead + Unpin + 'r, { - TextFuture { - inner: Box::pin(async move { decode_reader!(self, buf, reader.read(buf).await) }), - _phantom: PhantomData, - } + TextFuture::new(async move { decode_reader!(self, buf, reader.read(buf).await) }) } /// Push additional bytes into the decoder, returning any trailing bytes From b1321f717a55856a0f7a223924d60024a331ec8b Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Wed, 30 Dec 2020 22:11:52 -0600 Subject: [PATCH 2/4] Use specified visibility in macro --- src/macros.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/macros.rs b/src/macros.rs index 77b59262..0cdfe727 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -33,7 +33,7 @@ macro_rules! 
decl_future { $(#[$meta])* #[allow(missing_debug_implementations, non_snake_case)] #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct $ident<'a $($(, $T)*)*> { + $vis struct $ident<'a $($(, $T)*)*> { inner: ::std::pin::Pin + 'a>>, $($($T: ::std::marker::PhantomData<$T>,)*)* } From df207fdea6293112223f285c39eed48b0b8d5d24 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Wed, 30 Dec 2020 22:18:38 -0600 Subject: [PATCH 3/4] Add test for consume --- tests/response_body.rs | 45 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/tests/response_body.rs b/tests/response_body.rs index 02c05216..0655d67a 100644 --- a/tests/response_body.rs +++ b/tests/response_body.rs @@ -1,6 +1,11 @@ +use futures_lite::{future::block_on, io::AsyncReadExt}; use isahc::prelude::*; +use std::{io, io::Read}; use testserver::mock; +#[macro_use] +mod utils; + #[test] fn simple_response_body() { let m = mock! { @@ -74,8 +79,6 @@ fn dropping_client_does_not_abort_response_transfer() { // See issue #72. #[test] fn reading_from_response_body_after_eof_continues_to_return_eof() { - use std::{io, io::Read}; - let m = mock! { body: "hello world", }; @@ -92,3 +95,41 @@ fn reading_from_response_body_after_eof_continues_to_return_eof() { assert_eq!(body.read(&mut buf).unwrap(), 0); } } + +#[test] +fn consume_unread_response_body() { + let body = "wow so large ".repeat(1000); + + let m = { + let body = body.clone(); + mock! { + body: body.clone(), + } + }; + + let mut response = isahc::get(m.url()).unwrap(); + response.consume().unwrap(); + + let mut buf = [0; 8192]; + assert_matches!(response.body_mut().read(&mut buf), Ok(0)); +} + +#[test] +fn consume_unread_response_body_async() { + let body = "wow so large ".repeat(1000); + + let m = { + let body = body.clone(); + mock! 
{ + body: body.clone(), + } + }; + + block_on(async move { + let mut response = isahc::get_async(m.url()).await.unwrap(); + response.consume().await.unwrap(); + + let mut buf = [0; 8192]; + assert_matches!(response.body_mut().read(&mut buf).await, Ok(0)); + }); +} From 1efdc56b7f053e18068e4961bb6a0bb4ab13305d Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Mon, 11 Jan 2021 22:12:47 -0600 Subject: [PATCH 4/4] Fix merge goof --- src/cookies/psl/list | 2 +- src/response.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cookies/psl/list b/src/cookies/psl/list index f9f612a3..6b67c6f2 160000 --- a/src/cookies/psl/list +++ b/src/cookies/psl/list @@ -1 +1 @@ -Subproject commit f9f612a3386dd9a1e4a1892722e3418549520b49 +Subproject commit 6b67c6f2f7691ff91a9dd6758cd3fef6aef8727d diff --git a/src/response.rs b/src/response.rs index 9804b02d..baef77b8 100644 --- a/src/response.rs +++ b/src/response.rs @@ -349,7 +349,6 @@ pub trait AsyncReadResponseExt { fn text(&mut self) -> crate::text::TextFuture<'_, &mut R>; } - impl AsyncReadResponseExt for Response { fn consume(&mut self) -> ConsumeFuture<'_, R> { ConsumeFuture::new(async move { @@ -375,7 +374,7 @@ impl AsyncReadResponseExt for Response { decl_future! { /// A future which reads any remaining bytes from the response body stream /// and discard them. - pub type ConsumeFuture = impl Future> + SendIf; + pub type ConsumeFuture = impl Future> + SendIf; /// A future which copies all the response body bytes into a sink. pub type CopyFuture = impl Future> + SendIf;