Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port hyperlocal client to latest hyper #29

Merged
merged 13 commits into from
Jan 10, 2020
5 changes: 5 additions & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version = "Two"
use_field_init_shorthand = true
merge_imports = true
wrap_comments = true
use_try_shorthand = true
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# 0.6.0

* upgrade to hyper 0.13
* upgrade hex to 0.3 [#15](https://github.com/softprops/hyperlocal/pull/15)
* move from tokio-core to tokio 0.1 [#16](https://github.com/softprops/hyperlocal/pull/16)
* don't explicitly block on unix socket connection [#18](https://github.com/softprops/hyperlocal/pull/18)
Expand Down
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ readme = "README.md"
edition = "2018"

[dependencies]
hex = "0.4"
hyper = { version = "0.13.0-alpha.4", features = ["runtime", "unstable-stream"] }
tokio-net = "0.2.0-alpha.6"
futures-core-preview = "0.3.0-alpha.19"
futures-util-preview = "0.3.0-alpha.19"
hex = "0.4.0"
hyper = { version = "0.13.1", features = ["stream"] }
tokio = { version = "0.2.6", features = ["uds", "stream"] }
pin-project = "0.4.6"
futures-util = "0.3.1"

[dev-dependencies]
tokio = "0.2.0-alpha.6"
tokio = { version = "0.2.6", features = ["uds", "stream", "macros"]}

[features]
client = []
server = []
default = ["client", "server"]
33 changes: 17 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ hyperlocal = "0.7-alpha.1"
A typical server can be built with `hyperlocal::server::UnixServerExt`.

```rust
use std::error::Error;
use std::fs;
use std::path::Path;
use std::{error::Error, fs, path::Path};

use hyper::{
service::{make_service_fn, service_fn},
Expand All @@ -49,12 +47,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
fs::remove_file(path)?;
}

let make_service = make_service_fn(|_| {
async {
Ok::<_, hyper::Error>(service_fn(|_req| {
async { Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) }
}))
}
let make_service = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(|_req| async {
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}))
});

Server::bind_unix(path)?.serve(make_service).await?;
Expand All @@ -78,20 +74,25 @@ socket and the resource URI path and query string.
use std::error::Error;
use std::path::Path;

use futures_util::try_stream::TryStreamExt;
use futures_util::stream::TryStreamExt;
use hyper::{Body, Client};
use hyperlocal::{UnixConnector, Uri};
use hyperlocal::{Uri, UnixClientExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let path = Path::new("/tmp/hyperlocal.sock");

let client = Client::builder().build::<_, Body>(UnixConnector::default());

let url = Uri::new(path, "/").into();

let response = client.get(url).await?;
let bytes = response.into_body().try_concat().await?.to_vec();
let client = Client::unix();

let response_body = client.get(url).await?.into_body;

let bytes = response_body
.try_fold(Vec::default(), |mut buf, bytes| async {
buf.extend(bytes);
Ok(buf)
})
.await?;

println!("{}", String::from_utf8(bytes)?);

Expand Down
21 changes: 12 additions & 9 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use std::error::Error;
use std::path::Path;

use futures_util::try_stream::TryStreamExt;
use hyper::{Body, Client};
use hyperlocal::{UnixConnector, Uri};
use futures_util::stream::TryStreamExt;
use hyper::Client;
use hyperlocal::{UnixClientExt, Uri};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let path = Path::new("/tmp/hyperlocal.sock");
let url = Uri::new("/tmp/hyperlocal.sock", "/").into();

let client = Client::builder().build::<_, Body>(UnixConnector::default());
let client = Client::unix();

let url = Uri::new(path, "/").into();
let response_body = client.get(url).await?.into_body();

let response = client.get(url).await?;
let bytes = response.into_body().try_concat().await?.to_vec();
let bytes = response_body
.try_fold(Vec::default(), |mut v, bytes| async {
v.extend(bytes);
Ok(v)
})
.await?;

println!("{}", String::from_utf8(bytes)?);

Expand Down
14 changes: 5 additions & 9 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::error::Error;
use std::fs;
use std::path::Path;
use std::{error::Error, fs, path::Path};

use hyper::{
service::{make_service_fn, service_fn},
Expand All @@ -18,12 +16,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
fs::remove_file(path)?;
}

let make_service = make_service_fn(|_| {
async {
Ok::<_, hyper::Error>(service_fn(|_req| {
async { Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) }
}))
}
let make_service = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(|_req| async {
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}))
});

Server::bind_unix(path)?.serve(make_service).await?;
Expand Down
144 changes: 144 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use futures_util::future::BoxFuture;
use hex::FromHex;
use hyper::{
client::connect::{Connected, Connection},
service::Service,
Body, Client, Uri,
};
use pin_project::pin_project;
use std::{
io,
path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
#[derive(Debug)]
pub struct UnixStream {
#[pin]
unix_stream: tokio::net::UnixStream,
}

impl UnixStream {
async fn connect<P>(path: P) -> std::io::Result<Self>
where
P: AsRef<Path>,
{
let unix_stream = tokio::net::UnixStream::connect(path).await?;
Ok(Self { unix_stream })
}
}

impl tokio::io::AsyncWrite for UnixStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.project().unix_stream.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().unix_stream.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().unix_stream.poll_shutdown(cx)
}
}

impl tokio::io::AsyncRead for UnixStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.project().unix_stream.poll_read(cx, buf)
}
}

/// the `[UnixConnector]` can be used to construct a `[hyper::Client]` which can
/// speak to a unix domain socket.
///
/// # Example
/// ```
/// use hyper::{Client, Body};
/// use hyperlocal::UnixConnector;
///
/// let connector = UnixConnector;
/// let client: Client<UnixConnector, Body> = Client::builder().build(connector);
/// ```
///
/// # Note
/// If you don't need access to the low-level `[hyper::Client]` builder
/// interface, consider using the `[UnixClientExt]` trait instead.
#[derive(Clone, Copy, Debug, Default)]
pub struct UnixConnector;

impl Unpin for UnixConnector {}

impl Service<Uri> for UnixConnector {
type Response = UnixStream;
type Error = std::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&mut self, req: Uri) -> Self::Future {
let fut = async move {
let path = parse_socket_path(req)?;
UnixStream::connect(path).await
};

Box::pin(fut)
}
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}

impl Connection for UnixStream {
fn connected(&self) -> Connected {
Connected::new()
}
}

fn parse_socket_path(uri: Uri) -> Result<std::path::PathBuf, io::Error> {
if uri.scheme_str() != Some("unix") {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URL, scheme must be unix",
));
}

if let Some(host) = uri.host() {
let bytes = Vec::from_hex(host).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URL, host must be a hex-encoded path",
)
})?;

Ok(PathBuf::from(String::from_utf8_lossy(&bytes).into_owned()))
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URL, host must be present",
))
}
}

/// Extention trait for constructing a hyper HTTP client over a Unix domain
/// socket.
pub trait UnixClientExt {
/// Construct a client which speaks HTTP over a Unix domain socket
///
/// # Example
/// ```
/// use hyper::Client;
/// use hyperlocal::UnixClientExt;
///
/// let client = Client::unix();
/// ```
fn unix() -> Client<UnixConnector, Body> {
Client::builder().build(UnixConnector)
}
}

impl UnixClientExt for Client<UnixConnector> {}
80 changes: 0 additions & 80 deletions src/client/mod.rs

This file was deleted.

Loading