From 9808f32f73c72a3072d38d8b495402eca3990c81 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Wed, 11 Nov 2020 23:52:23 -0600 Subject: [PATCH 1/2] Trim some heavy dependencies Tis the season to be trimming the tree! Reducing dependencies where it makes sense is a regular task for Isahc to ensure shorter compile times, binary size, or package size. This time around the goal was to eliminate `futures-util` from the essential dependencies, since it has long compile times. This included swapping out for `futures-lite` where needed, or just using our own code, as well as updating Sluice to remove `futures-util` from there as well. Also a goal was to unify our preferred channel implementation, since we currently use _both_ `futures-channel` and `crossbeam-channel`, depending on the scenario. Here we can replace both with `flume` which has the nice property of supporting both sync and async operations on one channel. --- Cargo.toml | 11 +++---- src/agent.rs | 30 +++++++++--------- src/body.rs | 24 +++++++++------ src/client.rs | 24 +++++++-------- src/handler.rs | 26 +++++----------- src/interceptor/mod.rs | 5 +-- src/response.rs | 4 +-- src/task.rs | 70 +++++++----------------------------------- src/text.rs | 10 ++---- 9 files changed, 73 insertions(+), 131 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 341d74f7..cc1ff435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,17 +32,16 @@ unstable-interceptors = [] [dependencies] bytes = "0.5" -crossbeam-channel = "0.5" crossbeam-utils = "0.8" curl = "0.4.34" curl-sys = "0.4.37" -futures-channel = "0.3" -futures-io = "0.3" +futures-lite = "1.11" http = "0.2.1" log = "0.4" once_cell = "1" slab = "0.4" sluice = "0.5" +waker-fn = "1" [dependencies.chrono] version = "0.4" @@ -52,10 +51,10 @@ optional = true version = "0.8" optional = true -[dependencies.futures-util] -version = "0.3" +[dependencies.flume] +version = "0.9" default-features = false -features = ["io"] +features = ["async"] [dependencies.mime] version = "0.3" diff --git a/src/agent.rs b/src/agent.rs index e40ca710..c39e0e27 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -11,15 +11,17 @@ use crate::handler::RequestHandler; use crate::task::{UdpWaker, WakerExt}; use crate::Error; -use crossbeam_channel::{Receiver, Sender}; use crossbeam_utils::sync::WaitGroup; use curl::multi::WaitFd; +use flume::{Receiver, Sender}; use slab::Slab; -use std::net::UdpSocket; -use std::sync::{Arc, Mutex}; -use std::task::Waker; -use std::thread; -use std::time::{Duration, Instant}; +use std::{ + net::UdpSocket, + sync::Mutex, + task::Waker, + thread, + time::{Duration, Instant}, +}; const WAIT_TIMEOUT: Duration = Duration::from_millis(100); @@ -72,10 +74,10 @@ impl AgentBuilder { wake_socket.set_nonblocking(true)?; let wake_addr = wake_socket.local_addr()?; let port = wake_addr.port(); - let waker = futures_util::task::waker(Arc::new(UdpWaker::connect(wake_addr)?)); + let waker = Waker::from(UdpWaker::connect(wake_addr)?); tracing::debug!("agent waker listening on {}", wake_addr); - let (message_tx, message_rx) = crossbeam_channel::unbounded(); + let (message_tx, message_rx) = flume::unbounded(); let wait_group = WaitGroup::new(); let wait_group_thread = wait_group.clone(); @@ -114,7 +116,7 @@ impl AgentBuilder { let agent = AgentContext { multi, - multi_messages: crossbeam_channel::unbounded(), + multi_messages: flume::unbounded(), message_tx, message_rx, wake_socket, @@ -234,7 +236,7 @@ impl Handle { self.waker.wake_by_ref(); Ok(()) } - Err(crossbeam_channel::SendError(_)) => match self.try_join() { + Err(flume::SendError(_)) => match self.try_join() { JoinResult::Err(e) => panic!("agent thread terminated with error: {}", e), JoinResult::Panic => panic!("agent thread panicked"), _ => panic!("agent thread terminated prematurely"), @@ -375,8 +377,8 @@ impl AgentContext { } else { match self.message_rx.try_recv() { Ok(message) => self.handle_message(message)?, - Err(crossbeam_channel::TryRecvError::Empty) => break, - Err(crossbeam_channel::TryRecvError::Disconnected) => { + Err(flume::TryRecvError::Empty) => break, + Err(flume::TryRecvError::Disconnected) => { tracing::warn!("agent handle disconnected without close message"); self.close_requested = true; break; @@ -456,8 +458,8 @@ impl AgentContext { match self.multi_messages.1.try_recv() { // A request completed. Ok((token, result)) => self.complete_request(token, result)?, - Err(crossbeam_channel::TryRecvError::Empty) => break, - Err(crossbeam_channel::TryRecvError::Disconnected) => unreachable!(), + Err(flume::TryRecvError::Empty) => break, + Err(flume::TryRecvError::Disconnected) => unreachable!(), } } diff --git a/src/body.rs b/src/body.rs index 017b654f..ceed168e 100644 --- a/src/body.rs +++ b/src/body.rs @@ -1,14 +1,14 @@ //! Provides types for working with request and response bodies. -use crate::task::Join; use bytes::Bytes; -use futures_io::AsyncRead; -use futures_util::io::{AsyncReadExt, Cursor}; -use std::fmt; -use std::io::{self, Read}; -use std::pin::Pin; -use std::str; -use std::task::{Context, Poll}; +use futures_lite::{future::block_on, io::{AsyncRead, AsyncReadExt}}; +use std::{ + fmt, + io::{self, Cursor, Read}, + pin::Pin, + str, + task::{Context, Poll}, +}; macro_rules! match_type { { @@ -159,7 +159,11 @@ impl Body { impl Read for Body { fn read(&mut self, buf: &mut [u8]) -> io::Result { - AsyncReadExt::read(self, buf).join() + match &mut self.0 { + Inner::Empty => Ok(0), + Inner::Bytes(cursor) => cursor.read(buf), + Inner::AsyncRead(reader, _) => block_on(reader.read(buf)), + } } } @@ -171,7 +175,7 @@ impl AsyncRead for Body { ) -> Poll> { match &mut self.0 { Inner::Empty => Poll::Ready(Ok(0)), - Inner::Bytes(cursor) => AsyncRead::poll_read(Pin::new(cursor), cx, buf), + Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)), Inner::AsyncRead(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), } } diff --git a/src/client.rs b/src/client.rs index d974ef27..52b3ea19 100644 --- a/src/client.rs +++ b/src/client.rs @@ -9,11 +9,9 @@ use crate::{ handler::{RequestHandler, ResponseBodyReader}, headers, interceptor::{self, Interceptor, InterceptorObj}, - task::Join, Body, Error, }; -use futures_io::AsyncRead; -use futures_util::{future::BoxFuture, pin_mut}; +use futures_lite::{future::block_on, io::AsyncRead, pin}; use http::{ header::{HeaderMap, HeaderName, HeaderValue}, Request, Response, @@ -650,7 +648,7 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.get_async(uri).join() + block_on(self.get_async(uri)) } /// Send a GET request to the given URI asynchronously. @@ -685,7 +683,7 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.head_async(uri).join() + block_on(self.head_async(uri)) } /// Send a HEAD request to the given URI asynchronously. @@ -723,7 +721,7 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.post_async(uri, body).join() + block_on(self.post_async(uri, body)) } /// Send a POST request to the given URI asynchronously with a given request @@ -763,7 +761,7 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.put_async(uri, body).join() + block_on(self.put_async(uri, body)) } /// Send a PUT request to the given URI asynchronously with a given request @@ -789,7 +787,7 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.delete_async(uri).join() + block_on(self.delete_async(uri)) } /// Send a DELETE request to the given URI asynchronously. @@ -855,7 +853,7 @@ impl HttpClient { #[inline] #[tracing::instrument(level = "debug", skip(self, request), err)] pub fn send>(&self, request: Request) -> Result, Error> { - self.send_async(request).join() + block_on(self.send_async(request)) } /// Send an HTTP request and return the HTTP response asynchronously. @@ -1122,7 +1120,7 @@ impl fmt::Debug for HttpClient { } /// A future for a request being executed. -pub struct ResponseFuture<'c>(BoxFuture<'c, Result, Error>>); +pub struct ResponseFuture<'c>(Pin, Error>> + 'c + Send>>); impl<'c> ResponseFuture<'c> { fn new(future: impl Future, Error>> + Send + 'c) -> Self { @@ -1134,8 +1132,8 @@ impl Future for ResponseFuture<'_> { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_util::future::FutureExt; - self.0.poll_unpin(cx) + // use futures_util::future::FutureExt; + self.0.as_mut().poll(cx) } } @@ -1159,7 +1157,7 @@ impl AsyncRead for ResponseBody { buf: &mut [u8], ) -> Poll> { let inner = &mut self.inner; - pin_mut!(inner); + pin!(inner); inner.poll_read(cx, buf) } } diff --git a/src/handler.rs b/src/handler.rs index 58691f3e..10e5c1f8 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -9,9 +9,8 @@ use crate::{ use crossbeam_utils::atomic::AtomicCell; use curl::easy::{InfoType, ReadError, SeekResult, WriteError}; use curl_sys::CURL; -use futures_channel::oneshot::Sender; -use futures_io::{AsyncRead, AsyncWrite}; -use futures_util::{pin_mut, task::AtomicWaker}; +use flume::Sender; +use futures_lite::{io::{AsyncRead, AsyncWrite}, pin}; use http::{Response, Uri}; use once_cell::sync::OnceCell; use sluice::pipe; @@ -104,9 +103,6 @@ unsafe impl Send for RequestHandler {} /// This is also used to keep track of the lifetime of the request. #[derive(Debug)] struct Shared { - /// A waker used by the handler to wake up the associated future. - waker: AtomicWaker, - /// Set to the final result of the transfer received from curl. This is used /// to communicate an error while reading the response body if the handler /// suddenly aborts. @@ -127,9 +123,8 @@ impl RequestHandler { Self, impl Future, Error>>, ) { - let (sender, receiver) = futures_channel::oneshot::channel(); + let (sender, receiver) = flume::bounded(1); let shared = Arc::new(Shared { - waker: AtomicWaker::default(), result: OnceCell::new(), response_body_dropped: AtomicCell::new(false), }); @@ -153,7 +148,7 @@ impl RequestHandler { // Create a future that resolves when the handler receives the response // headers. let future = async move { - let builder = receiver.await.map_err(|_| Error::Aborted)??; + let builder = receiver.recv_async().await.map_err(|_| Error::Aborted)??; let reader = ResponseBodyReader { inner: response_body_reader, @@ -187,7 +182,7 @@ impl RequestHandler { fn is_future_canceled(&self) -> bool { self.sender .as_ref() - .map(Sender::is_canceled) + .map(Sender::is_disconnected) .unwrap_or(false) } @@ -289,13 +284,8 @@ impl RequestHandler { tracing::warn!("request completed with error: {}", e); } - match sender.send(result) { - Ok(()) => { - self.shared.waker.wake(); - } - Err(_) => { - tracing::debug!("request canceled by user"); - } + if sender.send(result).is_err() { + tracing::debug!("request canceled by user"); } } } @@ -692,7 +682,7 @@ impl AsyncRead for ResponseBodyReader { buf: &mut [u8], ) -> Poll> { let inner = &mut self.inner; - pin_mut!(inner); + pin!(inner); match inner.poll_read(cx, buf) { // On EOF, check to see if the transfer was cancelled, and if so, diff --git a/src/interceptor/mod.rs b/src/interceptor/mod.rs index 0f563c93..e056891e 100644 --- a/src/interceptor/mod.rs +++ b/src/interceptor/mod.rs @@ -28,11 +28,12 @@ /// enabled. use crate::Body; -use futures_util::future::BoxFuture; use http::{Request, Response}; use std::{ error::Error, fmt, + future::Future, + pin::Pin, }; mod context; @@ -81,7 +82,7 @@ pub trait Interceptor: Send + Sync { } /// The type of future returned by an interceptor. -pub type InterceptorFuture<'a, E> = BoxFuture<'a, Result, E>>; +pub type InterceptorFuture<'a, E> = Pin, E>> + Send + 'a>>; /// Creates an interceptor from an arbitrary closure or function. pub fn from_fn(f: F) -> InterceptorFn diff --git a/src/response.rs b/src/response.rs index 7643f091..e32d94c3 100644 --- a/src/response.rs +++ b/src/response.rs @@ -144,7 +144,7 @@ pub trait ResponseExt { #[cfg(feature = "text-decoding")] fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T> where - T: futures_io::AsyncRead + Unpin; + T: futures_lite::io::AsyncRead + Unpin; /// Deserialize the response body as JSON into a given type. /// @@ -210,7 +210,7 @@ impl ResponseExt for Response { #[cfg(feature = "text-decoding")] fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T> where - T: futures_io::AsyncRead + Unpin, + T: futures_lite::io::AsyncRead + Unpin, { crate::text::Decoder::for_response(&self).decode_reader_async(self.body_mut()) } diff --git a/src/task.rs b/src/task.rs index 539aa685..3756a9ec 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,59 +1,11 @@ //! Helpers for working with tasks and futures. use crate::Error; -use crossbeam_utils::sync::{Parker, Unparker}; -use futures_util::{pin_mut, task::ArcWake}; use std::{ - future::Future, net::{SocketAddr, UdpSocket}, - sync::Arc, - task::{Context, Poll, Waker}, + task::Waker, }; -/// Extension trait for efficiently blocking on a future. -pub(crate) trait Join: Future { - fn join(self) -> ::Output; -} - -impl Join for F { - fn join(self) -> ::Output { - struct ThreadWaker(Unparker); - - impl ArcWake for ThreadWaker { - fn wake_by_ref(arc_self: &Arc) { - arc_self.0.unpark(); - } - } - - let parker = Parker::new(); - let waker = futures_util::task::waker(Arc::new(ThreadWaker(parker.unparker().clone()))); - let mut context = Context::from_waker(&waker); - - let future = self; - pin_mut!(future); - - loop { - match future.as_mut().poll(&mut context) { - Poll::Ready(output) => return output, - Poll::Pending => parker.park(), - } - } - } -} - -/// Create a waker from a closure. -fn waker_fn(f: impl Fn() + Send + Sync + 'static) -> Waker { - struct Impl(F); - - impl ArcWake for Impl { - fn wake_by_ref(arc_self: &Arc) { - (&arc_self.0)() - } - } - - futures_util::task::waker(Arc::new(Impl(f))) -} - /// Helper methods for working with wakers. pub(crate) trait WakerExt { /// Create a new waker from a closure that accepts this waker as an @@ -64,7 +16,7 @@ pub(crate) trait WakerExt { impl WakerExt for Waker { fn chain(&self, f: impl Fn(&Waker) + Send + Sync + 'static) -> Waker { let inner = self.clone(); - waker_fn(move || (f)(&inner)) + waker_fn::waker_fn(move || (f)(&inner)) } } @@ -87,14 +39,14 @@ impl UdpWaker { } } -impl ArcWake for UdpWaker { - /// Request the connected agent event loop to wake up. Just like a morning - /// person would do. - fn wake_by_ref(arc_self: &Arc) { - // We don't actually care here if this succeeds. Maybe the agent is - // busy, or tired, or just needs some alone time right now. - if let Err(e) = arc_self.socket.send(&[1]) { - tracing::debug!("agent waker produced an error: {}", e); - } +impl From for Waker { + fn from(waker: UdpWaker) -> Self { + waker_fn::waker_fn(move || { + // We don't actually care here if this succeeds. Maybe the agent is + // busy, or tired, or just needs some alone time right now. + if let Err(e) = waker.socket.send(&[1]) { + tracing::debug!("agent waker produced an error: {}", e); + } + }) } } diff --git a/src/text.rs b/src/text.rs index fee8ed0c..98d4d40d 100644 --- a/src/text.rs +++ b/src/text.rs @@ -3,11 +3,7 @@ #![cfg(feature = "text-decoding")] use encoding_rs::{CoderResult, Encoding}; -use futures_io::AsyncRead; -use futures_util::{ - future::{FutureExt, LocalBoxFuture}, - io::AsyncReadExt, -}; +use futures_lite::io::{AsyncRead, AsyncReadExt}; use http::Response; use std::{ future::Future, @@ -44,7 +40,7 @@ macro_rules! decode_reader { /// A future returning a response body decoded as text. #[allow(missing_debug_implementations)] pub struct TextFuture<'a, R> { - inner: LocalBoxFuture<'a, io::Result>, + inner: Pin> + 'a>>, _phantom: PhantomData, } @@ -52,7 +48,7 @@ impl<'a, R: Unpin> Future for TextFuture<'a, R> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().inner.poll_unpin(cx) + self.as_mut().inner.as_mut().poll(cx) } } From de906ad1c46ba5b9d234066837d2c1b7b66ce6a5 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Thu, 12 Nov 2020 00:01:49 -0600 Subject: [PATCH 2/2] Remove extra comment --- src/client.rs | 1 - src/response.rs | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 52b3ea19..9082c5c6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1132,7 +1132,6 @@ impl Future for ResponseFuture<'_> { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // use futures_util::future::FutureExt; self.0.as_mut().poll(cx) } } diff --git a/src/response.rs b/src/response.rs index e32d94c3..119f139d 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,4 +1,5 @@ use crate::Metrics; +use futures_lite::io::AsyncRead; use http::{Response, Uri}; use std::{ fs::File, @@ -144,7 +145,7 @@ pub trait ResponseExt { #[cfg(feature = "text-decoding")] fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T> where - T: futures_lite::io::AsyncRead + Unpin; + T: AsyncRead + Unpin; /// Deserialize the response body as JSON into a given type. /// @@ -210,7 +211,7 @@ impl ResponseExt for Response { #[cfg(feature = "text-decoding")] fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T> where - T: futures_lite::io::AsyncRead + Unpin, + T: AsyncRead + Unpin, { crate::text::Decoder::for_response(&self).decode_reader_async(self.body_mut()) }