Skip to content

Commit

Permalink
Trim some heavy dependencies (#254)
Browse files Browse the repository at this point in the history
Tis the season to be trimming the tree! Reducing dependencies where it makes sense is a regular task for Isahc to ensure shorter compile times, binary size, or package size.

This time around the goal was to eliminate `futures-util` from the essential dependencies, since it has long compile times. This included swapping out for `futures-lite` where needed, or just using our own code, as well as updating Sluice to remove `futures-util` from there as well.

Also a goal was to unify our preferred channel implementation, since we currently use _both_ `futures-channel` and `crossbeam-channel`, depending on the scenario. Here we can replace both with `flume` which has the nice property of supporting both sync and async operations on one channel.
  • Loading branch information
sagebind authored Nov 12, 2020
1 parent a9d71aa commit 6498f6a
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 131 deletions.
11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ unstable-interceptors = []

[dependencies]
bytes = "0.5"
crossbeam-channel = "0.5"
crossbeam-utils = "0.8"
curl = "0.4.34"
curl-sys = "0.4.37"
futures-channel = "0.3"
futures-io = "0.3"
futures-lite = "1.11"
http = "0.2.1"
log = "0.4"
once_cell = "1"
slab = "0.4"
sluice = "0.5"
waker-fn = "1"

[dependencies.chrono]
version = "0.4"
Expand All @@ -52,10 +51,10 @@ optional = true
version = "0.8"
optional = true

[dependencies.futures-util]
version = "0.3"
[dependencies.flume]
version = "0.9"
default-features = false
features = ["io"]
features = ["async"]

[dependencies.mime]
version = "0.3"
Expand Down
30 changes: 16 additions & 14 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
use crate::handler::RequestHandler;
use crate::task::{UdpWaker, WakerExt};
use crate::Error;
use crossbeam_channel::{Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;
use curl::multi::WaitFd;
use flume::{Receiver, Sender};
use slab::Slab;
use std::net::UdpSocket;
use std::sync::{Arc, Mutex};
use std::task::Waker;
use std::thread;
use std::time::{Duration, Instant};
use std::{
net::UdpSocket,
sync::Mutex,
task::Waker,
thread,
time::{Duration, Instant},
};

const WAIT_TIMEOUT: Duration = Duration::from_millis(100);

Expand Down Expand Up @@ -72,10 +74,10 @@ impl AgentBuilder {
wake_socket.set_nonblocking(true)?;
let wake_addr = wake_socket.local_addr()?;
let port = wake_addr.port();
let waker = futures_util::task::waker(Arc::new(UdpWaker::connect(wake_addr)?));
let waker = Waker::from(UdpWaker::connect(wake_addr)?);
tracing::debug!("agent waker listening on {}", wake_addr);

let (message_tx, message_rx) = crossbeam_channel::unbounded();
let (message_tx, message_rx) = flume::unbounded();

let wait_group = WaitGroup::new();
let wait_group_thread = wait_group.clone();
Expand Down Expand Up @@ -114,7 +116,7 @@ impl AgentBuilder {

let agent = AgentContext {
multi,
multi_messages: crossbeam_channel::unbounded(),
multi_messages: flume::unbounded(),
message_tx,
message_rx,
wake_socket,
Expand Down Expand Up @@ -234,7 +236,7 @@ impl Handle {
self.waker.wake_by_ref();
Ok(())
}
Err(crossbeam_channel::SendError(_)) => match self.try_join() {
Err(flume::SendError(_)) => match self.try_join() {
JoinResult::Err(e) => panic!("agent thread terminated with error: {}", e),
JoinResult::Panic => panic!("agent thread panicked"),
_ => panic!("agent thread terminated prematurely"),
Expand Down Expand Up @@ -375,8 +377,8 @@ impl AgentContext {
} else {
match self.message_rx.try_recv() {
Ok(message) => self.handle_message(message)?,
Err(crossbeam_channel::TryRecvError::Empty) => break,
Err(crossbeam_channel::TryRecvError::Disconnected) => {
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => {
tracing::warn!("agent handle disconnected without close message");
self.close_requested = true;
break;
Expand Down Expand Up @@ -456,8 +458,8 @@ impl AgentContext {
match self.multi_messages.1.try_recv() {
// A request completed.
Ok((token, result)) => self.complete_request(token, result)?,
Err(crossbeam_channel::TryRecvError::Empty) => break,
Err(crossbeam_channel::TryRecvError::Disconnected) => unreachable!(),
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => unreachable!(),
}
}

Expand Down
24 changes: 14 additions & 10 deletions src/body.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Provides types for working with request and response bodies.
use crate::task::Join;
use bytes::Bytes;
use futures_io::AsyncRead;
use futures_util::io::{AsyncReadExt, Cursor};
use std::fmt;
use std::io::{self, Read};
use std::pin::Pin;
use std::str;
use std::task::{Context, Poll};
use futures_lite::{future::block_on, io::{AsyncRead, AsyncReadExt}};
use std::{
fmt,
io::{self, Cursor, Read},
pin::Pin,
str,
task::{Context, Poll},
};

macro_rules! match_type {
{
Expand Down Expand Up @@ -159,7 +159,11 @@ impl Body {

impl Read for Body {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
AsyncReadExt::read(self, buf).join()
match &mut self.0 {
Inner::Empty => Ok(0),
Inner::Bytes(cursor) => cursor.read(buf),
Inner::AsyncRead(reader, _) => block_on(reader.read(buf)),
}
}
}

Expand All @@ -171,7 +175,7 @@ impl AsyncRead for Body {
) -> Poll<io::Result<usize>> {
match &mut self.0 {
Inner::Empty => Poll::Ready(Ok(0)),
Inner::Bytes(cursor) => AsyncRead::poll_read(Pin::new(cursor), cx, buf),
Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
Inner::AsyncRead(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf),
}
}
Expand Down
23 changes: 10 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ use crate::{
handler::{RequestHandler, ResponseBodyReader},
headers,
interceptor::{self, Interceptor, InterceptorObj},
task::Join,
Body, Error,
};
use futures_io::AsyncRead;
use futures_util::{future::BoxFuture, pin_mut};
use futures_lite::{future::block_on, io::AsyncRead, pin};
use http::{
header::{HeaderMap, HeaderName, HeaderValue},
Request, Response,
Expand Down Expand Up @@ -650,7 +648,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.get_async(uri).join()
block_on(self.get_async(uri))
}

/// Send a GET request to the given URI asynchronously.
Expand Down Expand Up @@ -685,7 +683,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.head_async(uri).join()
block_on(self.head_async(uri))
}

/// Send a HEAD request to the given URI asynchronously.
Expand Down Expand Up @@ -723,7 +721,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.post_async(uri, body).join()
block_on(self.post_async(uri, body))
}

/// Send a POST request to the given URI asynchronously with a given request
Expand Down Expand Up @@ -763,7 +761,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.put_async(uri, body).join()
block_on(self.put_async(uri, body))
}

/// Send a PUT request to the given URI asynchronously with a given request
Expand All @@ -789,7 +787,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.delete_async(uri).join()
block_on(self.delete_async(uri))
}

/// Send a DELETE request to the given URI asynchronously.
Expand Down Expand Up @@ -855,7 +853,7 @@ impl HttpClient {
#[inline]
#[tracing::instrument(level = "debug", skip(self, request), err)]
pub fn send<B: Into<Body>>(&self, request: Request<B>) -> Result<Response<Body>, Error> {
self.send_async(request).join()
block_on(self.send_async(request))
}

/// Send an HTTP request and return the HTTP response asynchronously.
Expand Down Expand Up @@ -1122,7 +1120,7 @@ impl fmt::Debug for HttpClient {
}

/// A future for a request being executed.
pub struct ResponseFuture<'c>(BoxFuture<'c, Result<Response<Body>, Error>>);
pub struct ResponseFuture<'c>(Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + 'c + Send>>);

impl<'c> ResponseFuture<'c> {
fn new(future: impl Future<Output = Result<Response<Body>, Error>> + Send + 'c) -> Self {
Expand All @@ -1134,8 +1132,7 @@ impl Future for ResponseFuture<'_> {
type Output = Result<Response<Body>, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_util::future::FutureExt;
self.0.poll_unpin(cx)
self.0.as_mut().poll(cx)
}
}

Expand All @@ -1159,7 +1156,7 @@ impl AsyncRead for ResponseBody {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let inner = &mut self.inner;
pin_mut!(inner);
pin!(inner);
inner.poll_read(cx, buf)
}
}
Expand Down
26 changes: 8 additions & 18 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use crate::{
use crossbeam_utils::atomic::AtomicCell;
use curl::easy::{InfoType, ReadError, SeekResult, WriteError};
use curl_sys::CURL;
use futures_channel::oneshot::Sender;
use futures_io::{AsyncRead, AsyncWrite};
use futures_util::{pin_mut, task::AtomicWaker};
use flume::Sender;
use futures_lite::{io::{AsyncRead, AsyncWrite}, pin};
use http::{Response, Uri};
use once_cell::sync::OnceCell;
use sluice::pipe;
Expand Down Expand Up @@ -104,9 +103,6 @@ unsafe impl Send for RequestHandler {}
/// This is also used to keep track of the lifetime of the request.
#[derive(Debug)]
struct Shared {
/// A waker used by the handler to wake up the associated future.
waker: AtomicWaker,

/// Set to the final result of the transfer received from curl. This is used
/// to communicate an error while reading the response body if the handler
/// suddenly aborts.
Expand All @@ -127,9 +123,8 @@ impl RequestHandler {
Self,
impl Future<Output = Result<Response<ResponseBodyReader>, Error>>,
) {
let (sender, receiver) = futures_channel::oneshot::channel();
let (sender, receiver) = flume::bounded(1);
let shared = Arc::new(Shared {
waker: AtomicWaker::default(),
result: OnceCell::new(),
response_body_dropped: AtomicCell::new(false),
});
Expand All @@ -153,7 +148,7 @@ impl RequestHandler {
// Create a future that resolves when the handler receives the response
// headers.
let future = async move {
let builder = receiver.await.map_err(|_| Error::Aborted)??;
let builder = receiver.recv_async().await.map_err(|_| Error::Aborted)??;

let reader = ResponseBodyReader {
inner: response_body_reader,
Expand Down Expand Up @@ -187,7 +182,7 @@ impl RequestHandler {
fn is_future_canceled(&self) -> bool {
self.sender
.as_ref()
.map(Sender::is_canceled)
.map(Sender::is_disconnected)
.unwrap_or(false)
}

Expand Down Expand Up @@ -289,13 +284,8 @@ impl RequestHandler {
tracing::warn!("request completed with error: {}", e);
}

match sender.send(result) {
Ok(()) => {
self.shared.waker.wake();
}
Err(_) => {
tracing::debug!("request canceled by user");
}
if sender.send(result).is_err() {
tracing::debug!("request canceled by user");
}
}
}
Expand Down Expand Up @@ -692,7 +682,7 @@ impl AsyncRead for ResponseBodyReader {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let inner = &mut self.inner;
pin_mut!(inner);
pin!(inner);

match inner.poll_read(cx, buf) {
// On EOF, check to see if the transfer was cancelled, and if so,
Expand Down
5 changes: 3 additions & 2 deletions src/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
/// enabled.
use crate::Body;
use futures_util::future::BoxFuture;
use http::{Request, Response};
use std::{
error::Error,
fmt,
future::Future,
pin::Pin,
};

mod context;
Expand Down Expand Up @@ -81,7 +82,7 @@ pub trait Interceptor: Send + Sync {
}

/// The type of future returned by an interceptor.
pub type InterceptorFuture<'a, E> = BoxFuture<'a, Result<Response<Body>, E>>;
pub type InterceptorFuture<'a, E> = Pin<Box<dyn Future<Output = Result<Response<Body>, E>> + Send + 'a>>;

/// Creates an interceptor from an arbitrary closure or function.
pub fn from_fn<F, E>(f: F) -> InterceptorFn<F>
Expand Down
5 changes: 3 additions & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::Metrics;
use futures_lite::io::AsyncRead;
use http::{Response, Uri};
use std::{
fs::File,
Expand Down Expand Up @@ -144,7 +145,7 @@ pub trait ResponseExt<T> {
#[cfg(feature = "text-decoding")]
fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T>
where
T: futures_io::AsyncRead + Unpin;
T: AsyncRead + Unpin;

/// Deserialize the response body as JSON into a given type.
///
Expand Down Expand Up @@ -210,7 +211,7 @@ impl<T> ResponseExt<T> for Response<T> {
#[cfg(feature = "text-decoding")]
fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T>
where
T: futures_io::AsyncRead + Unpin,
T: AsyncRead + Unpin,
{
crate::text::Decoder::for_response(&self).decode_reader_async(self.body_mut())
}
Expand Down
Loading

0 comments on commit 6498f6a

Please sign in to comment.