Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trim some heavy dependencies #254

Merged
merged 2 commits into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ unstable-interceptors = []

[dependencies]
bytes = "0.5"
crossbeam-channel = "0.5"
crossbeam-utils = "0.8"
curl = "0.4.34"
curl-sys = "0.4.37"
futures-channel = "0.3"
futures-io = "0.3"
futures-lite = "1.11"
http = "0.2.1"
log = "0.4"
once_cell = "1"
slab = "0.4"
sluice = "0.5"
waker-fn = "1"

[dependencies.chrono]
version = "0.4"
Expand All @@ -52,10 +51,10 @@ optional = true
version = "0.8"
optional = true

[dependencies.futures-util]
version = "0.3"
[dependencies.flume]
version = "0.9"
default-features = false
features = ["io"]
features = ["async"]

[dependencies.mime]
version = "0.3"
Expand Down
30 changes: 16 additions & 14 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
use crate::handler::RequestHandler;
use crate::task::{UdpWaker, WakerExt};
use crate::Error;
use crossbeam_channel::{Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;
use curl::multi::WaitFd;
use flume::{Receiver, Sender};
use slab::Slab;
use std::net::UdpSocket;
use std::sync::{Arc, Mutex};
use std::task::Waker;
use std::thread;
use std::time::{Duration, Instant};
use std::{
net::UdpSocket,
sync::Mutex,
task::Waker,
thread,
time::{Duration, Instant},
};

const WAIT_TIMEOUT: Duration = Duration::from_millis(100);

Expand Down Expand Up @@ -72,10 +74,10 @@ impl AgentBuilder {
wake_socket.set_nonblocking(true)?;
let wake_addr = wake_socket.local_addr()?;
let port = wake_addr.port();
let waker = futures_util::task::waker(Arc::new(UdpWaker::connect(wake_addr)?));
let waker = Waker::from(UdpWaker::connect(wake_addr)?);
tracing::debug!("agent waker listening on {}", wake_addr);

let (message_tx, message_rx) = crossbeam_channel::unbounded();
let (message_tx, message_rx) = flume::unbounded();

let wait_group = WaitGroup::new();
let wait_group_thread = wait_group.clone();
Expand Down Expand Up @@ -114,7 +116,7 @@ impl AgentBuilder {

let agent = AgentContext {
multi,
multi_messages: crossbeam_channel::unbounded(),
multi_messages: flume::unbounded(),
message_tx,
message_rx,
wake_socket,
Expand Down Expand Up @@ -234,7 +236,7 @@ impl Handle {
self.waker.wake_by_ref();
Ok(())
}
Err(crossbeam_channel::SendError(_)) => match self.try_join() {
Err(flume::SendError(_)) => match self.try_join() {
JoinResult::Err(e) => panic!("agent thread terminated with error: {}", e),
JoinResult::Panic => panic!("agent thread panicked"),
_ => panic!("agent thread terminated prematurely"),
Expand Down Expand Up @@ -375,8 +377,8 @@ impl AgentContext {
} else {
match self.message_rx.try_recv() {
Ok(message) => self.handle_message(message)?,
Err(crossbeam_channel::TryRecvError::Empty) => break,
Err(crossbeam_channel::TryRecvError::Disconnected) => {
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => {
tracing::warn!("agent handle disconnected without close message");
self.close_requested = true;
break;
Expand Down Expand Up @@ -456,8 +458,8 @@ impl AgentContext {
match self.multi_messages.1.try_recv() {
// A request completed.
Ok((token, result)) => self.complete_request(token, result)?,
Err(crossbeam_channel::TryRecvError::Empty) => break,
Err(crossbeam_channel::TryRecvError::Disconnected) => unreachable!(),
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => unreachable!(),
}
}

Expand Down
24 changes: 14 additions & 10 deletions src/body.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Provides types for working with request and response bodies.

use crate::task::Join;
use bytes::Bytes;
use futures_io::AsyncRead;
use futures_util::io::{AsyncReadExt, Cursor};
use std::fmt;
use std::io::{self, Read};
use std::pin::Pin;
use std::str;
use std::task::{Context, Poll};
use futures_lite::{future::block_on, io::{AsyncRead, AsyncReadExt}};
use std::{
fmt,
io::{self, Cursor, Read},
pin::Pin,
str,
task::{Context, Poll},
};

macro_rules! match_type {
{
Expand Down Expand Up @@ -159,7 +159,11 @@ impl Body {

impl Read for Body {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
AsyncReadExt::read(self, buf).join()
match &mut self.0 {
Inner::Empty => Ok(0),
Inner::Bytes(cursor) => cursor.read(buf),
Inner::AsyncRead(reader, _) => block_on(reader.read(buf)),
}
}
}

Expand All @@ -171,7 +175,7 @@ impl AsyncRead for Body {
) -> Poll<io::Result<usize>> {
match &mut self.0 {
Inner::Empty => Poll::Ready(Ok(0)),
Inner::Bytes(cursor) => AsyncRead::poll_read(Pin::new(cursor), cx, buf),
Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
Inner::AsyncRead(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf),
}
}
Expand Down
23 changes: 10 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ use crate::{
handler::{RequestHandler, ResponseBodyReader},
headers,
interceptor::{self, Interceptor, InterceptorObj},
task::Join,
Body, Error,
};
use futures_io::AsyncRead;
use futures_util::{future::BoxFuture, pin_mut};
use futures_lite::{future::block_on, io::AsyncRead, pin};
use http::{
header::{HeaderMap, HeaderName, HeaderValue},
Request, Response,
Expand Down Expand Up @@ -650,7 +648,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.get_async(uri).join()
block_on(self.get_async(uri))
}

/// Send a GET request to the given URI asynchronously.
Expand Down Expand Up @@ -685,7 +683,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.head_async(uri).join()
block_on(self.head_async(uri))
}

/// Send a HEAD request to the given URI asynchronously.
Expand Down Expand Up @@ -723,7 +721,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.post_async(uri, body).join()
block_on(self.post_async(uri, body))
}

/// Send a POST request to the given URI asynchronously with a given request
Expand Down Expand Up @@ -763,7 +761,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.put_async(uri, body).join()
block_on(self.put_async(uri, body))
}

/// Send a PUT request to the given URI asynchronously with a given request
Expand All @@ -789,7 +787,7 @@ impl HttpClient {
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.delete_async(uri).join()
block_on(self.delete_async(uri))
}

/// Send a DELETE request to the given URI asynchronously.
Expand Down Expand Up @@ -855,7 +853,7 @@ impl HttpClient {
#[inline]
#[tracing::instrument(level = "debug", skip(self, request), err)]
pub fn send<B: Into<Body>>(&self, request: Request<B>) -> Result<Response<Body>, Error> {
self.send_async(request).join()
block_on(self.send_async(request))
}

/// Send an HTTP request and return the HTTP response asynchronously.
Expand Down Expand Up @@ -1122,7 +1120,7 @@ impl fmt::Debug for HttpClient {
}

/// A future for a request being executed.
pub struct ResponseFuture<'c>(BoxFuture<'c, Result<Response<Body>, Error>>);
pub struct ResponseFuture<'c>(Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + 'c + Send>>);

impl<'c> ResponseFuture<'c> {
fn new(future: impl Future<Output = Result<Response<Body>, Error>> + Send + 'c) -> Self {
Expand All @@ -1134,8 +1132,7 @@ impl Future for ResponseFuture<'_> {
type Output = Result<Response<Body>, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_util::future::FutureExt;
self.0.poll_unpin(cx)
self.0.as_mut().poll(cx)
}
}

Expand All @@ -1159,7 +1156,7 @@ impl AsyncRead for ResponseBody {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let inner = &mut self.inner;
pin_mut!(inner);
pin!(inner);
inner.poll_read(cx, buf)
}
}
Expand Down
26 changes: 8 additions & 18 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use crate::{
use crossbeam_utils::atomic::AtomicCell;
use curl::easy::{InfoType, ReadError, SeekResult, WriteError};
use curl_sys::CURL;
use futures_channel::oneshot::Sender;
use futures_io::{AsyncRead, AsyncWrite};
use futures_util::{pin_mut, task::AtomicWaker};
use flume::Sender;
use futures_lite::{io::{AsyncRead, AsyncWrite}, pin};
use http::{Response, Uri};
use once_cell::sync::OnceCell;
use sluice::pipe;
Expand Down Expand Up @@ -104,9 +103,6 @@ unsafe impl Send for RequestHandler {}
/// This is also used to keep track of the lifetime of the request.
#[derive(Debug)]
struct Shared {
/// A waker used by the handler to wake up the associated future.
waker: AtomicWaker,

/// Set to the final result of the transfer received from curl. This is used
/// to communicate an error while reading the response body if the handler
/// suddenly aborts.
Expand All @@ -127,9 +123,8 @@ impl RequestHandler {
Self,
impl Future<Output = Result<Response<ResponseBodyReader>, Error>>,
) {
let (sender, receiver) = futures_channel::oneshot::channel();
let (sender, receiver) = flume::bounded(1);
let shared = Arc::new(Shared {
waker: AtomicWaker::default(),
result: OnceCell::new(),
response_body_dropped: AtomicCell::new(false),
});
Expand All @@ -153,7 +148,7 @@ impl RequestHandler {
// Create a future that resolves when the handler receives the response
// headers.
let future = async move {
let builder = receiver.await.map_err(|_| Error::Aborted)??;
let builder = receiver.recv_async().await.map_err(|_| Error::Aborted)??;

let reader = ResponseBodyReader {
inner: response_body_reader,
Expand Down Expand Up @@ -187,7 +182,7 @@ impl RequestHandler {
fn is_future_canceled(&self) -> bool {
self.sender
.as_ref()
.map(Sender::is_canceled)
.map(Sender::is_disconnected)
.unwrap_or(false)
}

Expand Down Expand Up @@ -289,13 +284,8 @@ impl RequestHandler {
tracing::warn!("request completed with error: {}", e);
}

match sender.send(result) {
Ok(()) => {
self.shared.waker.wake();
}
Err(_) => {
tracing::debug!("request canceled by user");
}
if sender.send(result).is_err() {
tracing::debug!("request canceled by user");
}
}
}
Expand Down Expand Up @@ -692,7 +682,7 @@ impl AsyncRead for ResponseBodyReader {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let inner = &mut self.inner;
pin_mut!(inner);
pin!(inner);

match inner.poll_read(cx, buf) {
// On EOF, check to see if the transfer was cancelled, and if so,
Expand Down
5 changes: 3 additions & 2 deletions src/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
/// enabled.

use crate::Body;
use futures_util::future::BoxFuture;
use http::{Request, Response};
use std::{
error::Error,
fmt,
future::Future,
pin::Pin,
};

mod context;
Expand Down Expand Up @@ -81,7 +82,7 @@ pub trait Interceptor: Send + Sync {
}

/// The type of future returned by an interceptor.
pub type InterceptorFuture<'a, E> = BoxFuture<'a, Result<Response<Body>, E>>;
pub type InterceptorFuture<'a, E> = Pin<Box<dyn Future<Output = Result<Response<Body>, E>> + Send + 'a>>;

/// Creates an interceptor from an arbitrary closure or function.
pub fn from_fn<F, E>(f: F) -> InterceptorFn<F>
Expand Down
5 changes: 3 additions & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::Metrics;
use futures_lite::io::AsyncRead;
use http::{Response, Uri};
use std::{
fs::File,
Expand Down Expand Up @@ -144,7 +145,7 @@ pub trait ResponseExt<T> {
#[cfg(feature = "text-decoding")]
fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T>
where
T: futures_io::AsyncRead + Unpin;
T: AsyncRead + Unpin;

/// Deserialize the response body as JSON into a given type.
///
Expand Down Expand Up @@ -210,7 +211,7 @@ impl<T> ResponseExt<T> for Response<T> {
#[cfg(feature = "text-decoding")]
fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T>
where
T: futures_io::AsyncRead + Unpin,
T: AsyncRead + Unpin,
{
crate::text::Decoder::for_response(&self).decode_reader_async(self.body_mut())
}
Expand Down
Loading