Skip to content

Commit

Permalink
Merge pull request #29 from danieleades/update-hyper
Browse files Browse the repository at this point in the history
port hyperlocal client to latest hyper
  • Loading branch information
softprops authored Jan 10, 2020
2 parents 486caac + 5c1daea commit 53b2a13
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 266 deletions.
5 changes: 5 additions & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version = "Two"
use_field_init_shorthand = true
merge_imports = true
wrap_comments = true
use_try_shorthand = true
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# 0.6.0

* upgrade to hyper 0.13
* upgrade hex to 0.3 [#15](https://github.com/softprops/hyperlocal/pull/15)
* move from tokio-core to tokio 0.1 [#16](https://github.com/softprops/hyperlocal/pull/16)
* don't explicitly block on unix socket connection [#18](https://github.com/softprops/hyperlocal/pull/18)
Expand Down
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ readme = "README.md"
edition = "2018"

[dependencies]
hex = "0.4"
hyper = { version = "0.13.0-alpha.4", features = ["runtime", "unstable-stream"] }
tokio-net = "0.2.0-alpha.6"
futures-core-preview = "0.3.0-alpha.19"
futures-util-preview = "0.3.0-alpha.19"
hex = "0.4.0"
hyper = { version = "0.13.1", features = ["stream"] }
tokio = { version = "0.2.6", features = ["uds", "stream"] }
pin-project = "0.4.6"
futures-util = "0.3.1"

[dev-dependencies]
tokio = "0.2.0-alpha.6"
tokio = { version = "0.2.6", features = ["uds", "stream", "macros"]}

[features]
client = []
server = []
default = ["client", "server"]
33 changes: 17 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ hyperlocal = "0.7-alpha.1"
A typical server can be built with `hyperlocal::server::UnixServerExt`.

```rust
use std::error::Error;
use std::fs;
use std::path::Path;
use std::{error::Error, fs, path::Path};

use hyper::{
service::{make_service_fn, service_fn},
Expand All @@ -49,12 +47,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
fs::remove_file(path)?;
}

let make_service = make_service_fn(|_| {
async {
Ok::<_, hyper::Error>(service_fn(|_req| {
async { Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) }
}))
}
let make_service = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(|_req| async {
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}))
});

Server::bind_unix(path)?.serve(make_service).await?;
Expand All @@ -78,20 +74,25 @@ socket and the resource URI path and query string.
use std::error::Error;
use std::path::Path;

use futures_util::try_stream::TryStreamExt;
use futures_util::stream::TryStreamExt;
use hyper::{Body, Client};
use hyperlocal::{UnixConnector, Uri};
use hyperlocal::{Uri, UnixClientExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let path = Path::new("/tmp/hyperlocal.sock");

let client = Client::builder().build::<_, Body>(UnixConnector::default());

let url = Uri::new(path, "/").into();

let response = client.get(url).await?;
let bytes = response.into_body().try_concat().await?.to_vec();
let client = Client::unix();

let response_body = client.get(url).await?.into_body;

let bytes = response_body
.try_fold(Vec::default(), |mut buf, bytes| async {
buf.extend(bytes);
Ok(buf)
})
.await?;

println!("{}", String::from_utf8(bytes)?);

Expand Down
21 changes: 12 additions & 9 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use std::error::Error;
use std::path::Path;

use futures_util::try_stream::TryStreamExt;
use hyper::{Body, Client};
use hyperlocal::{UnixConnector, Uri};
use futures_util::stream::TryStreamExt;
use hyper::Client;
use hyperlocal::{UnixClientExt, Uri};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let path = Path::new("/tmp/hyperlocal.sock");
let url = Uri::new("/tmp/hyperlocal.sock", "/").into();

let client = Client::builder().build::<_, Body>(UnixConnector::default());
let client = Client::unix();

let url = Uri::new(path, "/").into();
let response_body = client.get(url).await?.into_body();

let response = client.get(url).await?;
let bytes = response.into_body().try_concat().await?.to_vec();
let bytes = response_body
.try_fold(Vec::default(), |mut v, bytes| async {
v.extend(bytes);
Ok(v)
})
.await?;

println!("{}", String::from_utf8(bytes)?);

Expand Down
14 changes: 5 additions & 9 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::error::Error;
use std::fs;
use std::path::Path;
use std::{error::Error, fs, path::Path};

use hyper::{
service::{make_service_fn, service_fn},
Expand All @@ -18,12 +16,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
fs::remove_file(path)?;
}

let make_service = make_service_fn(|_| {
async {
Ok::<_, hyper::Error>(service_fn(|_req| {
async { Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) }
}))
}
let make_service = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(|_req| async {
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}))
});

Server::bind_unix(path)?.serve(make_service).await?;
Expand Down
144 changes: 144 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use futures_util::future::BoxFuture;
use hex::FromHex;
use hyper::{
client::connect::{Connected, Connection},
service::Service,
Body, Client, Uri,
};
use pin_project::pin_project;
use std::{
io,
path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
#[derive(Debug)]
pub struct UnixStream {
#[pin]
unix_stream: tokio::net::UnixStream,
}

impl UnixStream {
async fn connect<P>(path: P) -> std::io::Result<Self>
where
P: AsRef<Path>,
{
let unix_stream = tokio::net::UnixStream::connect(path).await?;
Ok(Self { unix_stream })
}
}

impl tokio::io::AsyncWrite for UnixStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.project().unix_stream.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().unix_stream.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().unix_stream.poll_shutdown(cx)
}
}

impl tokio::io::AsyncRead for UnixStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.project().unix_stream.poll_read(cx, buf)
}
}

/// the `[UnixConnector]` can be used to construct a `[hyper::Client]` which can
/// speak to a unix domain socket.
///
/// # Example
/// ```
/// use hyper::{Client, Body};
/// use hyperlocal::UnixConnector;
///
/// let connector = UnixConnector;
/// let client: Client<UnixConnector, Body> = Client::builder().build(connector);
/// ```
///
/// # Note
/// If you don't need access to the low-level `[hyper::Client]` builder
/// interface, consider using the `[UnixClientExt]` trait instead.
#[derive(Clone, Copy, Debug, Default)]
pub struct UnixConnector;

impl Unpin for UnixConnector {}

impl Service<Uri> for UnixConnector {
type Response = UnixStream;
type Error = std::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&mut self, req: Uri) -> Self::Future {
let fut = async move {
let path = parse_socket_path(req)?;
UnixStream::connect(path).await
};

Box::pin(fut)
}
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}

impl Connection for UnixStream {
fn connected(&self) -> Connected {
Connected::new()
}
}

fn parse_socket_path(uri: Uri) -> Result<std::path::PathBuf, io::Error> {
if uri.scheme_str() != Some("unix") {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URL, scheme must be unix",
));
}

if let Some(host) = uri.host() {
let bytes = Vec::from_hex(host).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URL, host must be a hex-encoded path",
)
})?;

Ok(PathBuf::from(String::from_utf8_lossy(&bytes).into_owned()))
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URL, host must be present",
))
}
}

/// Extention trait for constructing a hyper HTTP client over a Unix domain
/// socket.
pub trait UnixClientExt {
/// Construct a client which speaks HTTP over a Unix domain socket
///
/// # Example
/// ```
/// use hyper::Client;
/// use hyperlocal::UnixClientExt;
///
/// let client = Client::unix();
/// ```
fn unix() -> Client<UnixConnector, Body> {
Client::builder().build(UnixConnector)
}
}

impl UnixClientExt for Client<UnixConnector> {}
80 changes: 0 additions & 80 deletions src/client/mod.rs

This file was deleted.

Loading

0 comments on commit 53b2a13

Please sign in to comment.