Skip to content

Commit

Permalink
Merge pull request #188 from fastly/mgattozzi/async-hostcall
Browse files Browse the repository at this point in the history
  • Loading branch information
mgattozzi authored Nov 16, 2022
2 parents 48ea9ef + 12ec538 commit dc9e8d9
Show file tree
Hide file tree
Showing 22 changed files with 1,195 additions and 446 deletions.
362 changes: 208 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

230 changes: 230 additions & 0 deletions cli/tests/integration/async_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use crate::common::Test;
use crate::common::TestResult;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Barrier;

// On Windows, streaming body backpressure doesn't seem to work as expected, either
// due to the Hyper client or server too eagerly clearing the chunk buffer. This issue does
// not appear related to async I/O hostcalls; the behavior is seen within the streaming body
// implementation in general. For the time being, this test is unix-only.
//
// https://github.com/fastly/Viceroy/issues/207 tracks the broader issue.
#[cfg(target_family = "unix")]
#[tokio::test(flavor = "multi_thread")]
async fn async_io_methods() -> TestResult {
let request_count = Arc::new(AtomicUsize::new(0));
let req_count_1 = request_count.clone();
let req_count_2 = request_count.clone();
let req_count_3 = request_count.clone();

let barrier = Arc::new(Barrier::new(3));
let barrier_1 = barrier.clone();
let barrier_2 = barrier.clone();

let test = Test::using_fixture("async_io.wasm")
.backend("Simple", "http://127.0.0.1:9000/", None)
.async_host(9000, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "simple.org");
let req_count_1 = req_count_1.clone();
let barrier_1 = barrier_1.clone();
Box::new(async move {
match req_count_1.load(Ordering::Relaxed) {
0 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
})
.backend("ReadBody", "http://127.0.0.1:9001/", None)
.async_host(9001, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "readbody.org");
let req_count_2 = req_count_2.clone();
Box::new(async move {
match req_count_2.load(Ordering::Relaxed) {
0 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
1 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
3 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
_ => unreachable!(),
}
})
})
.backend("WriteBody", "http://127.0.0.1:9002/", None)
.async_host(9002, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "writebody.org");
let req_count_3 = req_count_3.clone();
let barrier_2 = barrier_2.clone();
Box::new(async move {
match req_count_3.load(Ordering::Relaxed) {
0 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
2 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
let _body = hyper::body::to_bytes(req.into_body()).await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
});

// request_count is 0 here
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "timeout");

barrier.wait().await;

request_count.store(1, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "true");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "0");
let temp_barrier = barrier.clone();
let _task = tokio::task::spawn(async move { temp_barrier.wait().await });
barrier.wait().await;

request_count.store(2, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "true");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "1");
barrier.wait().await;

request_count.store(3, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "true");
assert_eq!(resp.headers()["Ready-Index"], "2");
let temp_barrier = barrier.clone();
let _task = tokio::task::spawn(async move { temp_barrier.wait().await });
barrier.wait().await;

let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.header("Empty-Select-Timeout", "0")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);

let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.header("Empty-Select-Timeout", "1")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Ready-Index"], "timeout");

Ok(())
}
59 changes: 41 additions & 18 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use futures::stream::StreamExt;
use hyper::{service, Body as HyperBody, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{convert::Infallible, future::Future, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
use tracing_subscriber::filter::EnvFilter;
use viceroy_lib::{
Expand Down Expand Up @@ -126,7 +126,17 @@ impl Test {
HostFn: Fn(Request<Vec<u8>>) -> Response<Vec<u8>>,
HostFn: Send + Sync + 'static,
{
let service = Arc::new(service);
let service = Arc::new(TestService::Sync(Arc::new(service)));
self.hosts.push(HostSpec { port, service });
self
}

pub fn async_host<HostFn>(mut self, port: u16, service: HostFn) -> Self
where
HostFn: Fn(Request<HyperBody>) -> AsyncResp,
HostFn: Send + Sync + 'static,
{
let service = Arc::new(TestService::Async(Arc::new(service)));
self.hosts.push(HostSpec { port, service });
self
}
Expand Down Expand Up @@ -273,7 +283,7 @@ impl Test {
/// The specification of a mock host, as part of a `Test` builder.
struct HostSpec {
port: u16,
service: Arc<dyn Fn(Request<Vec<u8>>) -> Response<Vec<u8>> + Send + Sync>,
service: Arc<TestService>,
}

/// A handle to a running mock host, used to gracefully shut down the host on test completion.
Expand All @@ -292,21 +302,26 @@ impl HostSpec {
// we transform `service` into an async function that consumes Hyper bodies. that requires a bit
// of `Arc` and `move` operations because each invocation needs to produce a distinct `Future`
let async_service = Arc::new(move |req: Request<HyperBody>| {
let (parts, body) = req.into_parts();
let mut body = Box::new(body); // for pinning
let service = service.clone();

async move {
// read out all of the bytes from the body into a vector, then re-assemble the request
let mut body_bytes = Vec::new();
while let Some(chunk) = body.next().await {
body_bytes.extend_from_slice(&chunk.unwrap());
}
let req = Request::from_parts(parts, body_bytes);

// pass the request through the host function, then convert its body into the form
// that Hyper wants
let resp = service(req).map(HyperBody::from);
let resp = match &*service {
TestService::Sync(s) => {
let (parts, body) = req.into_parts();
let mut body = Box::new(body); // for pinning
// read out all of the bytes from the body into a vector, then re-assemble the request
let mut body_bytes = Vec::new();
while let Some(chunk) = body.next().await {
body_bytes.extend_from_slice(&chunk.unwrap());
}
let req = Request::from_parts(parts, body_bytes);

// pass the request through the host function, then convert its body into the form
// that Hyper wants
s(req).map(HyperBody::from)
}
TestService::Async(s) => Box::into_pin(s(req)).await.map(HyperBody::from),
};

let res: Result<_, hyper::Error> = Ok(resp);
res
Expand Down Expand Up @@ -345,8 +360,16 @@ impl HostHandle {
self.terminate_signal
.send(())
.expect("could not send terminate signal to mock host");
self.task_handle
.await
.expect("mock host did not terminate cleanly")
if !self.task_handle.is_finished() {
self.task_handle.abort();
}
}
}

#[derive(Clone)]
pub enum TestService {
Sync(Arc<dyn Fn(Request<Vec<u8>>) -> Response<Vec<u8>> + Send + Sync>),
Async(Arc<dyn Fn(Request<HyperBody>) -> AsyncResp + Send + Sync>),
}

type AsyncResp = Box<dyn Future<Output = Response<HyperBody>> + Send + Sync>;
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod async_io;
mod body;
mod common;
mod dictionary_lookup;
Expand Down
Loading

0 comments on commit dc9e8d9

Please sign in to comment.