Skip to content

Commit

Permalink
Add a method to finish outgoing responses
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Foltzer <acfoltzer@fastly.com>
Co-authored-by: Pat Hickey <phickey@fastly.com>
  • Loading branch information
3 people committed Sep 28, 2023
1 parent a6115d8 commit f0b74dc
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 64 deletions.
6 changes: 2 additions & 4 deletions crates/test-programs/wasi-http-proxy-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use bindings::wasi::http::types::{IncomingRequest, ResponseOutparam};
struct T;

impl bindings::exports::wasi::http::incoming_handler::Guest for T {
fn handle(request: IncomingRequest, outparam: ResponseOutparam) {
let method = bindings::wasi::http::types::incoming_request_method(request);

fn handle(_request: IncomingRequest, outparam: ResponseOutparam) {
let hdrs = bindings::wasi::http::types::new_fields(&[]);
let resp = bindings::wasi::http::types::new_outgoing_response(200, hdrs);
let body =
Expand All @@ -29,6 +27,6 @@ impl bindings::exports::wasi::http::incoming_handler::Guest for T {
bindings::wasi::io::streams::blocking_write_and_flush(out, b"hello, world!")
.expect("writing response");

println!("handling method: {method:?}!");
bindings::wasi::http::types::outgoing_body_finish(body, None);
}
}
45 changes: 26 additions & 19 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use std::future::Future;
use std::{
convert::Infallible,
pin::Pin,
sync::{Arc, Mutex},
time::Duration,
Expand Down Expand Up @@ -241,29 +240,32 @@ impl HostFutureTrailers {
}
}

pub type HyperOutgoingBody = BoxBody<Bytes, Infallible>;
pub type HyperOutgoingBody = BoxBody<Bytes, anyhow::Error>;

pub enum FinishMessage {
Finished,
Trailers(hyper::HeaderMap),
Abort,
}

pub struct HostOutgoingBody {
pub body_output_stream: Option<Box<dyn HostOutputStream>>,
pub trailers_sender: Option<tokio::sync::oneshot::Sender<hyper::HeaderMap>>,
pub finish_sender: Option<tokio::sync::oneshot::Sender<FinishMessage>>,
}

impl HostOutgoingBody {
pub fn new() -> (Self, HyperOutgoingBody) {
use http_body_util::BodyExt;
use hyper::{
body::{Body, Frame},
HeaderMap,
};
use hyper::body::{Body, Frame};
use std::task::{Context, Poll};
use tokio::sync::oneshot::error::RecvError;
struct BodyImpl {
body_receiver: mpsc::Receiver<Bytes>,
trailers_receiver: Option<oneshot::Receiver<HeaderMap>>,
finish_receiver: Option<oneshot::Receiver<FinishMessage>>,
}
impl Body for BodyImpl {
type Data = Bytes;
type Error = Infallible;
type Error = anyhow::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -274,16 +276,21 @@ impl HostOutgoingBody {

// This means that the `body_sender` end of the channel has been dropped.
Poll::Ready(None) => {
if let Some(mut trailers_receiver) = self.as_mut().trailers_receiver.take()
{
match Pin::new(&mut trailers_receiver).poll(cx) {
if let Some(mut finish_receiver) = self.as_mut().finish_receiver.take() {
match Pin::new(&mut finish_receiver).poll(cx) {
Poll::Pending => {
self.as_mut().trailers_receiver = Some(trailers_receiver);
self.as_mut().finish_receiver = Some(finish_receiver);
Poll::Pending
}
Poll::Ready(Ok(trailers)) => {
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
}
Poll::Ready(Ok(message)) => match message {
FinishMessage::Finished => Poll::Ready(None),
FinishMessage::Trailers(trailers) => {
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
}
FinishMessage::Abort => Poll::Ready(Some(Err(
anyhow::anyhow!("response corrupted"),
))),
},
Poll::Ready(Err(RecvError { .. })) => Poll::Ready(None),
}
} else {
Expand All @@ -295,17 +302,17 @@ impl HostOutgoingBody {
}

let (body_sender, body_receiver) = mpsc::channel(1);
let (trailers_sender, trailers_receiver) = oneshot::channel();
let (finish_sender, finish_receiver) = oneshot::channel();
let body_impl = BodyImpl {
body_receiver,
trailers_receiver: Some(trailers_receiver),
finish_receiver: Some(finish_receiver),
}
.boxed();
(
Self {
// TODO: this capacity constant is arbitrary, and should be configurable
body_output_stream: Some(Box::new(BodyWriteStream::new(1024 * 1024, body_sender))),
trailers_sender: Some(trailers_sender),
finish_sender: Some(finish_sender),
},
body_impl,
)
Expand Down
6 changes: 5 additions & 1 deletion crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl<T: WasiHttpView> outgoing_handler::Host for T {
builder = builder.header(k, v);
}

let body = req.body.unwrap_or_else(|| Empty::<Bytes>::new().boxed());
let body = req.body.unwrap_or_else(|| {
Empty::<Bytes>::new()
.map_err(|_| anyhow::anyhow!("empty error"))
.boxed()
});

let request = builder.body(body).map_err(http_protocol_error)?;

Expand Down
6 changes: 5 additions & 1 deletion crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {

match resp.body {
Some(body) => builder.body(body),
None => builder.body(Empty::<bytes::Bytes>::new().boxed()),
None => builder.body(
Empty::<bytes::Bytes>::new()
.map_err(|_| anyhow::anyhow!("empty error"))
.boxed(),
),
}
}
}
Expand Down
43 changes: 26 additions & 17 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::bindings::http::types::{
IncomingResponse, Method, OutgoingBody, OutgoingRequest, OutgoingResponse, ResponseOutparam,
Scheme, StatusCode, Trailers,
};
use crate::body::{HostFutureTrailers, HostFutureTrailersState};
use crate::body::{FinishMessage, HostFutureTrailers, HostFutureTrailersState};
use crate::types::{HostIncomingRequest, HostOutgoingResponse};
use crate::WasiHttpView;
use crate::{
Expand All @@ -13,7 +13,7 @@ use crate::{
HostOutgoingRequest, TableHttpExt,
},
};
use anyhow::{anyhow, Context};
use anyhow::Context;
use std::any::Any;
use wasmtime_wasi::preview2::{
bindings::io::streams::{InputStream, OutputStream},
Expand Down Expand Up @@ -533,32 +533,41 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
}
}

fn outgoing_body_write_trailers(
fn outgoing_body_finish(
&mut self,
id: OutgoingBody,
ts: Trailers,
ts: Option<Trailers>,
) -> wasmtime::Result<()> {
let mut body = self.table().delete_outgoing_body(id)?;
let trailers = self.table().get_fields(ts)?.clone();

match body
.trailers_sender
let sender = body
.finish_sender
.take()
// Should be unreachable - this is the only place we take the trailers sender,
// at the end of the HostOutgoingBody's lifetime
.ok_or_else(|| anyhow!("trailers_sender missing"))?
.send(trailers.into())
{
Ok(()) => {}
Err(_) => {} // Ignoring failure: receiver died sending body, but we can't report that
// here.
}
.expect("outgoing-body trailer_sender consumed by a non-owning function");

let message = if let Some(ts) = ts {
FinishMessage::Trailers(self.table().get_fields(ts)?.clone().into())
} else {
FinishMessage::Finished
};

// Ignoring failure: receiver died sending body, but we can't report that here.
let _ = sender.send(message.into());

Ok(())
}

fn drop_outgoing_body(&mut self, id: OutgoingBody) -> wasmtime::Result<()> {
let _ = self.table().delete_outgoing_body(id)?;
let mut body = self.table().delete_outgoing_body(id)?;

let sender = body
.finish_sender
.take()
.expect("outgoing-body trailer_sender consumed by a non-owning function");

// Ignoring failure: receiver died sending body, but we can't report that here.
let _ = sender.send(FinishMessage::Abort);

Ok(())
}
}
10 changes: 6 additions & 4 deletions crates/wasi-http/wit/deps/http/types.wit
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,18 @@ interface types {

/// Will give the child outgoing-response at most once. subsequent calls will
/// return an error.
outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result</* child */ outgoing-body>
outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result</* own */ outgoing-body>

type outgoing-body = u32
drop-outgoing-body: func(this: /* own */ outgoing-body)
/// Will give the child output-stream at most once. subsequent calls will
/// return an error.
outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result</* child */ output-stream>
/// Write trailers as the way to finish an outgoing-body. To finish an
/// outgoing-body without writing trailers, use drop-outgoing-body.
outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers)
/// Finalize an outgoing body, optionally providing trailers. This must be
/// called to signal that the response is complete. If the `outgoing-body` is
/// dropped without calling `outgoing-body-finalize`, the implementation
/// should treat the body as corrupted.
outgoing-body-finish: func(this: /* own */ outgoing-body, trailers: /* own */ option<trailers>)

/// The following block defines a special resource type used by the
/// `wasi:http/outgoing-handler` interface to emulate
Expand Down
10 changes: 6 additions & 4 deletions crates/wasi/wit/deps/http/types.wit
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,18 @@ interface types {

/// Will give the child outgoing-response at most once. subsequent calls will
/// return an error.
outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result</* child */ outgoing-body>
outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result</* own */ outgoing-body>

type outgoing-body = u32
drop-outgoing-body: func(this: /* own */ outgoing-body)
/// Will give the child output-stream at most once. subsequent calls will
/// return an error.
outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result</* child */ output-stream>
/// Write trailers as the way to finish an outgoing-body. To finish an
/// outgoing-body without writing trailers, use drop-outgoing-body.
outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers)
/// Finalize an outgoing body, optionally providing trailers. This must be
/// called to signal that the response is complete. If the `outgoing-body` is
/// dropped without calling `outgoing-body-finalize`, the implementation
/// should treat the body as corrupted.
outgoing-body-finish: func(this: /* own */ outgoing-body, trailers: /* own */ option<trailers>)

/// The following block defines a special resource type used by the
/// `wasi:http/outgoing-handler` interface to emulate
Expand Down
29 changes: 15 additions & 14 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,25 @@ impl hyper::service::Service<Request> for ProxyHandler {
let handler = self.clone();

Box::pin(async move {
let host = Host::new()?;
let mut store = Store::new(&handler.engine, host);

let req = store.data_mut().new_incoming_request(
req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()),
)?;

let (sender, receiver) = tokio::sync::oneshot::channel();
let out = store.data_mut().new_response_outparam(sender)?;

let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre(
&mut store,
&handler.instance_pre,
)
.await?;

// TODO: need to track the join handle, but don't want to block the response on it
tokio::task::spawn(async move {
let host = Host::new()?;
let mut store = Store::new(&handler.engine, host);

let req = store.data_mut().new_incoming_request(
req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()),
)?;

let out = store.data_mut().new_response_outparam(sender)?;

let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre(
&mut store,
&handler.instance_pre,
)
.await?;

proxy
.wasi_http_incoming_handler()
.call_handle(store, req, out)
Expand Down

0 comments on commit f0b74dc

Please sign in to comment.