diff --git a/Cargo.lock b/Cargo.lock index 63a5aab2b99c..545b526d217f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2466,6 +2466,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "similar" version = "2.2.0" @@ -2659,6 +2668,7 @@ name = "test-programs" version = "0.0.0" dependencies = [ "anyhow", + "bytes", "cap-rand", "cap-std", "cargo_metadata", @@ -2750,6 +2760,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", @@ -3042,6 +3053,13 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "wasi-http-proxy-tests" +version = "0.0.0" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasi-http-tests" version = "0.0.0" @@ -3394,6 +3412,9 @@ dependencies = [ "criterion", "env_logger 0.10.0", "filecheck", + "http-body", + "http-body-util", + "hyper", "libc", "listenfd", "log", @@ -3762,7 +3783,6 @@ dependencies = [ "http-body-util", "hyper", "rustls", - "thiserror", "tokio", "tokio-rustls", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 5c823d258df9..98d9335576ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,12 @@ serde_json = { workspace = true } wasmparser = { workspace = true } wasm-encoder = { workspace = true } +async-trait = { workspace = true } +tokio = { workspace = true, optional = true, features = [ "signal", "macros" ] } +hyper = { workspace = true, optional = true } +http-body = { workspace = true } +http-body-util = { workspace = true } + [target.'cfg(unix)'.dependencies] rustix = { workspace = true, features = ["mm", "param"] } @@ -60,7 +66,7 @@ filecheck = { workspace = true } tempfile = { workspace = true } test-programs = { path = "crates/test-programs" } wasmtime-runtime = { workspace = true } -tokio = { version = "1.8.0", features = ["rt", "time", "macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["rt", "time", "macros", "rt-multi-thread"] } wast = { workspace = true } criterion = "0.5.0" num_cpus = "1.13.0" @@ -101,6 +107,7 @@ members = [ "crates/jit-icache-coherence", "crates/test-programs/wasi-tests", "crates/test-programs/wasi-http-tests", + "crates/test-programs/wasi-http-proxy-tests", "crates/test-programs/wasi-sockets-tests", "crates/test-programs/command-tests", "crates/test-programs/reactor-tests", @@ -251,7 +258,11 @@ tempfile = "3.1.0" filecheck = "0.5.0" libc = "0.2.60" file-per-thread-logger = "0.2.0" -tokio = { version = "1.26.0" } +tokio = { version = "1.26.0", features = [ "rt", "time" ] } +hyper = "=1.0.0-rc.3" +http = "0.2.9" +http-body = "=1.0.0-rc.2" +http-body-util = "=0.1.0-rc.2" bytes = "1.4" futures = { version = "0.3.27", default-features = false } indexmap = "2.0.0" @@ -275,7 +286,7 @@ jitdump = ["wasmtime/jitdump"] vtune = ["wasmtime/vtune"] wasi-nn = ["dep:wasmtime-wasi-nn"] wasi-threads = ["dep:wasmtime-wasi-threads"] -wasi-http = ["dep:wasmtime-wasi-http", "wasmtime-wasi-http?/sync"] +wasi-http = ["component-model", "dep:wasmtime-wasi-http", "dep:tokio", "dep:hyper", "wasmtime-wasi-http?/sync"] pooling-allocator = ["wasmtime/pooling-allocator", "wasmtime-cli-flags/pooling-allocator"] all-arch = ["wasmtime/all-arch"] component-model = [ @@ -286,6 +297,9 @@ component-model = [ winch = ["wasmtime/winch"] wmemcheck = ["wasmtime/wmemcheck"] +# Enable the `wasmtime serve` command +serve = ["wasi-http", "component-model"] + [[test]] name = "host_segfault" harness = false diff --git a/crates/test-programs/Cargo.toml b/crates/test-programs/Cargo.toml index 4808b8e462e7..12ad3188e2ef 100644 --- a/crates/test-programs/Cargo.toml +++ b/crates/test-programs/Cargo.toml @@ -15,6 +15,7 @@ heck = { workspace = true } [dependencies] anyhow = { workspace = true } +bytes = { workspace = true } http = { version = "0.2.9" } http-body = "1.0.0-rc.2" http-body-util = "0.1.0-rc.2" diff --git a/crates/test-programs/build.rs b/crates/test-programs/build.rs index 4062b02fea13..0de514283caa 100644 --- a/crates/test-programs/build.rs +++ b/crates/test-programs/build.rs @@ -33,6 +33,7 @@ fn build_and_generate_tests() { println!("cargo:rerun-if-changed=./wasi-sockets-tests"); if BUILD_WASI_HTTP_TESTS { println!("cargo:rerun-if-changed=./wasi-http-tests"); + println!("cargo:rerun-if-changed=./wasi-http-proxy-tests"); } else { println!("cargo:rustc-cfg=skip_wasi_http_tests"); } @@ -50,6 +51,7 @@ fn build_and_generate_tests() { .env_remove("CARGO_ENCODED_RUSTFLAGS"); if BUILD_WASI_HTTP_TESTS { cmd.arg("--package=wasi-http-tests"); + cmd.arg("--package=wasi-http-proxy-tests"); } let status = cmd.status().unwrap(); assert!(status.success()); @@ -60,8 +62,14 @@ fn build_and_generate_tests() { components_rs(&meta, "wasi-tests", "bin", &command_adapter, &out_dir); if BUILD_WASI_HTTP_TESTS { - modules_rs(&meta, "wasi-http-tests", "bin", &out_dir); components_rs(&meta, "wasi-http-tests", "bin", &command_adapter, &out_dir); + components_rs( + &meta, + "wasi-http-proxy-tests", + "cdylib", + &reactor_adapter, + &out_dir, + ); } components_rs(&meta, "command-tests", "bin", &command_adapter, &out_dir); diff --git a/crates/test-programs/tests/wasi-http-components-sync.rs b/crates/test-programs/tests/wasi-http-components-sync.rs index 33c638fc6883..222fc800edce 100644 --- a/crates/test-programs/tests/wasi-http-components-sync.rs +++ b/crates/test-programs/tests/wasi-http-components-sync.rs @@ -4,9 +4,8 @@ use wasmtime::{ Config, Engine, Store, }; use wasmtime_wasi::preview2::{ - command::sync::{add_to_linker, Command}, - pipe::MemoryOutputPipe, - IsATTY, Table, WasiCtx, WasiCtxBuilder, WasiView, + command::sync::Command, pipe::MemoryOutputPipe, IsATTY, Table, WasiCtx, WasiCtxBuilder, + WasiView, }; use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; @@ -60,8 +59,7 @@ fn instantiate_component( ctx: Ctx, ) -> Result<(Store, Command), anyhow::Error> { let mut linker = Linker::new(&ENGINE); - add_to_linker(&mut linker)?; - wasmtime_wasi_http::proxy::add_to_linker(&mut linker)?; + wasmtime_wasi_http::proxy::sync::add_to_linker(&mut linker)?; let mut store = Store::new(&ENGINE, ctx); diff --git a/crates/test-programs/tests/wasi-http-components.rs b/crates/test-programs/tests/wasi-http-components.rs index 8d1fa1f96c52..11fb6bd498ca 100644 --- a/crates/test-programs/tests/wasi-http-components.rs +++ b/crates/test-programs/tests/wasi-http-components.rs @@ -4,9 +4,7 @@ use wasmtime::{ Config, Engine, Store, }; use wasmtime_wasi::preview2::{ - command::{add_to_linker, Command}, - pipe::MemoryOutputPipe, - IsATTY, Table, WasiCtx, WasiCtxBuilder, WasiView, + command::Command, pipe::MemoryOutputPipe, IsATTY, Table, WasiCtx, WasiCtxBuilder, WasiView, }; use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; @@ -60,7 +58,6 @@ async fn instantiate_component( ctx: Ctx, ) -> Result<(Store, Command), anyhow::Error> { let mut linker = Linker::new(&ENGINE); - add_to_linker(&mut linker)?; wasmtime_wasi_http::proxy::add_to_linker(&mut linker)?; let mut store = Store::new(&ENGINE, ctx); diff --git a/crates/test-programs/tests/wasi-http-proxy.rs b/crates/test-programs/tests/wasi-http-proxy.rs new file mode 100644 index 000000000000..f344dc15071f --- /dev/null +++ b/crates/test-programs/tests/wasi-http-proxy.rs @@ -0,0 +1,146 @@ +#![cfg(all(feature = "test_programs", not(skip_wasi_http_tests)))] +use anyhow::Context; +use wasmtime::{ + component::{Component, Linker}, + Config, Engine, Store, +}; +use wasmtime_wasi::preview2::{ + self, pipe::MemoryOutputPipe, IsATTY, Table, WasiCtx, WasiCtxBuilder, WasiView, +}; +use wasmtime_wasi_http::{proxy::Proxy, WasiHttpCtx, WasiHttpView}; + +lazy_static::lazy_static! { + static ref ENGINE: Engine = { + let mut config = Config::new(); + config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + config.wasm_component_model(true); + config.async_support(true); + let engine = Engine::new(&config).unwrap(); + engine + }; +} +// uses ENGINE, creates a fn get_module(&str) -> Module +include!(concat!( + env!("OUT_DIR"), + "/wasi_http_proxy_tests_components.rs" +)); + +struct Ctx { + table: Table, + wasi: WasiCtx, + http: WasiHttpCtx, +} + +impl WasiView for Ctx { + fn table(&self) -> &Table { + &self.table + } + fn table_mut(&mut self) -> &mut Table { + &mut self.table + } + fn ctx(&self) -> &WasiCtx { + &self.wasi + } + fn ctx_mut(&mut self) -> &mut WasiCtx { + &mut self.wasi + } +} + +impl WasiHttpView for Ctx { + fn table(&mut self) -> &mut Table { + &mut self.table + } + fn ctx(&mut self) -> &mut WasiHttpCtx { + &mut self.http + } +} + +async fn instantiate(component: Component, ctx: Ctx) -> Result<(Store, Proxy), anyhow::Error> { + let mut linker = Linker::new(&ENGINE); + wasmtime_wasi_http::proxy::add_to_linker(&mut linker)?; + + let mut store = Store::new(&ENGINE, ctx); + + let (proxy, _instance) = Proxy::instantiate_async(&mut store, &component, &linker).await?; + Ok((store, proxy)) +} + +#[test_log::test(tokio::test)] +async fn wasi_http_proxy_tests() -> anyhow::Result<()> { + let stdout = MemoryOutputPipe::new(4096); + let stderr = MemoryOutputPipe::new(4096); + + let mut table = Table::new(); + let component = get_component("wasi_http_proxy_tests"); + + // Create our wasi context. + let mut builder = WasiCtxBuilder::new(); + builder.stdout(stdout.clone(), IsATTY::No); + builder.stderr(stderr.clone(), IsATTY::No); + for (var, val) in test_programs::wasi_tests_environment() { + builder.env(var, val); + } + let wasi = builder.build(&mut table)?; + let http = WasiHttpCtx; + + let ctx = Ctx { table, wasi, http }; + + let (mut store, proxy) = instantiate(component, ctx).await?; + + let req = { + use http_body_util::{BodyExt, Empty}; + + let req = hyper::Request::builder().method(http::Method::GET).body( + Empty::::new() + .map_err(|e| anyhow::anyhow!(e)) + .boxed(), + )?; + store.data_mut().new_incoming_request(req)? + }; + + let (sender, receiver) = tokio::sync::oneshot::channel(); + let out = store.data_mut().new_response_outparam(sender)?; + + let handle = preview2::spawn(async move { + proxy + .wasi_http_incoming_handler() + .call_handle(&mut store, req, out) + .await?; + + Ok::<_, anyhow::Error>(()) + }); + + let resp = match receiver.await { + Ok(Ok(resp)) => { + use http_body_util::BodyExt; + let (parts, body) = resp.into_parts(); + let collected = BodyExt::collect(body).await?; + Ok(hyper::Response::from_parts(parts, collected)) + } + + Ok(Err(e)) => Err(e), + + // This happens if the wasm never calls `set-response-outparam` + Err(e) => panic!("Failed to receive a response: {e:?}"), + }; + + // Now that the response has been processed, we can wait on the wasm to finish without + // deadlocking. + handle.await.context("Component execution")?; + + let stdout = stdout.contents(); + if !stdout.is_empty() { + println!("[guest] stdout:\n{}\n===", String::from_utf8_lossy(&stdout)); + } + let stderr = stderr.contents(); + if !stderr.is_empty() { + println!("[guest] stderr:\n{}\n===", String::from_utf8_lossy(&stderr)); + } + + match resp { + Ok(resp) => println!("response: {resp:?}"), + Err(e) => panic!("Error given in response: {e:?}"), + }; + + Ok(()) +} diff --git a/crates/test-programs/wasi-http-proxy-tests/Cargo.toml b/crates/test-programs/wasi-http-proxy-tests/Cargo.toml new file mode 100644 index 000000000000..fc87dc1bce7d --- /dev/null +++ b/crates/test-programs/wasi-http-proxy-tests/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "wasi-http-proxy-tests" +version = "0.0.0" +readme = "README.md" +edition = "2021" +publish = false + +[lib] +crate-type=["cdylib"] + +[dependencies] +wit-bindgen = { workspace = true, features = ["macros", "realloc"] } diff --git a/crates/test-programs/wasi-http-proxy-tests/src/lib.rs b/crates/test-programs/wasi-http-proxy-tests/src/lib.rs new file mode 100644 index 000000000000..239f1754327d --- /dev/null +++ b/crates/test-programs/wasi-http-proxy-tests/src/lib.rs @@ -0,0 +1,33 @@ +pub mod bindings { + use super::T; + + wit_bindgen::generate!({ + path: "../../wasi-http/wit", + world: "wasi:http/proxy", + exports: { + "wasi:http/incoming-handler": T, + }, + }); +} + +use bindings::wasi::http::types::{IncomingRequest, ResponseOutparam}; + +struct T; + +impl bindings::exports::wasi::http::incoming_handler::Guest for T { + fn handle(_request: IncomingRequest, outparam: ResponseOutparam) { + let hdrs = bindings::wasi::http::types::new_fields(&[]); + let resp = bindings::wasi::http::types::new_outgoing_response(200, hdrs); + let body = + bindings::wasi::http::types::outgoing_response_write(resp).expect("outgoing response"); + + bindings::wasi::http::types::set_response_outparam(outparam, Ok(resp)); + + let out = bindings::wasi::http::types::outgoing_body_write(body).expect("outgoing stream"); + bindings::wasi::io::streams::blocking_write_and_flush(out, b"hello, world!") + .expect("writing response"); + + bindings::wasi::io::streams::drop_output_stream(out); + bindings::wasi::http::types::outgoing_body_finish(body, None); + } +} diff --git a/crates/test-programs/wasi-http-tests/src/lib.rs b/crates/test-programs/wasi-http-tests/src/lib.rs index d516af03da16..e2ae84de6998 100644 --- a/crates/test-programs/wasi-http-tests/src/lib.rs +++ b/crates/test-programs/wasi-http-tests/src/lib.rs @@ -119,7 +119,7 @@ pub async fn request( // TODO: The current implementation requires this drop after the request is sent. // The ownership semantics are unclear in wasi-http we should clarify exactly what is // supposed to happen here. - http_types::drop_outgoing_body(outgoing_body); + http_types::outgoing_body_finish(outgoing_body, None); let incoming_response = match http_types::future_incoming_response_get(future_response) { Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?, diff --git a/crates/wasi-http/Cargo.toml b/crates/wasi-http/Cargo.toml index 3ebbea483adc..354d00624525 100644 --- a/crates/wasi-http/Cargo.toml +++ b/crates/wasi-http/Cargo.toml @@ -12,16 +12,15 @@ anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } futures = { workspace = true, default-features = false } -hyper = { version = "=1.0.0-rc.3", features = ["full"] } -tokio = { version = "1", default-features = false, features = [ +hyper = { workspace = true, features = ["full"] } +tokio = { workspace = true, features = [ "net", "rt-multi-thread", "time", ] } -http = { version = "0.2.9" } -http-body = "1.0.0-rc.2" -http-body-util = "0.1.0-rc.2" -thiserror = { workspace = true } +http = { workspace = true } +http-body = { workspace = true } +http-body-util = { workspace = true } tracing = { workspace = true } wasmtime-wasi = { workspace = true, default-features = false, features = [ "preview2", diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 298fe7b92bfe..fa021a3ddd61 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -4,7 +4,6 @@ use bytes::Bytes; use http_body_util::combinators::BoxBody; use std::future::Future; use std::{ - convert::Infallible, pin::Pin, sync::{Arc, Mutex}, time::Duration, @@ -15,11 +14,13 @@ use wasmtime_wasi::preview2::{ StreamRuntimeError, StreamState, }; +pub type HyperIncomingBody = BoxBody; + /// Holds onto the things needed to construct a [`HostIncomingBody`] until we are ready to build /// one. The HostIncomingBody spawns a task that starts consuming the incoming body, and we don't /// want to do that unless the user asks to consume the body. pub struct HostIncomingBodyBuilder { - pub body: hyper::body::Incoming, + pub body: HyperIncomingBody, pub between_bytes_timeout: Duration, } @@ -45,7 +46,7 @@ impl HostIncomingBodyBuilder { Ok(Some(Ok(frame))) => frame, Ok(Some(Err(e))) => { - match body_writer.send(Err(anyhow::anyhow!(e))).await { + match body_writer.send(Err(e)).await { Ok(_) => {} // If the body read end has dropped, then we report this error with the // trailers. unwrap and rewrap Err because the Ok side of these two Results @@ -239,29 +240,32 @@ impl HostFutureTrailers { } } -pub type HyperBody = BoxBody; +pub type HyperOutgoingBody = BoxBody; + +pub enum FinishMessage { + Finished, + Trailers(hyper::HeaderMap), + Abort, +} pub struct HostOutgoingBody { pub body_output_stream: Option>, - pub trailers_sender: Option>, + pub finish_sender: Option>, } impl HostOutgoingBody { - pub fn new() -> (Self, HyperBody) { + pub fn new() -> (Self, HyperOutgoingBody) { use http_body_util::BodyExt; - use hyper::{ - body::{Body, Frame}, - HeaderMap, - }; + use hyper::body::{Body, Frame}; use std::task::{Context, Poll}; use tokio::sync::oneshot::error::RecvError; struct BodyImpl { body_receiver: mpsc::Receiver, - trailers_receiver: Option>, + finish_receiver: Option>, } impl Body for BodyImpl { type Data = Bytes; - type Error = Infallible; + type Error = anyhow::Error; fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -272,16 +276,21 @@ impl HostOutgoingBody { // This means that the `body_sender` end of the channel has been dropped. Poll::Ready(None) => { - if let Some(mut trailers_receiver) = self.as_mut().trailers_receiver.take() - { - match Pin::new(&mut trailers_receiver).poll(cx) { + if let Some(mut finish_receiver) = self.as_mut().finish_receiver.take() { + match Pin::new(&mut finish_receiver).poll(cx) { Poll::Pending => { - self.as_mut().trailers_receiver = Some(trailers_receiver); + self.as_mut().finish_receiver = Some(finish_receiver); Poll::Pending } - Poll::Ready(Ok(trailers)) => { - Poll::Ready(Some(Ok(Frame::trailers(trailers)))) - } + Poll::Ready(Ok(message)) => match message { + FinishMessage::Finished => Poll::Ready(None), + FinishMessage::Trailers(trailers) => { + Poll::Ready(Some(Ok(Frame::trailers(trailers)))) + } + FinishMessage::Abort => Poll::Ready(Some(Err( + anyhow::anyhow!("response corrupted"), + ))), + }, Poll::Ready(Err(RecvError { .. })) => Poll::Ready(None), } } else { @@ -293,17 +302,17 @@ impl HostOutgoingBody { } let (body_sender, body_receiver) = mpsc::channel(1); - let (trailers_sender, trailers_receiver) = oneshot::channel(); + let (finish_sender, finish_receiver) = oneshot::channel(); let body_impl = BodyImpl { body_receiver, - trailers_receiver: Some(trailers_receiver), + finish_receiver: Some(finish_receiver), } .boxed(); ( Self { // TODO: this capacity constant is arbitrary, and should be configurable body_output_stream: Some(Box::new(BodyWriteStream::new(1024 * 1024, body_sender))), - trailers_sender: Some(trailers_sender), + finish_sender: Some(finish_sender), }, body_impl, ) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 570d229cf7b2..a88c1996fa72 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -2,7 +2,7 @@ use crate::bindings::http::{ outgoing_handler, types::{FutureIncomingResponse, OutgoingRequest, RequestOptions, Scheme}, }; -use crate::types::{HostFutureIncomingResponse, IncomingResponseInternal, TableHttpExt}; +use crate::types::{self, HostFutureIncomingResponse, IncomingResponseInternal, TableHttpExt}; use crate::WasiHttpView; use anyhow::Context; use bytes::Bytes; @@ -37,7 +37,7 @@ impl outgoing_handler::Host for T { .unwrap_or(600 * 1000) as u64, ); - let req = self.table().delete_outgoing_request(request_id)?; + let req = types::OutgoingRequestLens::from(request_id).delete(self.table())?; let method = match req.method { crate::bindings::http::types::Method::Get => Method::GET, @@ -81,7 +81,11 @@ impl outgoing_handler::Host for T { builder = builder.header(k, v); } - let body = req.body.unwrap_or_else(|| Empty::::new().boxed()); + let body = req.body.unwrap_or_else(|| { + Empty::::new() + .map_err(|_| anyhow::anyhow!("empty error")) + .boxed() + }); let request = builder.body(body).map_err(http_protocol_error)?; @@ -159,7 +163,8 @@ impl outgoing_handler::Host for T { let resp = timeout(first_byte_timeout, sender.send_request(request)) .await .map_err(|_| timeout_error("first byte"))? - .map_err(hyper_protocol_error)?; + .map_err(hyper_protocol_error)? + .map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()); Ok(IncomingResponseInternal { resp, diff --git a/crates/wasi-http/src/incoming_handler.rs b/crates/wasi-http/src/incoming_handler.rs deleted file mode 100644 index 3fd26bf7d0af..000000000000 --- a/crates/wasi-http/src/incoming_handler.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::bindings::http::types::{IncomingRequest, ResponseOutparam}; -use crate::WasiHttpView; - -impl crate::bindings::http::incoming_handler::Host for T { - fn handle( - &mut self, - _request: IncomingRequest, - _response_out: ResponseOutparam, - ) -> wasmtime::Result<()> { - anyhow::bail!("unimplemented: [incoming_handler] handle") - } -} diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index c6b072b1e60f..b0b07068b61c 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -4,7 +4,6 @@ use std::fmt::{self, Display}; pub mod body; pub mod http_impl; -pub mod incoming_handler; pub mod proxy; pub mod types; pub mod types_impl; diff --git a/crates/wasi-http/src/proxy.rs b/crates/wasi-http/src/proxy.rs index 9f156fd09a14..b276a6545265 100644 --- a/crates/wasi-http/src/proxy.rs +++ b/crates/wasi-http/src/proxy.rs @@ -4,7 +4,7 @@ use wasmtime_wasi::preview2; wasmtime::component::bindgen!({ world: "wasi:http/proxy", tracing: true, - async: false, + async: true, with: { "wasi:cli/stderr": preview2::bindings::cli::stderr, "wasi:cli/stdin": preview2::bindings::cli::stdin, @@ -23,10 +23,53 @@ wasmtime::component::bindgen!({ pub fn add_to_linker(l: &mut wasmtime::component::Linker) -> anyhow::Result<()> where - T: WasiHttpView + bindings::http::types::Host, + T: WasiHttpView + preview2::WasiView + bindings::http::types::Host, { - bindings::http::incoming_handler::add_to_linker(l, |t| t)?; + // TODO: this shouldn't be required, but the adapter unconditionally pulls in all of these + // dependencies. + preview2::command::add_to_linker(l)?; + bindings::http::outgoing_handler::add_to_linker(l, |t| t)?; bindings::http::types::add_to_linker(l, |t| t)?; + Ok(()) } + +pub mod sync { + use crate::{bindings, WasiHttpView}; + use wasmtime_wasi::preview2; + + wasmtime::component::bindgen!({ + world: "wasi:http/proxy", + tracing: true, + async: false, + with: { + "wasi:cli/stderr": preview2::bindings::cli::stderr, + "wasi:cli/stdin": preview2::bindings::cli::stdin, + "wasi:cli/stdout": preview2::bindings::cli::stdout, + "wasi:clocks/monotonic-clock": preview2::bindings::clocks::monotonic_clock, + "wasi:clocks/timezone": preview2::bindings::clocks::timezone, + "wasi:clocks/wall-clock": preview2::bindings::clocks::wall_clock, + "wasi:http/incoming-handler": bindings::http::incoming_handler, + "wasi:http/outgoing-handler": bindings::http::outgoing_handler, + "wasi:http/types": bindings::http::types, + "wasi:io/streams": preview2::bindings::io::streams, + "wasi:poll/poll": preview2::bindings::poll::poll, + "wasi:random/random": preview2::bindings::random::random, + }, + }); + + pub fn add_to_linker(l: &mut wasmtime::component::Linker) -> anyhow::Result<()> + where + T: WasiHttpView + preview2::WasiView + bindings::http::types::Host, + { + // TODO: this shouldn't be required, but the adapter unconditionally pulls in all of these + // dependencies. + preview2::command::sync::add_to_linker(l)?; + + bindings::http::outgoing_handler::add_to_linker(l, |t| t)?; + bindings::http::types::add_to_linker(l, |t| t)?; + + Ok(()) + } +} diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 3b73f7f3fd49..49baba25aac1 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -2,9 +2,13 @@ //! implementation of the wasi-http API. use crate::{ - bindings::http::types::{FutureTrailers, IncomingBody, Method, OutgoingBody, Scheme}, + bindings::http::types::{ + self, FutureTrailers, IncomingBody, IncomingRequest, Method, OutgoingBody, + OutgoingResponse, ResponseOutparam, Scheme, + }, body::{ - HostFutureTrailers, HostIncomingBody, HostIncomingBodyBuilder, HostOutgoingBody, HyperBody, + HostFutureTrailers, HostIncomingBody, HostIncomingBodyBuilder, HostOutgoingBody, + HyperIncomingBody, HyperOutgoingBody, }, }; use std::any::Any; @@ -18,15 +22,60 @@ pub struct WasiHttpCtx; pub trait WasiHttpView: Send { fn ctx(&mut self) -> &mut WasiHttpCtx; fn table(&mut self) -> &mut Table; + + fn new_incoming_request( + &mut self, + req: hyper::Request, + ) -> wasmtime::Result { + let (parts, body) = req.into_parts(); + let body = HostIncomingBodyBuilder { + body, + // TODO: this needs to be plumbed through + between_bytes_timeout: std::time::Duration::from_millis(600 * 1000), + }; + Ok(IncomingRequestLens::push( + self.table(), + HostIncomingRequest { + parts, + body: Some(body), + }, + )? + .id) + } + + fn new_response_outparam( + &mut self, + result: tokio::sync::oneshot::Sender< + Result, types::Error>, + >, + ) -> wasmtime::Result { + Ok(ResponseOutparamLens::push(self.table(), HostResponseOutparam { result })?.id) + } +} + +pub type IncomingRequestLens = TableLens; + +pub struct HostIncomingRequest { + pub parts: http::request::Parts, + pub body: Option, +} + +pub type ResponseOutparamLens = TableLens; + +pub struct HostResponseOutparam { + pub result: + tokio::sync::oneshot::Sender, types::Error>>, } +pub type OutgoingRequestLens = TableLens; + pub struct HostOutgoingRequest { pub method: Method, pub scheme: Option, pub path_with_query: String, pub authority: String, pub headers: FieldMap, - pub body: Option, + pub body: Option, } pub struct HostIncomingResponse { @@ -36,6 +85,37 @@ pub struct HostIncomingResponse { pub worker: AbortOnDropJoinHandle>, } +pub type OutgoingResponseLens = TableLens; + +pub struct HostOutgoingResponse { + pub status: u16, + pub headers: FieldMap, + pub body: Option, +} + +impl TryFrom for hyper::Response { + type Error = http::Error; + + fn try_from( + resp: HostOutgoingResponse, + ) -> Result, Self::Error> { + use http_body_util::{BodyExt, Empty}; + + let mut builder = hyper::Response::builder().status(resp.status); + + *builder.headers_mut().unwrap() = resp.headers; + + match resp.body { + Some(body) => builder.body(body), + None => builder.body( + Empty::::new() + .map_err(|_| anyhow::anyhow!("empty error")) + .boxed(), + ), + } + } +} + pub type FieldMap = hyper::HeaderMap; pub enum HostFields { @@ -54,7 +134,7 @@ pub enum HostFields { } pub struct IncomingResponseInternal { - pub resp: hyper::Response, + pub resp: hyper::Response, pub worker: AbortOnDropJoinHandle>, pub between_bytes_timeout: std::time::Duration, } @@ -105,13 +185,52 @@ impl std::future::Future for HostFutureIncomingResponse { } } -#[async_trait::async_trait] +pub struct TableLens { + id: u32, + _unused: std::marker::PhantomData, +} + +impl TableLens { + pub fn from(id: u32) -> Self { + Self { + id, + _unused: std::marker::PhantomData {}, + } + } + + pub fn into(self) -> u32 { + self.id + } + + #[inline(always)] + pub fn push(table: &mut Table, val: T) -> Result { + let id = table.push(Box::new(val))?; + Ok(Self::from(id)) + } + + #[inline(always)] + pub fn get<'t>(&self, table: &'t Table) -> Result<&'t T, TableError> { + table.get(self.id) + } + + #[inline(always)] + pub fn get_mut<'t>(&self, table: &'t mut Table) -> Result<&'t mut T, TableError> { + table.get_mut(self.id) + } + + #[inline(always)] + pub fn delete(&self, table: &mut Table) -> Result { + table.delete(self.id) + } +} + pub trait TableHttpExt { - fn push_outgoing_response(&mut self, request: HostOutgoingRequest) -> Result; - fn get_outgoing_request(&self, id: u32) -> Result<&HostOutgoingRequest, TableError>; - fn get_outgoing_request_mut(&mut self, id: u32) - -> Result<&mut HostOutgoingRequest, TableError>; - fn delete_outgoing_request(&mut self, id: u32) -> Result; + fn push_outgoing_response( + &mut self, + resp: HostOutgoingResponse, + ) -> Result; + fn get_outgoing_response(&mut self, id: u32) -> Result<&mut HostOutgoingResponse, TableError>; + fn delete_outgoing_response(&mut self, id: u32) -> Result; fn push_incoming_response(&mut self, response: HostIncomingResponse) -> Result; @@ -165,23 +284,26 @@ pub trait TableHttpExt { ) -> Result; } -#[async_trait::async_trait] impl TableHttpExt for Table { - fn push_outgoing_response(&mut self, request: HostOutgoingRequest) -> Result { - self.push(Box::new(request)) - } - fn get_outgoing_request(&self, id: u32) -> Result<&HostOutgoingRequest, TableError> { - self.get::(id) + fn push_outgoing_response( + &mut self, + response: HostOutgoingResponse, + ) -> Result { + self.push(Box::new(response)) } - fn get_outgoing_request_mut( + + fn get_outgoing_response( &mut self, - id: u32, - ) -> Result<&mut HostOutgoingRequest, TableError> { - self.get_mut::(id) + id: OutgoingResponse, + ) -> Result<&mut HostOutgoingResponse, TableError> { + self.get_mut(id) } - fn delete_outgoing_request(&mut self, id: u32) -> Result { - let req = self.delete::(id)?; - Ok(req) + + fn delete_outgoing_response( + &mut self, + id: OutgoingResponse, + ) -> Result { + self.delete(id) } fn push_incoming_response( diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 1b2326f3553b..4baea8f364bf 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -3,17 +3,17 @@ use crate::bindings::http::types::{ IncomingResponse, Method, OutgoingBody, OutgoingRequest, OutgoingResponse, ResponseOutparam, Scheme, StatusCode, Trailers, }; -use crate::body::{HostFutureTrailers, HostFutureTrailersState}; -use crate::types::FieldMap; +use crate::body::{FinishMessage, HostFutureTrailers, HostFutureTrailersState}; +use crate::types::{HostIncomingRequest, HostOutgoingResponse}; use crate::WasiHttpView; use crate::{ body::{HostIncomingBodyBuilder, HostOutgoingBody}, types::{ - HostFields, HostFutureIncomingResponse, HostIncomingResponse, HostOutgoingRequest, - TableHttpExt, + self, FieldMap, HostFields, HostFutureIncomingResponse, HostIncomingResponse, + HostOutgoingRequest, TableHttpExt, }, }; -use anyhow::{anyhow, Context}; +use anyhow::Context; use std::any::Any; use wasmtime_wasi::preview2::{ bindings::io::streams::{InputStream, OutputStream}, @@ -113,42 +113,110 @@ impl crate::bindings::http::types::Host for T { .context("[fields_clone] pushing fields")?; Ok(id) } - fn drop_incoming_request(&mut self, _request: IncomingRequest) -> wasmtime::Result<()> { - todo!("we haven't implemented the server side of wasi-http yet") + fn drop_incoming_request(&mut self, id: IncomingRequest) -> wasmtime::Result<()> { + let _ = types::IncomingRequestLens::from(id).delete(self.table())?; + Ok(()) } fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { - self.table().delete_outgoing_request(request)?; + types::OutgoingRequestLens::from(request).delete(self.table())?; Ok(()) } - fn incoming_request_method(&mut self, _request: IncomingRequest) -> wasmtime::Result { - todo!("we haven't implemented the server side of wasi-http yet") + fn incoming_request_method(&mut self, request: IncomingRequest) -> wasmtime::Result { + let method = types::IncomingRequestLens::from(request) + .get(self.table())? + .parts + .method + .as_ref(); + + if method == hyper::Method::GET { + Ok(Method::Get) + } else if method == hyper::Method::HEAD { + Ok(Method::Head) + } else if method == hyper::Method::POST { + Ok(Method::Post) + } else if method == hyper::Method::PUT { + Ok(Method::Put) + } else if method == hyper::Method::DELETE { + Ok(Method::Delete) + } else if method == hyper::Method::CONNECT { + Ok(Method::Connect) + } else if method == hyper::Method::OPTIONS { + Ok(Method::Options) + } else if method == hyper::Method::TRACE { + Ok(Method::Trace) + } else if method == hyper::Method::PATCH { + Ok(Method::Patch) + } else { + Ok(Method::Other(method.to_owned())) + } } fn incoming_request_path_with_query( &mut self, - _request: IncomingRequest, + id: IncomingRequest, ) -> wasmtime::Result> { - todo!("we haven't implemented the server side of wasi-http yet") - } - fn incoming_request_scheme( - &mut self, - _request: IncomingRequest, - ) -> wasmtime::Result> { - todo!("we haven't implemented the server side of wasi-http yet") + let req = types::IncomingRequestLens::from(id).get(self.table())?; + Ok(req + .parts + .uri + .path_and_query() + .map(|path_and_query| path_and_query.as_str().to_owned())) + } + fn incoming_request_scheme(&mut self, id: IncomingRequest) -> wasmtime::Result> { + let req = types::IncomingRequestLens::from(id).get(self.table())?; + Ok(req.parts.uri.scheme().map(|scheme| { + if scheme == &http::uri::Scheme::HTTP { + return Scheme::Http; + } + + if scheme == &http::uri::Scheme::HTTPS { + return Scheme::Https; + } + + Scheme::Other(req.parts.uri.scheme_str().unwrap().to_owned()) + })) } fn incoming_request_authority( &mut self, - _request: IncomingRequest, + id: IncomingRequest, ) -> wasmtime::Result> { - todo!("we haven't implemented the server side of wasi-http yet") + let req = types::IncomingRequestLens::from(id).get(self.table())?; + Ok(req + .parts + .uri + .authority() + .map(|auth| auth.as_str().to_owned())) } - fn incoming_request_headers(&mut self, _request: IncomingRequest) -> wasmtime::Result { - todo!("we haven't implemented the server side of wasi-http yet") + fn incoming_request_headers(&mut self, id: IncomingRequest) -> wasmtime::Result { + let _ = types::IncomingRequestLens::from(id).get(self.table())?; + + fn get_fields(elem: &mut dyn Any) -> &mut FieldMap { + &mut elem + .downcast_mut::() + .unwrap() + .parts + .headers + } + + let headers = self.table().push_fields(HostFields::Ref { + parent: id, + get_fields, + })?; + + Ok(headers) } fn incoming_request_consume( &mut self, - _request: IncomingRequest, - ) -> wasmtime::Result> { - todo!("we haven't implemented the server side of wasi-http yet") + id: IncomingRequest, + ) -> wasmtime::Result> { + let req = types::IncomingRequestLens::from(id).get_mut(self.table())?; + match req.body.take() { + Some(builder) => { + let id = self.table().push_incoming_body(builder.build())?; + Ok(Ok(id)) + } + + None => Ok(Err(())), + } } fn new_outgoing_request( &mut self, @@ -168,19 +236,17 @@ impl crate::bindings::http::types::Host for T { scheme, body: None, }; - let id = self - .table() - .push_outgoing_response(req) - .context("[new_outgoing_request] pushing request")?; + let id = types::OutgoingRequestLens::push(self.table(), req) + .context("[new_outgoing_request] pushing request")? + .into(); Ok(id) } fn outgoing_request_write( &mut self, request: OutgoingRequest, ) -> wasmtime::Result> { - let req = self - .table() - .get_outgoing_request_mut(request) + let req = types::OutgoingRequestLens::from(request) + .get_mut(self.table()) .context("[outgoing_request_write] getting request")?; if req.body.is_some() { @@ -197,15 +263,25 @@ impl crate::bindings::http::types::Host for T { Ok(Ok(outgoing_body)) } - fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> { - todo!("we haven't implemented the server side of wasi-http yet") + fn drop_response_outparam(&mut self, id: ResponseOutparam) -> wasmtime::Result<()> { + let _ = types::ResponseOutparamLens::from(id).delete(self.table())?; + Ok(()) } fn set_response_outparam( &mut self, - _outparam: ResponseOutparam, - _response: Result, + id: ResponseOutparam, + resp: Result, ) -> wasmtime::Result<()> { - todo!("we haven't implemented the server side of wasi-http yet") + let val = match resp { + Ok(resp) => Ok(self.table().delete_outgoing_response(resp)?.try_into()?), + Err(e) => Err(e), + }; + + types::ResponseOutparamLens::from(id) + .delete(self.table())? + .result + .send(val) + .map_err(|_| anyhow::anyhow!("failed to initialize response")) } fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { self.table() @@ -213,8 +289,9 @@ impl crate::bindings::http::types::Host for T { .context("[drop_incoming_response] deleting response")?; Ok(()) } - fn drop_outgoing_response(&mut self, _response: OutgoingResponse) -> wasmtime::Result<()> { - todo!("we haven't implemented the server side of wasi-http yet") + fn drop_outgoing_response(&mut self, id: OutgoingResponse) -> wasmtime::Result<()> { + types::OutgoingResponseLens::from(id).delete(self.table())?; + Ok(()) } fn incoming_response_status( &mut self, @@ -315,16 +392,41 @@ impl crate::bindings::http::types::Host for T { fn new_outgoing_response( &mut self, - _status_code: StatusCode, - _headers: Headers, + status: StatusCode, + headers: Headers, ) -> wasmtime::Result { - todo!("we haven't implemented the server side of wasi-http yet") + let fields = self.table().get_fields(headers)?.clone(); + self.table().delete_fields(headers)?; + + let id = types::OutgoingResponseLens::push( + self.table(), + HostOutgoingResponse { + status, + headers: fields, + body: None, + }, + )? + .into(); + + Ok(id) } fn outgoing_response_write( &mut self, - _response: OutgoingResponse, - ) -> wasmtime::Result> { - todo!("we haven't implemented the server side of wasi-http yet") + id: OutgoingResponse, + ) -> wasmtime::Result> { + let resp = types::OutgoingResponseLens::from(id).get_mut(self.table())?; + + if resp.body.is_some() { + return Ok(Err(())); + } + + let (host, body) = HostOutgoingBody::new(); + + resp.body.replace(body); + + let id = self.table().push_outgoing_body(host)?; + + Ok(Ok(id)) } fn drop_future_incoming_response( &mut self, @@ -431,32 +533,41 @@ impl crate::bindings::http::types::Host for T { } } - fn outgoing_body_write_trailers( + fn outgoing_body_finish( &mut self, id: OutgoingBody, - ts: Trailers, + ts: Option, ) -> wasmtime::Result<()> { let mut body = self.table().delete_outgoing_body(id)?; - let trailers = self.table().get_fields(ts)?.clone(); - match body - .trailers_sender + let sender = body + .finish_sender .take() - // Should be unreachable - this is the only place we take the trailers sender, - // at the end of the HostOutgoingBody's lifetime - .ok_or_else(|| anyhow!("trailers_sender missing"))? - .send(trailers.into()) - { - Ok(()) => {} - Err(_) => {} // Ignoring failure: receiver died sending body, but we can't report that - // here. - } + .expect("outgoing-body trailer_sender consumed by a non-owning function"); + + let message = if let Some(ts) = ts { + FinishMessage::Trailers(self.table().get_fields(ts)?.clone().into()) + } else { + FinishMessage::Finished + }; + + // Ignoring failure: receiver died sending body, but we can't report that here. + let _ = sender.send(message.into()); Ok(()) } fn drop_outgoing_body(&mut self, id: OutgoingBody) -> wasmtime::Result<()> { - let _ = self.table().delete_outgoing_body(id)?; + let mut body = self.table().delete_outgoing_body(id)?; + + let sender = body + .finish_sender + .take() + .expect("outgoing-body trailer_sender consumed by a non-owning function"); + + // Ignoring failure: receiver died sending body, but we can't report that here. + let _ = sender.send(FinishMessage::Abort); + Ok(()) } } diff --git a/crates/wasi-http/wit/deps/http/types.wit b/crates/wasi-http/wit/deps/http/types.wit index 821e15f96213..dfcacd8feb73 100644 --- a/crates/wasi-http/wit/deps/http/types.wit +++ b/crates/wasi-http/wit/deps/http/types.wit @@ -81,7 +81,7 @@ interface types { incoming-request-headers: func(request: /* borrow */ incoming-request) -> /* child */ headers // Will return the input-stream child at most once. If called more than // once, subsequent calls will return error. - incoming-request-consume: func(request: /* borrow */ incoming-request) -> result< /* child */ input-stream> + incoming-request-consume: func(request: /* borrow */ incoming-request) -> result< /* own */ incoming-body> type outgoing-request = u32 drop-outgoing-request: func(request: /* own */ outgoing-request) @@ -140,7 +140,7 @@ interface types { // May be called at most once. returns error if called additional times. // TODO: make incoming-request-consume work the same way, giving a child // incoming-body. - incoming-response-consume: func(response: /* borrow */ incoming-response) -> result + incoming-response-consume: func(response: /* borrow */ incoming-response) -> result type incoming-body = u32 drop-incoming-body: func(this: /* own */ incoming-body) @@ -174,16 +174,18 @@ interface types { /// Will give the child outgoing-response at most once. subsequent calls will /// return an error. - outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result + outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result type outgoing-body = u32 drop-outgoing-body: func(this: /* own */ outgoing-body) /// Will give the child output-stream at most once. subsequent calls will /// return an error. outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result - /// Write trailers as the way to finish an outgoing-body. To finish an - /// outgoing-body without writing trailers, use drop-outgoing-body. - outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers) + /// Finalize an outgoing body, optionally providing trailers. This must be + /// called to signal that the response is complete. If the `outgoing-body` is + /// dropped without calling `outgoing-body-finalize`, the implementation + /// should treat the body as corrupted. + outgoing-body-finish: func(this: /* own */ outgoing-body, trailers: /* own */ option) /// The following block defines a special resource type used by the /// `wasi:http/outgoing-handler` interface to emulate diff --git a/crates/wasi/wit/deps/http/types.wit b/crates/wasi/wit/deps/http/types.wit index 821e15f96213..dfcacd8feb73 100644 --- a/crates/wasi/wit/deps/http/types.wit +++ b/crates/wasi/wit/deps/http/types.wit @@ -81,7 +81,7 @@ interface types { incoming-request-headers: func(request: /* borrow */ incoming-request) -> /* child */ headers // Will return the input-stream child at most once. If called more than // once, subsequent calls will return error. - incoming-request-consume: func(request: /* borrow */ incoming-request) -> result< /* child */ input-stream> + incoming-request-consume: func(request: /* borrow */ incoming-request) -> result< /* own */ incoming-body> type outgoing-request = u32 drop-outgoing-request: func(request: /* own */ outgoing-request) @@ -140,7 +140,7 @@ interface types { // May be called at most once. returns error if called additional times. // TODO: make incoming-request-consume work the same way, giving a child // incoming-body. - incoming-response-consume: func(response: /* borrow */ incoming-response) -> result + incoming-response-consume: func(response: /* borrow */ incoming-response) -> result type incoming-body = u32 drop-incoming-body: func(this: /* own */ incoming-body) @@ -174,16 +174,18 @@ interface types { /// Will give the child outgoing-response at most once. subsequent calls will /// return an error. - outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result + outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result type outgoing-body = u32 drop-outgoing-body: func(this: /* own */ outgoing-body) /// Will give the child output-stream at most once. subsequent calls will /// return an error. outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result - /// Write trailers as the way to finish an outgoing-body. To finish an - /// outgoing-body without writing trailers, use drop-outgoing-body. - outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers) + /// Finalize an outgoing body, optionally providing trailers. This must be + /// called to signal that the response is complete. If the `outgoing-body` is + /// dropped without calling `outgoing-body-finalize`, the implementation + /// should treat the body as corrupted. + outgoing-body-finish: func(this: /* own */ outgoing-body, trailers: /* own */ option) /// The following block defines a special resource type used by the /// `wasi:http/outgoing-handler` interface to emulate diff --git a/src/bin/wasmtime.rs b/src/bin/wasmtime.rs index d32a0b63f373..46868b3aca05 100644 --- a/src/bin/wasmtime.rs +++ b/src/bin/wasmtime.rs @@ -5,6 +5,8 @@ use anyhow::Result; use clap::Parser; +#[cfg(feature = "serve")] +use wasmtime_cli::commands::ServeCommand; use wasmtime_cli::commands::{ CompileCommand, ConfigCommand, ExploreCommand, RunCommand, SettingsCommand, WastCommand, }; @@ -53,6 +55,9 @@ enum Subcommand { Explore(ExploreCommand), /// Runs a WebAssembly module Run(RunCommand), + /// Serves requests from a wasi-http proxy component. + #[cfg(feature = "serve")] + Serve(ServeCommand), /// Displays available Cranelift settings for a target. Settings(SettingsCommand), /// Runs a WebAssembly test script file @@ -68,6 +73,8 @@ impl Wasmtime { Subcommand::Compile(c) => c.execute(), Subcommand::Explore(c) => c.execute(), Subcommand::Run(c) => c.execute(), + #[cfg(feature = "serve")] + Subcommand::Serve(c) => c.execute(), Subcommand::Settings(c) => c.execute(), Subcommand::Wast(c) => c.execute(), } diff --git a/src/commands.rs b/src/commands.rs index 764f01e936cd..ae46a2264e17 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -7,4 +7,10 @@ mod run; mod settings; mod wast; +#[cfg(feature = "serve")] +mod serve; + pub use self::{compile::*, config::*, explore::*, run::*, settings::*, wast::*}; + +#[cfg(feature = "serve")] +pub use self::serve::*; diff --git a/src/commands/serve.rs b/src/commands/serve.rs new file mode 100644 index 000000000000..5cd1a8225e4c --- /dev/null +++ b/src/commands/serve.rs @@ -0,0 +1,199 @@ +use anyhow::Result; +use clap::Parser; +use std::{path::PathBuf, pin::Pin, sync::Arc}; +use wasmtime::component::{Component, InstancePre, Linker}; +use wasmtime::{Engine, Store}; +use wasmtime_cli_flags::CommonOptions; +use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView}; +use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView}; + +struct Host { + table: Table, + ctx: WasiCtx, + http: WasiHttpCtx, +} + +impl Host { + fn new() -> Result { + let mut table = Table::new(); + let ctx = WasiCtxBuilder::new().build(&mut table)?; + Ok(Host { + table, + ctx, + http: WasiHttpCtx, + }) + } +} + +impl WasiView for Host { + fn table(&self) -> &Table { + &self.table + } + + fn table_mut(&mut self) -> &mut Table { + &mut self.table + } + + fn ctx(&self) -> &WasiCtx { + &self.ctx + } + + fn ctx_mut(&mut self) -> &mut WasiCtx { + &mut self.ctx + } +} + +impl WasiHttpView for Host { + fn table(&mut self) -> &mut Table { + &mut self.table + } + + fn ctx(&mut self) -> &mut WasiHttpCtx { + &mut self.http + } +} + +/// Runs a WebAssembly module +#[derive(Parser)] +#[structopt(name = "run")] +pub struct ServeCommand { + #[clap(flatten)] + common: CommonOptions, + + /// Socket address for the web server to bind to. Defaults to 0.0.0.0:8080. + #[clap(long = "addr", value_name = "SOCKADDR")] + addr: Option, + + /// The WebAssembly component to run. + #[clap(value_name = "WASM", required = true)] + component: PathBuf, +} + +impl ServeCommand { + fn addr(&self) -> std::net::SocketAddr { + self.addr.unwrap_or("0.0.0.0:8080".parse().unwrap()) + } + + /// Start a server to run the given wasi-http proxy component + pub fn execute(mut self) -> Result<()> { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build()?; + + runtime.block_on(async move { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + Ok::<_, anyhow::Error>(()) + } + + res = self.serve() => { + res + } + } + })?; + + Ok(()) + } + + fn add_to_linker(&self, linker: &mut Linker) -> Result<()> { + wasmtime_wasi_http::proxy::add_to_linker(linker)?; + Ok(()) + } + + async fn serve(&mut self) -> Result<()> { + use hyper::server::conn::http1; + + let mut config = self.common.config(None)?; + config.wasm_component_model(true); + config.async_support(true); + + let engine = Arc::new(Engine::new(&config)?); + let mut linker = Linker::new(&engine); + + self.add_to_linker(&mut linker)?; + + let component = Component::from_file(&engine, &self.component)?; + + let instance = Arc::new(linker.instantiate_pre(&component)?); + + let listener = tokio::net::TcpListener::bind(self.addr()).await?; + + loop { + let (stream, _) = listener.accept().await?; + let engine = Arc::clone(&engine); + let instance = Arc::clone(&instance); + tokio::task::spawn(async move { + let handler = ProxyHandler::new(engine, instance); + if let Err(e) = http1::Builder::new() + .keep_alive(true) + .serve_connection(stream, handler) + .await + { + eprintln!("error: {e:?}"); + } + }); + } + } +} + +#[derive(Clone)] +struct ProxyHandler { + engine: Arc, + instance_pre: Arc>, +} + +impl ProxyHandler { + fn new(engine: Arc, instance_pre: Arc>) -> Self { + Self { + engine, + instance_pre, + } + } +} + +type Request = hyper::Request; + +impl hyper::service::Service for ProxyHandler { + type Response = hyper::Response; + type Error = anyhow::Error; + type Future = Pin> + Send>>; + + fn call(&mut self, req: Request) -> Self::Future { + use http_body_util::BodyExt; + + let handler = self.clone(); + + let (sender, receiver) = tokio::sync::oneshot::channel(); + + // TODO: need to track the join handle, but don't want to block the response on it + tokio::task::spawn(async move { + let host = Host::new()?; + let mut store = Store::new(&handler.engine, host); + + let req = store.data_mut().new_incoming_request( + req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()), + )?; + + let out = store.data_mut().new_response_outparam(sender)?; + + let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre( + &mut store, + &handler.instance_pre, + ) + .await?; + + proxy + .wasi_http_incoming_handler() + .call_handle(store, req, out) + .await?; + + Ok::<_, anyhow::Error>(()) + }); + + Box::pin(async move { + let resp = receiver.await.unwrap()?; + Ok(resp) + }) + } +} diff --git a/supply-chain/audits.toml b/supply-chain/audits.toml index ee4eb95a1245..5fb0c2e8c1f8 100644 --- a/supply-chain/audits.toml +++ b/supply-chain/audits.toml @@ -497,6 +497,14 @@ start = "2023-01-20" end = "2024-06-26" notes = "The Bytecode Alliance is the author of this crate." +[[wildcard-audits.wasmtime-wmemcheck]] +who = "Pat Hickey " +criteria = "safe-to-deploy" +user-id = 73222 # wasmtime-publish +start = "2022-11-27" +end = "2024-06-26" +notes = "The Bytecode Alliance is the author of this crate." + [[wildcard-audits.wast]] who = "Alex Crichton " criteria = "safe-to-deploy" @@ -1886,6 +1894,11 @@ criteria = "safe-to-deploy" delta = "0.9.9 -> 0.10.2" notes = "This upgrade is mostly a code refactor, as far as I can tell. No new uses of unsafe nor any new ambient capabilities usage." +[[audits.signal-hook-registry]] +who = "Pat Hickey " +criteria = "safe-to-deploy" +version = "1.4.1" + [[audits.slab]] who = "Pat Hickey " criteria = "safe-to-deploy" diff --git a/supply-chain/config.toml b/supply-chain/config.toml index 28b251f7ac1d..429251b49472 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -169,6 +169,9 @@ audit-as-crates-io = true [policy.wasmtime-wit-bindgen] audit-as-crates-io = true +[policy.wasmtime-wmemcheck] +audit-as-crates-io = true + [policy.wiggle] audit-as-crates-io = true diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index 29747fbcc690..c7830a53140e 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -353,6 +353,10 @@ audited_as = "11.0.1" version = "14.0.0" audited_as = "12.0.1" +[[unpublished.wasmtime-wmemcheck]] +version = "14.0.0" +audited_as = "13.0.0" + [[unpublished.wiggle]] version = "13.0.0" audited_as = "11.0.1" @@ -1644,6 +1648,12 @@ when = "2023-08-24" user-id = 73222 user-login = "wasmtime-publish" +[[publisher.wasmtime-wmemcheck]] +version = "13.0.0" +when = "2023-09-20" +user-id = 73222 +user-login = "wasmtime-publish" + [[publisher.wast]] version = "62.0.1" when = "2023-07-26"