diff --git a/crates/test-programs/wasi-http-proxy-tests/src/lib.rs b/crates/test-programs/wasi-http-proxy-tests/src/lib.rs index f1d7a65a8767..bb76f9c41e29 100644 --- a/crates/test-programs/wasi-http-proxy-tests/src/lib.rs +++ b/crates/test-programs/wasi-http-proxy-tests/src/lib.rs @@ -15,9 +15,7 @@ use bindings::wasi::http::types::{IncomingRequest, ResponseOutparam}; struct T; impl bindings::exports::wasi::http::incoming_handler::Guest for T { - fn handle(request: IncomingRequest, outparam: ResponseOutparam) { - let method = bindings::wasi::http::types::incoming_request_method(request); - + fn handle(_request: IncomingRequest, outparam: ResponseOutparam) { let hdrs = bindings::wasi::http::types::new_fields(&[]); let resp = bindings::wasi::http::types::new_outgoing_response(200, hdrs); let body = @@ -29,6 +27,6 @@ impl bindings::exports::wasi::http::incoming_handler::Guest for T { bindings::wasi::io::streams::blocking_write_and_flush(out, b"hello, world!") .expect("writing response"); - println!("handling method: {method:?}!"); + bindings::wasi::http::types::outgoing_body_finish(body, None); } } diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 7eca1948045b..fa021a3ddd61 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -4,7 +4,6 @@ use bytes::Bytes; use http_body_util::combinators::BoxBody; use std::future::Future; use std::{ - convert::Infallible, pin::Pin, sync::{Arc, Mutex}, time::Duration, @@ -241,29 +240,32 @@ impl HostFutureTrailers { } } -pub type HyperOutgoingBody = BoxBody; +pub type HyperOutgoingBody = BoxBody; + +pub enum FinishMessage { + Finished, + Trailers(hyper::HeaderMap), + Abort, +} pub struct HostOutgoingBody { pub body_output_stream: Option>, - pub trailers_sender: Option>, + pub finish_sender: Option>, } impl HostOutgoingBody { pub fn new() -> (Self, HyperOutgoingBody) { use http_body_util::BodyExt; - use hyper::{ - body::{Body, Frame}, - HeaderMap, - }; + use hyper::body::{Body, Frame}; use std::task::{Context, Poll}; use tokio::sync::oneshot::error::RecvError; struct BodyImpl { body_receiver: mpsc::Receiver, - trailers_receiver: Option>, + finish_receiver: Option>, } impl Body for BodyImpl { type Data = Bytes; - type Error = Infallible; + type Error = anyhow::Error; fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -274,16 +276,21 @@ impl HostOutgoingBody { // This means that the `body_sender` end of the channel has been dropped. Poll::Ready(None) => { - if let Some(mut trailers_receiver) = self.as_mut().trailers_receiver.take() - { - match Pin::new(&mut trailers_receiver).poll(cx) { + if let Some(mut finish_receiver) = self.as_mut().finish_receiver.take() { + match Pin::new(&mut finish_receiver).poll(cx) { Poll::Pending => { - self.as_mut().trailers_receiver = Some(trailers_receiver); + self.as_mut().finish_receiver = Some(finish_receiver); Poll::Pending } - Poll::Ready(Ok(trailers)) => { - Poll::Ready(Some(Ok(Frame::trailers(trailers)))) - } + Poll::Ready(Ok(message)) => match message { + FinishMessage::Finished => Poll::Ready(None), + FinishMessage::Trailers(trailers) => { + Poll::Ready(Some(Ok(Frame::trailers(trailers)))) + } + FinishMessage::Abort => Poll::Ready(Some(Err( + anyhow::anyhow!("response corrupted"), + ))), + }, Poll::Ready(Err(RecvError { .. })) => Poll::Ready(None), } } else { @@ -295,17 +302,17 @@ impl HostOutgoingBody { } let (body_sender, body_receiver) = mpsc::channel(1); - let (trailers_sender, trailers_receiver) = oneshot::channel(); + let (finish_sender, finish_receiver) = oneshot::channel(); let body_impl = BodyImpl { body_receiver, - trailers_receiver: Some(trailers_receiver), + finish_receiver: Some(finish_receiver), } .boxed(); ( Self { // TODO: this capacity constant is arbitrary, and should be configurable body_output_stream: Some(Box::new(BodyWriteStream::new(1024 * 1024, body_sender))), - trailers_sender: Some(trailers_sender), + finish_sender: Some(finish_sender), }, body_impl, ) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index fb9f0b18b768..a88c1996fa72 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -81,7 +81,11 @@ impl outgoing_handler::Host for T { builder = builder.header(k, v); } - let body = req.body.unwrap_or_else(|| Empty::::new().boxed()); + let body = req.body.unwrap_or_else(|| { + Empty::::new() + .map_err(|_| anyhow::anyhow!("empty error")) + .boxed() + }); let request = builder.body(body).map_err(http_protocol_error)?; diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 9b8b28f48a70..49baba25aac1 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -107,7 +107,11 @@ impl TryFrom for hyper::Response { match resp.body { Some(body) => builder.body(body), - None => builder.body(Empty::::new().boxed()), + None => builder.body( + Empty::::new() + .map_err(|_| anyhow::anyhow!("empty error")) + .boxed(), + ), } } } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 8722b7b699b9..4baea8f364bf 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -3,7 +3,7 @@ use crate::bindings::http::types::{ IncomingResponse, Method, OutgoingBody, OutgoingRequest, OutgoingResponse, ResponseOutparam, Scheme, StatusCode, Trailers, }; -use crate::body::{HostFutureTrailers, HostFutureTrailersState}; +use crate::body::{FinishMessage, HostFutureTrailers, HostFutureTrailersState}; use crate::types::{HostIncomingRequest, HostOutgoingResponse}; use crate::WasiHttpView; use crate::{ @@ -13,7 +13,7 @@ use crate::{ HostOutgoingRequest, TableHttpExt, }, }; -use anyhow::{anyhow, Context}; +use anyhow::Context; use std::any::Any; use wasmtime_wasi::preview2::{ bindings::io::streams::{InputStream, OutputStream}, @@ -533,32 +533,41 @@ impl crate::bindings::http::types::Host for T { } } - fn outgoing_body_write_trailers( + fn outgoing_body_finish( &mut self, id: OutgoingBody, - ts: Trailers, + ts: Option, ) -> wasmtime::Result<()> { let mut body = self.table().delete_outgoing_body(id)?; - let trailers = self.table().get_fields(ts)?.clone(); - match body - .trailers_sender + let sender = body + .finish_sender .take() - // Should be unreachable - this is the only place we take the trailers sender, - // at the end of the HostOutgoingBody's lifetime - .ok_or_else(|| anyhow!("trailers_sender missing"))? - .send(trailers.into()) - { - Ok(()) => {} - Err(_) => {} // Ignoring failure: receiver died sending body, but we can't report that - // here. - } + .expect("outgoing-body trailer_sender consumed by a non-owning function"); + + let message = if let Some(ts) = ts { + FinishMessage::Trailers(self.table().get_fields(ts)?.clone().into()) + } else { + FinishMessage::Finished + }; + + // Ignoring failure: receiver died sending body, but we can't report that here. + let _ = sender.send(message.into()); Ok(()) } fn drop_outgoing_body(&mut self, id: OutgoingBody) -> wasmtime::Result<()> { - let _ = self.table().delete_outgoing_body(id)?; + let mut body = self.table().delete_outgoing_body(id)?; + + let sender = body + .finish_sender + .take() + .expect("outgoing-body trailer_sender consumed by a non-owning function"); + + // Ignoring failure: receiver died sending body, but we can't report that here. + let _ = sender.send(FinishMessage::Abort); + Ok(()) } } diff --git a/crates/wasi-http/wit/deps/http/types.wit b/crates/wasi-http/wit/deps/http/types.wit index a4105cd5e806..3743e2d3a276 100644 --- a/crates/wasi-http/wit/deps/http/types.wit +++ b/crates/wasi-http/wit/deps/http/types.wit @@ -174,16 +174,18 @@ interface types { /// Will give the child outgoing-response at most once. subsequent calls will /// return an error. - outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result + outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result type outgoing-body = u32 drop-outgoing-body: func(this: /* own */ outgoing-body) /// Will give the child output-stream at most once. subsequent calls will /// return an error. outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result - /// Write trailers as the way to finish an outgoing-body. To finish an - /// outgoing-body without writing trailers, use drop-outgoing-body. - outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers) + /// Finalize an outgoing body, optionally providing trailers. This must be + /// called to signal that the response is complete. If the `outgoing-body` is + /// dropped without calling `outgoing-body-finalize`, the implementation + /// should treat the body as corrupted. + outgoing-body-finish: func(this: /* own */ outgoing-body, trailers: /* own */ option) /// The following block defines a special resource type used by the /// `wasi:http/outgoing-handler` interface to emulate diff --git a/crates/wasi/wit/deps/http/types.wit b/crates/wasi/wit/deps/http/types.wit index a4105cd5e806..3743e2d3a276 100644 --- a/crates/wasi/wit/deps/http/types.wit +++ b/crates/wasi/wit/deps/http/types.wit @@ -174,16 +174,18 @@ interface types { /// Will give the child outgoing-response at most once. subsequent calls will /// return an error. - outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result + outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result type outgoing-body = u32 drop-outgoing-body: func(this: /* own */ outgoing-body) /// Will give the child output-stream at most once. subsequent calls will /// return an error. outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result - /// Write trailers as the way to finish an outgoing-body. To finish an - /// outgoing-body without writing trailers, use drop-outgoing-body. - outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers) + /// Finalize an outgoing body, optionally providing trailers. This must be + /// called to signal that the response is complete. If the `outgoing-body` is + /// dropped without calling `outgoing-body-finalize`, the implementation + /// should treat the body as corrupted. + outgoing-body-finish: func(this: /* own */ outgoing-body, trailers: /* own */ option) /// The following block defines a special resource type used by the /// `wasi:http/outgoing-handler` interface to emulate diff --git a/src/commands/serve.rs b/src/commands/serve.rs index ecba262cc734..5c177524af44 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -167,24 +167,25 @@ impl hyper::service::Service for ProxyHandler { let handler = self.clone(); Box::pin(async move { - let host = Host::new()?; - let mut store = Store::new(&handler.engine, host); - - let req = store.data_mut().new_incoming_request( - req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()), - )?; - let (sender, receiver) = tokio::sync::oneshot::channel(); - let out = store.data_mut().new_response_outparam(sender)?; - - let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre( - &mut store, - &handler.instance_pre, - ) - .await?; // TODO: need to track the join handle, but don't want to block the response on it tokio::task::spawn(async move { + let host = Host::new()?; + let mut store = Store::new(&handler.engine, host); + + let req = store.data_mut().new_incoming_request( + req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()), + )?; + + let out = store.data_mut().new_response_outparam(sender)?; + + let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre( + &mut store, + &handler.instance_pre, + ) + .await?; + proxy .wasi_http_incoming_handler() .call_handle(store, req, out)