Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consume API #284

Merged
merged 5 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,54 @@ impl<T> ResponseExt<T> for Response<T> {

/// Provides extension methods for consuming HTTP response streams.
pub trait ReadResponseExt<R: Read> {
/// Read any remaining bytes from the response body stream and discard them
/// until the end of the stream is reached. It is usually a good idea to
/// call this method before dropping a response if you know you haven't read
/// the entire response body.
///
/// # Background
///
/// By default, if a response stream is dropped before it has been
/// completely read from, then that HTTP connection will be terminated.
/// Depending on which version of HTTP is being used, this may require
/// closing the network connection to the server entirely. This can result
/// in sub-optimal performance for making multiple requests, as it prevents
/// Isahc from keeping the connection alive to be reused for subsequent
/// requests.
///
/// If you are downloading a file on behalf of a user and have been
/// requested to cancel the operation, then this is probably what you want.
/// But if you are making many small API calls to a known server, then you
/// may want to call `consume()` before dropping the response, as reading a
/// few megabytes off a socket is usually more efficient in the long run
/// than taking a hit on connection reuse, and opening new connections can
/// be expensive.
///
/// Note that in HTTP/2 and newer, it is not necessary to close the network
/// connection in order to interrupt the transfer of a particular response.
/// If you know that you will be using only HTTP/2 or newer, then calling
/// this method is probably unnecessary.
///
/// # Examples
///
/// ```no_run
/// use isahc::prelude::*;
///
/// let mut response = isahc::get("https://example.org")?;
///
/// println!("Status: {}", response.status());
/// println!("Headers: {:#?}", response.headers());
///
/// // Read and discard the response body until the end.
/// response.consume()?;
/// # Ok::<(), isahc::Error>(())
/// ```
fn consume(&mut self) -> io::Result<()> {
self.copy_to(io::sink())?;

Ok(())
}

/// Copy the response body into a writer.
///
/// Returns the number of bytes that were written.
Expand Down Expand Up @@ -208,6 +256,51 @@ impl<R: Read> ReadResponseExt<R> for Response<R> {

/// Provides extension methods for consuming asynchronous HTTP response streams.
pub trait AsyncReadResponseExt<R: AsyncRead + Unpin> {
/// Read any remaining bytes from the response body stream and discard them
/// until the end of the stream is reached. It is usually a good idea to
/// call this method before dropping a response if you know you haven't read
/// the entire response body.
///
/// # Background
///
/// By default, if a response stream is dropped before it has been
/// completely read from, then that HTTP connection will be terminated.
/// Depending on which version of HTTP is being used, this may require
/// closing the network connection to the server entirely. This can result
/// in sub-optimal performance for making multiple requests, as it prevents
/// Isahc from keeping the connection alive to be reused for subsequent
/// requests.
///
/// If you are downloading a file on behalf of a user and have been
/// requested to cancel the operation, then this is probably what you want.
/// But if you are making many small API calls to a known server, then you
/// may want to call `consume()` before dropping the response, as reading a
/// few megabytes off a socket is usually more efficient in the long run
/// than taking a hit on connection reuse, and opening new connections can
/// be expensive.
///
/// Note that in HTTP/2 and newer, it is not necessary to close the network
/// connection in order to interrupt the transfer of a particular response.
/// If you know that you will be using only HTTP/2 or newer, then calling
/// this method is probably unnecessary.
///
/// # Examples
///
/// ```no_run
/// use isahc::prelude::*;
///
/// # async fn run() -> Result<(), isahc::Error> {
/// let mut response = isahc::get_async("https://example.org").await?;
///
/// println!("Status: {}", response.status());
/// println!("Headers: {:#?}", response.headers());
///
/// // Read and discard the response body until the end.
/// response.consume().await?;
/// # Ok(()) }
/// ```
fn consume(&mut self) -> ConsumeFuture<'_, R>;

/// Copy the response body into a writer asynchronously.
///
/// Returns the number of bytes that were written.
Expand Down Expand Up @@ -257,6 +350,14 @@ pub trait AsyncReadResponseExt<R: AsyncRead + Unpin> {
}

impl<R: AsyncRead + Unpin> AsyncReadResponseExt<R> for Response<R> {
fn consume(&mut self) -> ConsumeFuture<'_, R> {
ConsumeFuture::new(async move {
futures_lite::io::copy(self.body_mut(), futures_lite::io::sink()).await?;

Ok(())
})
}

fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, R, W>
where
W: AsyncWrite + Unpin + 'a,
Expand All @@ -271,6 +372,10 @@ impl<R: AsyncRead + Unpin> AsyncReadResponseExt<R> for Response<R> {
}

decl_future! {
/// A future which reads any remaining bytes from the response body stream
/// and discard them.
pub type ConsumeFuture<R> = impl Future<Output = io::Result<()>> + SendIf<R>;

/// A future which copies all the response body bytes into a sink.
pub type CopyFuture<R, W> = impl Future<Output = io::Result<u64>> + SendIf<R, W>;
}
Expand Down
45 changes: 43 additions & 2 deletions tests/response_body.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use futures_lite::{future::block_on, io::AsyncReadExt};
use isahc::prelude::*;
use std::{io, io::Read};
use testserver::mock;

#[macro_use]
mod utils;

#[test]
fn simple_response_body() {
let m = mock! {
Expand Down Expand Up @@ -74,8 +79,6 @@ fn dropping_client_does_not_abort_response_transfer() {
// See issue #72.
#[test]
fn reading_from_response_body_after_eof_continues_to_return_eof() {
use std::{io, io::Read};

let m = mock! {
body: "hello world",
};
Expand All @@ -92,3 +95,41 @@ fn reading_from_response_body_after_eof_continues_to_return_eof() {
assert_eq!(body.read(&mut buf).unwrap(), 0);
}
}

#[test]
fn consume_unread_response_body() {
let body = "wow so large ".repeat(1000);

let m = {
let body = body.clone();
mock! {
body: body.clone(),
}
};

let mut response = isahc::get(m.url()).unwrap();
response.consume().unwrap();

let mut buf = [0; 8192];
assert_matches!(response.body_mut().read(&mut buf), Ok(0));
}

#[test]
fn consume_unread_response_body_async() {
let body = "wow so large ".repeat(1000);

let m = {
let body = body.clone();
mock! {
body: body.clone(),
}
};

block_on(async move {
let mut response = isahc::get_async(m.url()).await.unwrap();
response.consume().await.unwrap();

let mut buf = [0; 8192];
assert_matches!(response.body_mut().read(&mut buf).await, Ok(0));
});
}