diff --git a/crates/test-programs/tests/wasi-http-components-sync.rs b/crates/test-programs/tests/wasi-http-components-sync.rs index f525d5b44724..33c638fc6883 100644 --- a/crates/test-programs/tests/wasi-http-components-sync.rs +++ b/crates/test-programs/tests/wasi-http-components-sync.rs @@ -46,12 +46,13 @@ impl WasiView for Ctx { } impl WasiHttpView for Ctx { - fn http_ctx(&self) -> &WasiHttpCtx { - &self.http - } - fn http_ctx_mut(&mut self) -> &mut WasiHttpCtx { + fn ctx(&mut self) -> &mut WasiHttpCtx { &mut self.http } + + fn table(&mut self) -> &mut Table { + &mut self.table + } } fn instantiate_component( @@ -60,7 +61,7 @@ fn instantiate_component( ) -> Result<(Store, Command), anyhow::Error> { let mut linker = Linker::new(&ENGINE); add_to_linker(&mut linker)?; - wasmtime_wasi_http::proxy::sync::add_to_linker(&mut linker)?; + wasmtime_wasi_http::proxy::add_to_linker(&mut linker)?; let mut store = Store::new(&ENGINE, ctx); @@ -84,7 +85,7 @@ fn run(name: &str) -> anyhow::Result<()> { builder.env(var, val); } let wasi = builder.build(&mut table)?; - let http = WasiHttpCtx::new(); + let http = WasiHttpCtx {}; let (mut store, command) = instantiate_component(component, Ctx { table, wasi, http })?; command diff --git a/crates/test-programs/tests/wasi-http-components.rs b/crates/test-programs/tests/wasi-http-components.rs index 1f05411d0178..8d1fa1f96c52 100644 --- a/crates/test-programs/tests/wasi-http-components.rs +++ b/crates/test-programs/tests/wasi-http-components.rs @@ -47,10 +47,10 @@ impl WasiView for Ctx { } impl WasiHttpView for Ctx { - fn http_ctx(&self) -> &WasiHttpCtx { - &self.http + fn table(&mut self) -> &mut Table { + &mut self.table } - fn http_ctx_mut(&mut self) -> &mut WasiHttpCtx { + fn ctx(&mut self) -> &mut WasiHttpCtx { &mut self.http } } @@ -85,16 +85,11 @@ async fn run(name: &str) -> anyhow::Result<()> { builder.env(var, val); } let wasi = builder.build(&mut table)?; - let http = WasiHttpCtx::new(); + let http = WasiHttpCtx; let (mut 
store, command) = instantiate_component(component, Ctx { table, wasi, http }).await?; - command - .wasi_cli_run() - .call_run(&mut store) - .await? - .map_err(|()| anyhow::anyhow!("run returned a failure"))?; - Ok(()) + command.wasi_cli_run().call_run(&mut store).await }; r.map_err(move |trap: anyhow::Error| { let stdout = stdout.try_into_inner().expect("single ref to stdout"); @@ -109,7 +104,8 @@ async fn run(name: &str) -> anyhow::Result<()> { "error while testing wasi-tests {} with http-components", name )) - })?; + })? + .map_err(|()| anyhow::anyhow!("run returned an error"))?; Ok(()) } diff --git a/crates/test-programs/tests/wasi-http-modules.rs b/crates/test-programs/tests/wasi-http-modules.rs deleted file mode 100644 index e73251d5a1f8..000000000000 --- a/crates/test-programs/tests/wasi-http-modules.rs +++ /dev/null @@ -1,196 +0,0 @@ -#![cfg(all(feature = "test_programs", not(skip_wasi_http_tests)))] -use wasmtime::{Config, Engine, Func, Linker, Module, Store}; -use wasmtime_wasi::preview2::{ - pipe::MemoryOutputPipe, - preview1::{WasiPreview1Adapter, WasiPreview1View}, - IsATTY, Table, WasiCtx, WasiCtxBuilder, WasiView, -}; -use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; - -use test_programs::http_server::{setup_http1, setup_http2}; - -lazy_static::lazy_static! 
{ - static ref ENGINE: Engine = { - let mut config = Config::new(); - config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); - config.wasm_component_model(true); - config.async_support(true); - - let engine = Engine::new(&config).unwrap(); - engine - }; -} -// uses ENGINE, creates a fn get_module(&str) -> Module -include!(concat!(env!("OUT_DIR"), "/wasi_http_tests_modules.rs")); - -struct Ctx { - table: Table, - wasi: WasiCtx, - adapter: WasiPreview1Adapter, - http: WasiHttpCtx, -} - -impl WasiView for Ctx { - fn table(&self) -> &Table { - &self.table - } - fn table_mut(&mut self) -> &mut Table { - &mut self.table - } - fn ctx(&self) -> &WasiCtx { - &self.wasi - } - fn ctx_mut(&mut self) -> &mut WasiCtx { - &mut self.wasi - } -} -impl WasiPreview1View for Ctx { - fn adapter(&self) -> &WasiPreview1Adapter { - &self.adapter - } - fn adapter_mut(&mut self) -> &mut WasiPreview1Adapter { - &mut self.adapter - } -} -impl WasiHttpView for Ctx { - fn http_ctx(&self) -> &WasiHttpCtx { - &self.http - } - fn http_ctx_mut(&mut self) -> &mut WasiHttpCtx { - &mut self.http - } -} - -async fn instantiate_module(module: Module, ctx: Ctx) -> Result<(Store, Func), anyhow::Error> { - let mut linker = Linker::new(&ENGINE); - wasmtime_wasi_http::add_to_linker(&mut linker)?; - wasmtime_wasi::preview2::preview1::add_to_linker_async(&mut linker)?; - - let mut store = Store::new(&ENGINE, ctx); - - let instance = linker.instantiate_async(&mut store, &module).await?; - let command = instance.get_func(&mut store, "_start").unwrap(); - Ok((store, command)) -} - -async fn run(name: &str) -> anyhow::Result<()> { - let stdout = MemoryOutputPipe::new(4096); - let stderr = MemoryOutputPipe::new(4096); - let r = { - let mut table = Table::new(); - let module = get_module(name); - - // Create our wasi context. 
- let mut builder = WasiCtxBuilder::new(); - builder.stdout(stdout.clone(), IsATTY::No); - builder.stderr(stderr.clone(), IsATTY::No); - builder.arg(name); - for (var, val) in test_programs::wasi_tests_environment() { - builder.env(var, val); - } - let wasi = builder.build(&mut table)?; - let http = WasiHttpCtx::new(); - - let adapter = WasiPreview1Adapter::new(); - - let (mut store, command) = instantiate_module( - module, - Ctx { - table, - wasi, - http, - adapter, - }, - ) - .await?; - command.call_async(&mut store, &[], &mut []).await - }; - r.map_err(move |trap: anyhow::Error| { - let stdout = stdout.try_into_inner().expect("single ref to stdout"); - if !stdout.is_empty() { - println!("[guest] stdout:\n{}\n===", String::from_utf8_lossy(&stdout)); - } - let stderr = stderr.try_into_inner().expect("single ref to stderr"); - if !stderr.is_empty() { - println!("[guest] stderr:\n{}\n===", String::from_utf8_lossy(&stderr)); - } - trap.context(format!( - "error while testing wasi-tests {} with http-modules", - name - )) - })?; - Ok(()) -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -#[cfg_attr( - windows, - ignore = "test is currently flaky in ci and needs to be debugged" -)] -async fn outbound_request_get() { - setup_http1(run("outbound_request_get")).await.unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -#[cfg_attr( - windows, - ignore = "test is currently flaky in ci and needs to be debugged" -)] -async fn outbound_request_post() { - setup_http1(run("outbound_request_post")).await.unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -#[cfg_attr( - windows, - ignore = "test is currently flaky in ci and needs to be debugged" -)] -async fn outbound_request_large_post() { - setup_http1(run("outbound_request_large_post")) - .await - .unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -#[cfg_attr( - windows, - ignore = "test is currently flaky in ci and needs to be debugged" -)] -async fn 
outbound_request_put() { - setup_http1(run("outbound_request_put")).await.unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -#[cfg_attr( - windows, - ignore = "test is currently flaky in ci and needs to be debugged" -)] -async fn outbound_request_invalid_version() { - setup_http2(run("outbound_request_invalid_version")) - .await - .unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn outbound_request_unknown_method() { - run("outbound_request_unknown_method").await.unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn outbound_request_unsupported_scheme() { - run("outbound_request_unsupported_scheme").await.unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn outbound_request_invalid_port() { - run("outbound_request_invalid_port").await.unwrap(); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -#[cfg_attr( - windows, - ignore = "test is currently flaky in ci and needs to be debugged" -)] -async fn outbound_request_invalid_dnsname() { - run("outbound_request_invalid_dnsname").await.unwrap(); -} diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_dnsname.rs b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_dnsname.rs index 7c3ba5909b8f..6cc5ce8bc97b 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_dnsname.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_dnsname.rs @@ -15,6 +15,6 @@ async fn run() { ) .await; - let error = res.unwrap_err(); - assert_eq!(error.to_string(), "Error::InvalidUrl(\"invalid dnsname\")"); + let error = res.unwrap_err().to_string(); + assert!(error.starts_with("Error::InvalidUrl(\"failed to lookup address information:")); } diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_version.rs b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_version.rs index 
cadabda4dec1..53c767edec8f 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_version.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_invalid_version.rs @@ -17,7 +17,7 @@ async fn run() { let error = res.unwrap_err().to_string(); if error.ne("Error::ProtocolError(\"invalid HTTP version parsed\")") - && error.ne("Error::ProtocolError(\"operation was canceled\")") + && !error.starts_with("Error::ProtocolError(\"operation was canceled") { panic!( r#"assertion failed: `(left == right)` diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_large_post.rs b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_large_post.rs index 9f4bacef8f73..80e0688d47fc 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_large_post.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_large_post.rs @@ -23,7 +23,7 @@ async fn run() { .context("localhost:3000 /post large") .unwrap(); - println!("localhost:3000 /post large: {res:?}"); + println!("localhost:3000 /post large: {}", res.status); assert_eq!(res.status, 200); let method = res.header("x-wasmtime-test-method").unwrap(); assert_eq!(std::str::from_utf8(method).unwrap(), "POST"); diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_post.rs b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_post.rs index 69a03d754ee9..131356fa91bc 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_post.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_post.rs @@ -22,5 +22,5 @@ async fn run() { assert_eq!(res.status, 200); let method = res.header("x-wasmtime-test-method").unwrap(); assert_eq!(std::str::from_utf8(method).unwrap(), "POST"); - assert_eq!(res.body, b"{\"foo\": \"bar\"}"); + assert_eq!(res.body, b"{\"foo\": \"bar\"}", "invalid body returned"); } diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unknown_method.rs 
b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unknown_method.rs index a2ab5e48dc02..727294861b00 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unknown_method.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unknown_method.rs @@ -18,6 +18,6 @@ async fn run() { let error = res.unwrap_err(); assert_eq!( error.to_string(), - "Error::InvalidUrl(\"unknown method OTHER\")" + "Error::Invalid(\"unknown method OTHER\")" ); } diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unsupported_scheme.rs b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unsupported_scheme.rs index 482550627e8d..c8a66b7da648 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unsupported_scheme.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request_unsupported_scheme.rs @@ -18,6 +18,6 @@ async fn run() { let error = res.unwrap_err(); assert_eq!( error.to_string(), - "Error::InvalidUrl(\"unsupported scheme WS\")" + "Error::Invalid(\"unsupported scheme WS\")" ); } diff --git a/crates/test-programs/wasi-http-tests/src/lib.rs b/crates/test-programs/wasi-http-tests/src/lib.rs index e1d9fb76dd6c..d516af03da16 100644 --- a/crates/test-programs/wasi-http-tests/src/lib.rs +++ b/crates/test-programs/wasi-http-tests/src/lib.rs @@ -42,29 +42,22 @@ impl Response { } } -struct DropPollable { - pollable: poll::Pollable, -} - -impl Drop for DropPollable { - fn drop(&mut self) { - poll::drop_pollable(self.pollable); - } -} - pub async fn request( method: http_types::Method, scheme: http_types::Scheme, authority: &str, path_with_query: &str, body: Option<&[u8]>, - additional_headers: Option<&[(String, String)]>, + additional_headers: Option<&[(String, Vec)]>, ) -> Result { + fn header_val(v: &str) -> Vec { + v.to_string().into_bytes() + } let headers = http_types::new_fields( &[ &[ - ("User-agent".to_string(), "WASI-HTTP/0.0.1".to_string()), - ("Content-type".to_string(), 
"application/json".to_string()), + ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")), + ("Content-type".to_string(), header_val("application/json")), ], additional_headers.unwrap_or(&[]), ] @@ -79,15 +72,16 @@ pub async fn request( headers, ); - let request_body = http_types::outgoing_request_write(request) + let outgoing_body = http_types::outgoing_request_write(request) .map_err(|_| anyhow!("outgoing request write failed"))?; if let Some(mut buf) = body { - let sub = DropPollable { - pollable: streams::subscribe_to_output_stream(request_body), - }; + let request_body = http_types::outgoing_body_write(outgoing_body) + .map_err(|_| anyhow!("outgoing request write failed"))?; + + let pollable = streams::subscribe_to_output_stream(request_body); while !buf.is_empty() { - poll::poll_oneoff(&[sub.pollable]); + poll::poll_oneoff(&[pollable]); let permit = match streams::check_write(request_body) { Ok(n) => n, @@ -109,36 +103,39 @@ pub async fn request( _ => {} } - poll::poll_oneoff(&[sub.pollable]); + poll::poll_oneoff(&[pollable]); + poll::drop_pollable(pollable); match streams::check_write(request_body) { Ok(_) => {} Err(_) => anyhow::bail!("output stream error"), }; + + streams::drop_output_stream(request_body); } - let future_response = outgoing_handler::handle(request, None); + let future_response = outgoing_handler::handle(request, None)?; + + // TODO: The current implementation requires this drop after the request is sent. + // The ownership semantics are unclear in wasi-http we should clarify exactly what is + // supposed to happen here. 
+ http_types::drop_outgoing_body(outgoing_body); let incoming_response = match http_types::future_incoming_response_get(future_response) { - Some(result) => result, + Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?, None => { let pollable = http_types::listen_to_future_incoming_response(future_response); let _ = poll::poll_oneoff(&[pollable]); + poll::drop_pollable(pollable); http_types::future_incoming_response_get(future_response) .expect("incoming response available") + .map_err(|_| anyhow!("incoming response errored"))? } } // TODO: maybe anything that appears in the Result<_, E> position should impl // Error? anyway, just use its Debug here: .map_err(|e| anyhow!("{e:?}"))?; - // TODO: The current implementation requires this drop after the request is sent. - // The ownership semantics are unclear in wasi-http we should clarify exactly what is - // supposed to happen here. - streams::drop_output_stream(request_body); - - http_types::drop_outgoing_request(request); - http_types::drop_future_incoming_response(future_response); let status = http_types::incoming_response_status(incoming_response); @@ -147,26 +144,32 @@ pub async fn request( let headers = http_types::fields_entries(headers_handle); http_types::drop_fields(headers_handle); - let body_stream = http_types::incoming_response_consume(incoming_response) + let incoming_body = http_types::incoming_response_consume(incoming_response) .map_err(|()| anyhow!("incoming response has no body stream"))?; - let input_stream_pollable = streams::subscribe_to_input_stream(body_stream); + + http_types::drop_incoming_response(incoming_response); + + let input_stream = http_types::incoming_body_stream(incoming_body).unwrap(); + let input_stream_pollable = streams::subscribe_to_input_stream(input_stream); let mut body = Vec::new(); let mut eof = streams::StreamStatus::Open; while eof != streams::StreamStatus::Ended { - let (mut body_chunk, stream_status) = - streams::read(body_stream, 
u64::MAX).map_err(|_| anyhow!("body_stream read failed"))?; - eof = if body_chunk.is_empty() { - streams::StreamStatus::Ended - } else { - stream_status - }; - body.append(&mut body_chunk); + poll::poll_oneoff(&[input_stream_pollable]); + + let (mut body_chunk, stream_status) = streams::read(input_stream, 1024 * 1024) + .map_err(|_| anyhow!("input_stream read failed"))?; + + eof = stream_status; + + if !body_chunk.is_empty() { + body.append(&mut body_chunk); + } } poll::drop_pollable(input_stream_pollable); - streams::drop_input_stream(body_stream); - http_types::drop_incoming_response(incoming_response); + streams::drop_input_stream(input_stream); + http_types::drop_incoming_body(incoming_body); Ok(Response { status, diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs new file mode 100644 index 000000000000..298fe7b92bfe --- /dev/null +++ b/crates/wasi-http/src/body.rs @@ -0,0 +1,498 @@ +use crate::{bindings::http::types, types::FieldMap}; +use anyhow::anyhow; +use bytes::Bytes; +use http_body_util::combinators::BoxBody; +use std::future::Future; +use std::{ + convert::Infallible, + pin::Pin, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::sync::{mpsc, oneshot}; +use wasmtime_wasi::preview2::{ + self, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, OutputStreamError, + StreamRuntimeError, StreamState, +}; + +/// Holds onto the things needed to construct a [`HostIncomingBody`] until we are ready to build +/// one. The HostIncomingBody spawns a task that starts consuming the incoming body, and we don't +/// want to do that unless the user asks to consume the body. +pub struct HostIncomingBodyBuilder { + pub body: hyper::body::Incoming, + pub between_bytes_timeout: Duration, +} + +impl HostIncomingBodyBuilder { + /// Consume the state held in the [`HostIncomingBodyBuilder`] to spawn a task that will drive the + /// streaming body to completion. 
Data segments will be communicated out over the + /// [`HostIncomingBodyStream`], and a [`HostFutureTrailers`] gives a way to block on/retrieve + /// the trailers. + pub fn build(mut self) -> HostIncomingBody { + let (body_writer, body_receiver) = mpsc::channel(1); + let (trailer_writer, trailers) = oneshot::channel(); + + let worker = preview2::spawn(async move { + loop { + let frame = match tokio::time::timeout( + self.between_bytes_timeout, + http_body_util::BodyExt::frame(&mut self.body), + ) + .await + { + Ok(None) => break, + + Ok(Some(Ok(frame))) => frame, + + Ok(Some(Err(e))) => { + match body_writer.send(Err(anyhow::anyhow!(e))).await { + Ok(_) => {} + // If the body read end has dropped, then we report this error with the + // trailers. unwrap and rewrap Err because the Ok side of these two Results + // are different. + Err(e) => { + let _ = trailer_writer.send(Err(e.0.unwrap_err())); + } + } + break; + } + + Err(_) => { + match body_writer + .send(Err(types::Error::TimeoutError( + "data frame timed out".to_string(), + ) + .into())) + .await + { + Ok(_) => {} + Err(e) => { + let _ = trailer_writer.send(Err(e.0.unwrap_err())); + } + } + break; + } + }; + + if frame.is_trailers() { + // We know we're not going to write any more data frames at this point, so we + // explicitly drop the body_writer so that anything waiting on the read end returns + // immediately. + drop(body_writer); + + let trailers = frame.into_trailers().unwrap(); + + // TODO: this will fail in two cases: + // 1. we've already used the channel once, which should be imposible, + // 2. the read end is closed. + // I'm not sure how to differentiate between these two cases, or really + // if we need to do anything to handle either. 
+ let _ = trailer_writer.send(Ok(trailers)); + + break; + } + + assert!(frame.is_data(), "frame wasn't data"); + + let data = frame.into_data().unwrap(); + + // If the receiver no longer exists, thats ok - in that case we want to keep the + // loop running to relieve backpressure, so we get to the trailers. + let _ = body_writer.send(Ok(data)).await; + } + }); + + HostIncomingBody { + worker, + stream: Some(HostIncomingBodyStream::new(body_receiver)), + trailers, + } + } +} + +pub struct HostIncomingBody { + pub worker: AbortOnDropJoinHandle<()>, + pub stream: Option, + pub trailers: oneshot::Receiver>, +} + +impl HostIncomingBody { + pub fn into_future_trailers(self) -> HostFutureTrailers { + HostFutureTrailers { + _worker: self.worker, + state: HostFutureTrailersState::Waiting(self.trailers), + } + } +} + +pub struct HostIncomingBodyStream { + pub open: bool, + pub receiver: mpsc::Receiver>, + pub buffer: Bytes, + pub error: Option, +} + +impl HostIncomingBodyStream { + fn new(receiver: mpsc::Receiver>) -> Self { + Self { + open: true, + receiver, + buffer: Bytes::new(), + error: None, + } + } +} + +#[async_trait::async_trait] +impl HostInputStream for HostIncomingBodyStream { + fn read(&mut self, size: usize) -> anyhow::Result<(Bytes, StreamState)> { + use mpsc::error::TryRecvError; + + if !self.buffer.is_empty() { + let len = size.min(self.buffer.len()); + let chunk = self.buffer.split_to(len); + return Ok((chunk, StreamState::Open)); + } + + if let Some(e) = self.error.take() { + return Err(StreamRuntimeError::from(e).into()); + } + + if !self.open { + return Ok((Bytes::new(), StreamState::Closed)); + } + + match self.receiver.try_recv() { + Ok(Ok(mut bytes)) => { + let len = bytes.len().min(size); + let chunk = bytes.split_to(len); + if !bytes.is_empty() { + self.buffer = bytes; + } + + return Ok((chunk, StreamState::Open)); + } + + Ok(Err(e)) => { + self.open = false; + return Err(StreamRuntimeError::from(e).into()); + } + + Err(TryRecvError::Empty) => { + 
return Ok((Bytes::new(), StreamState::Open)); + } + + Err(TryRecvError::Disconnected) => { + self.open = false; + return Ok((Bytes::new(), StreamState::Closed)); + } + } + } + + async fn ready(&mut self) -> anyhow::Result<()> { + if !self.buffer.is_empty() { + return Ok(()); + } + + if !self.open { + return Ok(()); + } + + match self.receiver.recv().await { + Some(Ok(bytes)) => self.buffer = bytes, + + Some(Err(e)) => { + self.error = Some(e); + self.open = false; + } + + None => self.open = false, + } + + Ok(()) + } +} + +pub struct HostFutureTrailers { + _worker: AbortOnDropJoinHandle<()>, + pub state: HostFutureTrailersState, +} + +pub enum HostFutureTrailersState { + Waiting(oneshot::Receiver>), + Done(Result), +} + +impl HostFutureTrailers { + pub async fn ready(&mut self) -> anyhow::Result<()> { + if let HostFutureTrailersState::Waiting(rx) = &mut self.state { + let result = match rx.await { + Ok(Ok(headers)) => Ok(FieldMap::from(headers)), + Ok(Err(e)) => Err(types::Error::ProtocolError(format!("hyper error: {e:?}"))), + Err(_) => Err(types::Error::ProtocolError( + "stream hung up before trailers were received".to_string(), + )), + }; + self.state = HostFutureTrailersState::Done(result); + } + Ok(()) + } +} + +pub type HyperBody = BoxBody; + +pub struct HostOutgoingBody { + pub body_output_stream: Option>, + pub trailers_sender: Option>, +} + +impl HostOutgoingBody { + pub fn new() -> (Self, HyperBody) { + use http_body_util::BodyExt; + use hyper::{ + body::{Body, Frame}, + HeaderMap, + }; + use std::task::{Context, Poll}; + use tokio::sync::oneshot::error::RecvError; + struct BodyImpl { + body_receiver: mpsc::Receiver, + trailers_receiver: Option>, + } + impl Body for BodyImpl { + type Data = Bytes; + type Error = Infallible; + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.as_mut().body_receiver.poll_recv(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(frame)) => 
Poll::Ready(Some(Ok(Frame::data(frame)))), + + // This means that the `body_sender` end of the channel has been dropped. + Poll::Ready(None) => { + if let Some(mut trailers_receiver) = self.as_mut().trailers_receiver.take() + { + match Pin::new(&mut trailers_receiver).poll(cx) { + Poll::Pending => { + self.as_mut().trailers_receiver = Some(trailers_receiver); + Poll::Pending + } + Poll::Ready(Ok(trailers)) => { + Poll::Ready(Some(Ok(Frame::trailers(trailers)))) + } + Poll::Ready(Err(RecvError { .. })) => Poll::Ready(None), + } + } else { + Poll::Ready(None) + } + } + } + } + } + + let (body_sender, body_receiver) = mpsc::channel(1); + let (trailers_sender, trailers_receiver) = oneshot::channel(); + let body_impl = BodyImpl { + body_receiver, + trailers_receiver: Some(trailers_receiver), + } + .boxed(); + ( + Self { + // TODO: this capacity constant is arbitrary, and should be configurable + body_output_stream: Some(Box::new(BodyWriteStream::new(1024 * 1024, body_sender))), + trailers_sender: Some(trailers_sender), + }, + body_impl, + ) + } +} + +// copied in from preview2::write_stream + +#[derive(Debug)] +struct WorkerState { + alive: bool, + items: std::collections::VecDeque, + write_budget: usize, + flush_pending: bool, + error: Option, +} + +impl WorkerState { + fn check_error(&mut self) -> Result<(), OutputStreamError> { + if let Some(e) = self.error.take() { + return Err(OutputStreamError::LastOperationFailed(e)); + } + if !self.alive { + return Err(OutputStreamError::Closed); + } + Ok(()) + } +} + +struct Worker { + state: Mutex, + new_work: tokio::sync::Notify, + write_ready_changed: tokio::sync::Notify, +} + +enum Job { + Flush, + Write(Bytes), +} + +enum WriteStatus<'a> { + Done(Result), + Pending(tokio::sync::futures::Notified<'a>), +} + +impl Worker { + fn new(write_budget: usize) -> Self { + Self { + state: Mutex::new(WorkerState { + alive: true, + items: std::collections::VecDeque::new(), + write_budget, + flush_pending: false, + error: None, + }), + 
new_work: tokio::sync::Notify::new(), + write_ready_changed: tokio::sync::Notify::new(), + } + } + fn check_write(&self) -> WriteStatus<'_> { + let mut state = self.state(); + if let Err(e) = state.check_error() { + return WriteStatus::Done(Err(e)); + } + + if state.flush_pending || state.write_budget == 0 { + return WriteStatus::Pending(self.write_ready_changed.notified()); + } + + WriteStatus::Done(Ok(state.write_budget)) + } + fn state(&self) -> std::sync::MutexGuard { + self.state.lock().unwrap() + } + fn pop(&self) -> Option { + let mut state = self.state(); + if state.items.is_empty() { + if state.flush_pending { + return Some(Job::Flush); + } + } else if let Some(bytes) = state.items.pop_front() { + return Some(Job::Write(bytes)); + } + + None + } + fn report_error(&self, e: std::io::Error) { + { + let mut state = self.state(); + state.alive = false; + state.error = Some(e.into()); + state.flush_pending = false; + } + self.write_ready_changed.notify_waiters(); + } + + async fn work(&self, writer: mpsc::Sender) { + loop { + let notified = self.new_work.notified(); + while let Some(job) = self.pop() { + match job { + Job::Flush => { + self.state().flush_pending = false; + } + + Job::Write(bytes) => { + tracing::debug!("worker writing: {bytes:?}"); + let len = bytes.len(); + match writer.send(bytes).await { + Err(_) => { + self.report_error(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "Outgoing stream body reader has dropped", + )); + return; + } + Ok(_) => { + self.state().write_budget += len; + } + } + } + } + + self.write_ready_changed.notify_waiters(); + } + + notified.await; + } + } +} + +/// Provides a [`HostOutputStream`] impl from a [`tokio::sync::mpsc::Sender`]. +pub struct BodyWriteStream { + worker: Arc, + _join_handle: preview2::AbortOnDropJoinHandle<()>, +} + +impl BodyWriteStream { + /// Create a [`BodyWriteStream`]. 
+ pub fn new(write_budget: usize, writer: mpsc::Sender) -> Self { + let worker = Arc::new(Worker::new(write_budget)); + + let w = Arc::clone(&worker); + let join_handle = preview2::spawn(async move { w.work(writer).await }); + + BodyWriteStream { + worker, + _join_handle: join_handle, + } + } +} + +#[async_trait::async_trait] +impl HostOutputStream for BodyWriteStream { + fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { + let mut state = self.worker.state(); + state.check_error()?; + if state.flush_pending { + return Err(OutputStreamError::Trap(anyhow!( + "write not permitted while flush pending" + ))); + } + match state.write_budget.checked_sub(bytes.len()) { + Some(remaining_budget) => { + state.write_budget = remaining_budget; + state.items.push_back(bytes); + } + None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))), + } + drop(state); + self.worker.new_work.notify_waiters(); + Ok(()) + } + fn flush(&mut self) -> Result<(), OutputStreamError> { + let mut state = self.worker.state(); + state.check_error()?; + + state.flush_pending = true; + self.worker.new_work.notify_waiters(); + + Ok(()) + } + + async fn write_ready(&mut self) -> Result { + loop { + match self.worker.check_write() { + WriteStatus::Done(r) => return r, + WriteStatus::Pending(notifier) => notifier.await, + } + } + } +} diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index e181ef143e0d..570d229cf7b2 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -1,122 +1,45 @@ -use crate::bindings::http::types::{ - FutureIncomingResponse, OutgoingRequest, RequestOptions, Scheme, +use crate::bindings::http::{ + outgoing_handler, + types::{FutureIncomingResponse, OutgoingRequest, RequestOptions, Scheme}, }; -use crate::types::{ActiveFields, ActiveFuture, ActiveResponse, HttpResponse, TableHttpExt}; +use crate::types::{HostFutureIncomingResponse, IncomingResponseInternal, TableHttpExt}; use 
crate::WasiHttpView; use anyhow::Context; -use bytes::{Bytes, BytesMut}; -use http_body_util::{BodyExt, Empty, Full}; -use hyper::{Method, Request}; -#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] -use std::sync::Arc; +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::Method; use std::time::Duration; use tokio::net::TcpStream; use tokio::time::timeout; -#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] -use tokio_rustls::rustls::{self, OwnedTrustAnchor}; -use wasmtime_wasi::preview2::{StreamState, TableStreamExt}; +use wasmtime_wasi::preview2; -#[async_trait::async_trait] -impl crate::bindings::http::outgoing_handler::Host for T { - async fn handle( +impl outgoing_handler::Host for T { + fn handle( &mut self, request_id: OutgoingRequest, options: Option, - ) -> wasmtime::Result { - let future = ActiveFuture::new(request_id, options); - let future_id = self - .table_mut() - .push_future(Box::new(future)) - .context("[handle] pushing future")?; - Ok(future_id) - } -} - -#[cfg(feature = "sync")] -pub mod sync { - use crate::bindings::http::outgoing_handler::{ - Host as AsyncHost, RequestOptions as AsyncRequestOptions, - }; - use crate::bindings::sync::http::types::{ - FutureIncomingResponse, OutgoingRequest, RequestOptions, - }; - use crate::WasiHttpView; - use wasmtime_wasi::preview2::in_tokio; - - // same boilerplate everywhere, converting between two identical types with different - // definition sites. 
one day wasmtime-wit-bindgen will make all this unnecessary - impl From for AsyncRequestOptions { - fn from(other: RequestOptions) -> Self { - Self { - connect_timeout_ms: other.connect_timeout_ms, - first_byte_timeout_ms: other.first_byte_timeout_ms, - between_bytes_timeout_ms: other.between_bytes_timeout_ms, - } - } - } - - impl crate::bindings::sync::http::outgoing_handler::Host for T { - fn handle( - &mut self, - request_id: OutgoingRequest, - options: Option, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::handle(self, request_id, options.map(|v| v.into())).await }) - } - } -} - -fn port_for_scheme(scheme: &Option) -> &str { - match scheme { - Some(s) => match s { - Scheme::Http => ":80", - Scheme::Https => ":443", - // This should never happen. - _ => panic!("unsupported scheme!"), - }, - None => ":443", - } -} + ) -> wasmtime::Result> { + let connect_timeout = Duration::from_millis( + options + .and_then(|opts| opts.connect_timeout_ms) + .unwrap_or(600 * 1000) as u64, + ); -#[async_trait::async_trait] -pub trait WasiHttpViewExt { - async fn handle_async( - &mut self, - request_id: OutgoingRequest, - options: Option, - ) -> wasmtime::Result; -} + let first_byte_timeout = Duration::from_millis( + options + .and_then(|opts| opts.first_byte_timeout_ms) + .unwrap_or(600 * 1000) as u64, + ); -#[async_trait::async_trait] -impl WasiHttpViewExt for T { - async fn handle_async( - &mut self, - request_id: OutgoingRequest, - options: Option, - ) -> wasmtime::Result { - tracing::debug!("preparing outgoing request"); - let opts = options.unwrap_or( - // TODO: Configurable defaults here? 
- RequestOptions { - connect_timeout_ms: Some(600 * 1000), - first_byte_timeout_ms: Some(600 * 1000), - between_bytes_timeout_ms: Some(600 * 1000), - }, + let between_bytes_timeout = Duration::from_millis( + options + .and_then(|opts| opts.between_bytes_timeout_ms) + .unwrap_or(600 * 1000) as u64, ); - let connect_timeout = - Duration::from_millis(opts.connect_timeout_ms.unwrap_or(600 * 1000).into()); - let first_bytes_timeout = - Duration::from_millis(opts.first_byte_timeout_ms.unwrap_or(600 * 1000).into()); - let between_bytes_timeout = - Duration::from_millis(opts.between_bytes_timeout_ms.unwrap_or(600 * 1000).into()); - let request = self - .table() - .get_request(request_id) - .context("[handle_async] getting request")?; - tracing::debug!("http request retrieved from table"); + let req = self.table().delete_outgoing_request(request_id)?; - let method = match request.method() { + let method = match req.method { crate::bindings::http::types::Method::Get => Method::GET, crate::bindings::http::types::Method::Head => Method::HEAD, crate::bindings::http::types::Method::Post => Method::POST, @@ -126,214 +49,155 @@ impl WasiHttpViewExt for T { crate::bindings::http::types::Method::Options => Method::OPTIONS, crate::bindings::http::types::Method::Trace => Method::TRACE, crate::bindings::http::types::Method::Patch => Method::PATCH, - crate::bindings::http::types::Method::Other(s) => { - return Err(crate::bindings::http::types::Error::InvalidUrl(format!( - "unknown method {}", - s - )) - .into()); + crate::bindings::http::types::Method::Other(method) => { + return Ok(Err(outgoing_handler::Error::Invalid(format!( + "unknown method {method}" + )))); } }; - let scheme = match request.scheme().as_ref().unwrap_or(&Scheme::Https) { - Scheme::Http => "http://", - Scheme::Https => "https://", - Scheme::Other(s) => { - return Err(crate::bindings::http::types::Error::InvalidUrl(format!( - "unsupported scheme {}", - s - )) - .into()); + let (use_tls, scheme, port) = match 
req.scheme.unwrap_or(Scheme::Https) { + Scheme::Http => (false, "http://", 80), + Scheme::Https => (true, "https://", 443), + Scheme::Other(scheme) => { + return Ok(Err(outgoing_handler::Error::Invalid(format!( + "unsupported scheme {scheme}" + )))) } }; - // Largely adapted from https://hyper.rs/guides/1/client/basic/ - let authority = match request.authority().find(":") { - Some(_) => request.authority().to_owned(), - None => request.authority().to_owned() + port_for_scheme(request.scheme()), + let authority = if req.authority.find(':').is_some() { + req.authority.clone() + } else { + format!("{}:{port}", req.authority) }; - let tcp_stream = TcpStream::connect(authority.clone()).await?; - let mut sender = if scheme == "https://" { - tracing::debug!("initiating client connection client with TLS"); - #[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] - { - //TODO: uncomment this code and make the tls implementation a feature decision. - //let connector = tokio_native_tls::native_tls::TlsConnector::builder().build()?; - //let connector = tokio_native_tls::TlsConnector::from(connector); - //let host = authority.split(":").next().unwrap_or(&authority); - //let stream = connector.connect(&host, stream).await?; - // derived from https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/client/src/main.rs - let mut root_cert_store = rustls::RootCertStore::empty(); - root_cert_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map( - |ta| { - OwnedTrustAnchor::from_subject_spki_name_constraints( - ta.subject, - ta.spki, - ta.name_constraints, - ) - }, - )); - let config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_cert_store) - .with_no_client_auth(); - let connector = tokio_rustls::TlsConnector::from(Arc::new(config)); - let mut parts = authority.split(":"); - let host = parts.next().unwrap_or(&authority); - let domain = rustls::ServerName::try_from(host)?; - let stream = 
connector.connect(domain, tcp_stream).await.map_err(|e| { - crate::bindings::http::types::Error::ProtocolError(e.to_string()) - })?; + let mut builder = hyper::Request::builder() + .method(method) + .uri(format!("{scheme}{authority}{}", req.path_with_query)) + .header(hyper::header::HOST, &authority); - let t = timeout( - connect_timeout, - hyper::client::conn::http1::handshake(stream), - ) - .await?; - let (s, conn) = t?; - tokio::task::spawn(async move { - if let Err(err) = conn.await { - tracing::debug!("[host/client] Connection failed: {:?}", err); - } - }); - s - } - #[cfg(any(target_arch = "riscv64", target_arch = "s390x"))] - return Err(crate::bindings::http::types::Error::UnexpectedError( - "unsupported architecture for SSL".to_string(), - )); - } else { - tracing::debug!("initiating client connection without TLS"); - let t = timeout( - connect_timeout, - hyper::client::conn::http1::handshake(tcp_stream), - ) - .await?; - let (s, conn) = t?; - tokio::task::spawn(async move { - if let Err(err) = conn.await { - tracing::debug!("[host/client] Connection failed: {:?}", err); - } - }); - s - }; + for (k, v) in req.headers.iter() { + builder = builder.header(k, v); + } - let url = scheme.to_owned() + &request.authority() + &request.path_with_query(); + let body = req.body.unwrap_or_else(|| Empty::::new().boxed()); - tracing::debug!("request to url {:?}", &url); - let mut call = Request::builder() - .method(method) - .uri(url) - .header(hyper::header::HOST, request.authority()); + let request = builder.body(body).map_err(http_protocol_error)?; - if let Some(headers) = request.headers() { - for (key, val) in self - .table() - .get_fields(headers) - .context("[handle_async] getting request headers")? 
- .iter() - { - for item in val { - call = call.header(key, item.clone()); + let handle = preview2::spawn(async move { + let tcp_stream = TcpStream::connect(authority.clone()) + .await + .map_err(invalid_url)?; + + let (mut sender, worker) = if use_tls { + #[cfg(any(target_arch = "riscv64", target_arch = "s390x"))] + { + anyhow::bail!(crate::bindings::http::types::Error::UnexpectedError( + "unsupported architecture for SSL".to_string(), + )); } - } - } - let mut response = ActiveResponse::new(); - let body = match request.body() { - Some(id) => { - let table = self.table_mut(); - let stream = table - .get_stream(id) - .context("[handle_async] getting stream")?; - let input_stream = table - .get_input_stream_mut(stream.incoming()) - .context("[handle_async] getting mutable input stream")?; - let mut bytes = BytesMut::new(); - let mut eof = StreamState::Open; - while eof != StreamState::Closed { - let (chunk, state) = input_stream.read(4096)?; - eof = if chunk.is_empty() { - StreamState::Closed - } else { - state - }; - bytes.extend_from_slice(&chunk[..]); + #[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] + { + use tokio_rustls::rustls::OwnedTrustAnchor; + + // derived from https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/client/src/main.rs + let mut root_cert_store = rustls::RootCertStore::empty(); + root_cert_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map( + |ta| { + OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + }, + )); + let config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + let connector = tokio_rustls::TlsConnector::from(std::sync::Arc::new(config)); + let mut parts = authority.split(":"); + let host = parts.next().unwrap_or(&authority); + let domain = rustls::ServerName::try_from(host)?; + let stream = connector.connect(domain, tcp_stream).await.map_err(|e| { + 
crate::bindings::http::types::Error::ProtocolError(e.to_string()) + })?; + + let (sender, conn) = timeout( + connect_timeout, + hyper::client::conn::http1::handshake(stream), + ) + .await + .map_err(|_| timeout_error("connection"))??; + + let worker = preview2::spawn(async move { + conn.await.context("hyper connection failed")?; + Ok::<_, anyhow::Error>(()) + }); + + (sender, worker) } - Full::::new(bytes.freeze()).boxed() - } - None => Empty::::new().boxed(), - }; - let request = call.body(body)?; - tracing::trace!("hyper request {:?}", request); - let t = timeout(first_bytes_timeout, sender.send_request(request)).await?; - let mut res = t?; - tracing::trace!("hyper response {:?}", res); - response.status = res.status().as_u16(); + } else { + let (sender, conn) = timeout( + connect_timeout, + // TODO: we should plumb the builder through the http context, and use it here + hyper::client::conn::http1::handshake(tcp_stream), + ) + .await + .map_err(|_| timeout_error("connection"))??; - let mut map = ActiveFields::new(); - for (key, value) in res.headers().iter() { - let mut vec = Vec::new(); - vec.push(value.as_bytes().to_vec()); - map.insert(key.as_str().to_string(), vec); - } - let headers = self - .table_mut() - .push_fields(Box::new(map)) - .context("[handle_async] pushing response headers")?; - response.set_headers(headers); + let worker = preview2::spawn(async move { + conn.await.context("hyper connection failed")?; + Ok::<_, anyhow::Error>(()) + }); - let mut buf: Vec = Vec::new(); - while let Some(next) = timeout(between_bytes_timeout, res.frame()).await? 
{ - let frame = next?; - tracing::debug!("response body next frame"); - if let Some(chunk) = frame.data_ref() { - tracing::trace!("response body chunk size {:?}", chunk.len()); - buf.extend_from_slice(chunk); - } - if let Some(trailers) = frame.trailers_ref() { - tracing::debug!("response trailers present"); - let mut map = ActiveFields::new(); - for (name, value) in trailers.iter() { - let key = name.to_string(); - match map.get_mut(&key) { - Some(vec) => vec.push(value.as_bytes().to_vec()), - None => { - let mut vec = Vec::new(); - vec.push(value.as_bytes().to_vec()); - map.insert(key, vec); - } - }; - } - let trailers = self - .table_mut() - .push_fields(Box::new(map)) - .context("[handle_async] pushing response trailers")?; - response.set_trailers(trailers); - tracing::debug!("http trailers saved to table"); - } - } + (sender, worker) + }; - let response_id = self - .table_mut() - .push_response(Box::new(response)) - .context("[handle_async] pushing response")?; - tracing::trace!("response body {:?}", std::str::from_utf8(&buf[..]).unwrap()); - let (stream_id, stream) = self - .table_mut() - .push_stream(Bytes::from(buf), response_id) - .await - .context("[handle_async] pushing stream")?; - let response = self - .table_mut() - .get_response_mut(response_id) - .context("[handle_async] getting mutable response")?; - response.set_body(stream_id); - tracing::debug!("http response saved to table with id {:?}", response_id); + let resp = timeout(first_byte_timeout, sender.send_request(request)) + .await + .map_err(|_| timeout_error("first byte"))? 
+ .map_err(hyper_protocol_error)?; - self.http_ctx_mut().streams.insert(stream_id, stream); + Ok(IncomingResponseInternal { + resp, + worker, + between_bytes_timeout, + }) + }); - Ok(response_id) + let fut = self + .table() + .push_future_incoming_response(HostFutureIncomingResponse::new(handle))?; + + Ok(Ok(fut)) } } + +fn timeout_error(kind: &str) -> anyhow::Error { + anyhow::anyhow!(crate::bindings::http::types::Error::TimeoutError(format!( + "{kind} timed out" + ))) +} + +fn http_protocol_error(e: http::Error) -> anyhow::Error { + anyhow::anyhow!(crate::bindings::http::types::Error::ProtocolError( + e.to_string() + )) +} + +fn hyper_protocol_error(e: hyper::Error) -> anyhow::Error { + anyhow::anyhow!(crate::bindings::http::types::Error::ProtocolError( + e.to_string() + )) +} + +fn invalid_url(e: std::io::Error) -> anyhow::Error { + // TODO: DNS errors show up as a Custom io error, what subset of errors should we consider for + // InvalidUrl here? + anyhow::anyhow!(crate::bindings::http::types::Error::InvalidUrl( + e.to_string() + )) +} diff --git a/crates/wasi-http/src/incoming_handler.rs b/crates/wasi-http/src/incoming_handler.rs index e65a88e27d53..3fd26bf7d0af 100644 --- a/crates/wasi-http/src/incoming_handler.rs +++ b/crates/wasi-http/src/incoming_handler.rs @@ -1,9 +1,8 @@ use crate::bindings::http::types::{IncomingRequest, ResponseOutparam}; use crate::WasiHttpView; -#[async_trait::async_trait] impl crate::bindings::http::incoming_handler::Host for T { - async fn handle( + fn handle( &mut self, _request: IncomingRequest, _response_out: ResponseOutparam, @@ -11,21 +10,3 @@ impl crate::bindings::http::incoming_handler::Host for T { anyhow::bail!("unimplemented: [incoming_handler] handle") } } - -#[cfg(feature = "sync")] -pub mod sync { - use crate::bindings::http::incoming_handler::Host as AsyncHost; - use crate::bindings::sync::http::types::{IncomingRequest, ResponseOutparam}; - use crate::WasiHttpView; - use wasmtime_wasi::preview2::in_tokio; - - impl 
crate::bindings::sync::http::incoming_handler::Host for T { - fn handle( - &mut self, - request: IncomingRequest, - response_out: ResponseOutparam, - ) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::handle(self, request, response_out).await }) - } - } -} diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index 7a99a0b79a8f..c6b072b1e60f 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -1,9 +1,8 @@ -pub use crate::http_impl::WasiHttpViewExt; pub use crate::types::{WasiHttpCtx, WasiHttpView}; use core::fmt::Formatter; use std::fmt::{self, Display}; -pub mod component_impl; +pub mod body; pub mod http_impl; pub mod incoming_handler; pub mod proxy; @@ -11,56 +10,22 @@ pub mod types; pub mod types_impl; pub mod bindings { - #[cfg(feature = "sync")] - pub mod sync { - pub(crate) mod _internal { - wasmtime::component::bindgen!({ - path: "wit", - interfaces: " - import wasi:http/incoming-handler - import wasi:http/outgoing-handler - import wasi:http/types - ", - tracing: true, - with: { - "wasi:io/streams": wasmtime_wasi::preview2::bindings::sync_io::io::streams, - "wasi:poll/poll": wasmtime_wasi::preview2::bindings::sync_io::poll::poll, - } - }); - } - pub use self::_internal::wasi::http; - } - - pub(crate) mod _internal_rest { - wasmtime::component::bindgen!({ - path: "wit", - interfaces: " + wasmtime::component::bindgen!({ + path: "wit", + interfaces: " import wasi:http/incoming-handler import wasi:http/outgoing-handler import wasi:http/types ", - tracing: true, - async: true, - with: { - "wasi:io/streams": wasmtime_wasi::preview2::bindings::io::streams, - "wasi:poll/poll": wasmtime_wasi::preview2::bindings::poll::poll, - } - }); - } - - pub use self::_internal_rest::wasi::http; -} - -pub fn add_to_linker(linker: &mut wasmtime::Linker) -> anyhow::Result<()> { - crate::component_impl::add_component_to_linker::(linker, |t| t) -} - -pub mod sync { - use crate::types::WasiHttpView; + tracing: true, + async: false, + 
with: { + "wasi:io/streams": wasmtime_wasi::preview2::bindings::io::streams, + "wasi:poll/poll": wasmtime_wasi::preview2::bindings::poll::poll, + } + }); - pub fn add_to_linker(linker: &mut wasmtime::Linker) -> anyhow::Result<()> { - crate::component_impl::sync::add_component_to_linker::(linker, |t| t) - } + pub use wasi::http; } impl std::error::Error for crate::bindings::http::types::Error {} diff --git a/crates/wasi-http/src/proxy.rs b/crates/wasi-http/src/proxy.rs index 8cd78d700265..9f156fd09a14 100644 --- a/crates/wasi-http/src/proxy.rs +++ b/crates/wasi-http/src/proxy.rs @@ -4,7 +4,7 @@ use wasmtime_wasi::preview2; wasmtime::component::bindgen!({ world: "wasi:http/proxy", tracing: true, - async: true, + async: false, with: { "wasi:cli/stderr": preview2::bindings::cli::stderr, "wasi:cli/stdin": preview2::bindings::cli::stdin, @@ -30,39 +30,3 @@ where bindings::http::types::add_to_linker(l, |t| t)?; Ok(()) } - -#[cfg(feature = "sync")] -pub mod sync { - use crate::{bindings, WasiHttpView}; - use wasmtime_wasi::preview2; - - wasmtime::component::bindgen!({ - world: "wasi:http/proxy", - tracing: true, - async: false, - with: { - "wasi:cli/stderr": preview2::bindings::cli::stderr, - "wasi:cli/stdin": preview2::bindings::cli::stdin, - "wasi:cli/stdout": preview2::bindings::cli::stdout, - "wasi:clocks/monotonic-clock": preview2::bindings::clocks::monotonic_clock, - "wasi:clocks/timezone": preview2::bindings::clocks::timezone, - "wasi:clocks/wall-clock": preview2::bindings::clocks::wall_clock, - "wasi:http/incoming-handler": bindings::sync::http::incoming_handler, - "wasi:http/outgoing-handler": bindings::sync::http::outgoing_handler, - "wasi:http/types": bindings::sync::http::types, - "wasi:io/streams": preview2::bindings::sync_io::io::streams, - "wasi:poll/poll": preview2::bindings::sync_io::poll::poll, - "wasi:random/random": preview2::bindings::random::random, - }, - }); - - pub fn add_to_linker(l: &mut wasmtime::component::Linker) -> anyhow::Result<()> - where 
- T: WasiHttpView + bindings::sync::http::types::Host, - { - bindings::sync::http::incoming_handler::add_to_linker(l, |t| t)?; - bindings::sync::http::outgoing_handler::add_to_linker(l, |t| t)?; - bindings::sync::http::types::add_to_linker(l, |t| t)?; - Ok(()) - } -} diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 050a83470947..3b73f7f3fd49 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -1,449 +1,302 @@ //! Implements the base structure (i.e. [WasiHttpCtx]) that will provide the //! implementation of the wasi-http API. -use crate::bindings::http::types::{ - IncomingStream, Method, OutgoingRequest, OutgoingStream, RequestOptions, Scheme, +use crate::{ + bindings::http::types::{FutureTrailers, IncomingBody, Method, OutgoingBody, Scheme}, + body::{ + HostFutureTrailers, HostIncomingBody, HostIncomingBodyBuilder, HostOutgoingBody, HyperBody, + }, }; -use bytes::Bytes; use std::any::Any; -use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; -use wasmtime_wasi::preview2::{ - pipe::{AsyncReadStream, AsyncWriteStream}, - HostInputStream, HostOutputStream, Table, TableError, TableStreamExt, WasiView, -}; - -const MAX_BUF_SIZE: usize = 65_536; +use std::pin::Pin; +use std::task; +use wasmtime_wasi::preview2::{AbortOnDropJoinHandle, Table, TableError}; /// Capture the state necessary for use in the wasi-http API implementation. -pub struct WasiHttpCtx { - pub streams: HashMap, -} - -impl WasiHttpCtx { - /// Make a new context from the default state. 
- pub fn new() -> Self { - Self { - streams: HashMap::new(), - } - } -} +pub struct WasiHttpCtx; -pub trait WasiHttpView: WasiView { - fn http_ctx(&self) -> &WasiHttpCtx; - fn http_ctx_mut(&mut self) -> &mut WasiHttpCtx; +pub trait WasiHttpView: Send { + fn ctx(&mut self) -> &mut WasiHttpCtx; + fn table(&mut self) -> &mut Table; } -pub type FieldsMap = HashMap>>; - -#[derive(Clone, Debug)] -pub struct ActiveRequest { - pub active: bool, +pub struct HostOutgoingRequest { pub method: Method, pub scheme: Option, pub path_with_query: String, pub authority: String, - pub headers: Option, - pub body: Option, + pub headers: FieldMap, + pub body: Option, } -pub trait HttpRequest: Send + Sync { - fn new() -> Self - where - Self: Sized; - - fn as_any(&self) -> &dyn Any; - - fn method(&self) -> &Method; - fn scheme(&self) -> &Option; - fn path_with_query(&self) -> &str; - fn authority(&self) -> &str; - fn headers(&self) -> Option; - fn set_headers(&mut self, headers: u32); - fn body(&self) -> Option; - fn set_body(&mut self, body: u32); +pub struct HostIncomingResponse { + pub status: u16, + pub headers: FieldMap, + pub body: Option, + pub worker: AbortOnDropJoinHandle>, } -impl HttpRequest for ActiveRequest { - fn new() -> Self { - Self { - active: false, - method: Method::Get, - scheme: Some(Scheme::Http), - path_with_query: "".to_string(), - authority: "".to_string(), - headers: None, - body: None, - } - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn method(&self) -> &Method { - &self.method - } +pub type FieldMap = hyper::HeaderMap; - fn scheme(&self) -> &Option { - &self.scheme - } - - fn path_with_query(&self) -> &str { - &self.path_with_query - } - - fn authority(&self) -> &str { - &self.authority - } - - fn headers(&self) -> Option { - self.headers - } - - fn set_headers(&mut self, headers: u32) { - self.headers = Some(headers); - } - - fn body(&self) -> Option { - self.body - } +pub enum HostFields { + Ref { + parent: u32, - fn set_body(&mut self, body: u32) 
{ - self.body = Some(body); - } + // NOTE: there's not failure in the result here because we assume that HostFields will + // always be registered as a child of the entry with the `parent` id. This ensures that the + // entry will always exist while this `HostFields::Ref` entry exists in the table, thus we + // don't need to account for failure when fetching the fields ref from the parent. + get_fields: for<'a> fn(elem: &'a mut (dyn Any + 'static)) -> &'a mut FieldMap, + }, + Owned { + fields: FieldMap, + }, } -#[derive(Clone, Debug)] -pub struct ActiveResponse { - pub active: bool, - pub status: u16, - pub headers: Option, - pub body: Option, - pub trailers: Option, +pub struct IncomingResponseInternal { + pub resp: hyper::Response, + pub worker: AbortOnDropJoinHandle>, + pub between_bytes_timeout: std::time::Duration, } -pub trait HttpResponse: Send + Sync { - fn new() -> Self - where - Self: Sized; +type FutureIncomingResponseHandle = AbortOnDropJoinHandle>; - fn as_any(&self) -> &dyn Any; - - fn status(&self) -> u16; - fn headers(&self) -> Option; - fn set_headers(&mut self, headers: u32); - fn body(&self) -> Option; - fn set_body(&mut self, body: u32); - fn trailers(&self) -> Option; - fn set_trailers(&mut self, trailers: u32); +pub enum HostFutureIncomingResponse { + Pending(FutureIncomingResponseHandle), + Ready(anyhow::Result), + Consumed, } -impl HttpResponse for ActiveResponse { - fn new() -> Self { - Self { - active: false, - status: 0, - headers: None, - body: None, - trailers: None, - } +impl HostFutureIncomingResponse { + pub fn new(handle: FutureIncomingResponseHandle) -> Self { + Self::Pending(handle) } - fn as_any(&self) -> &dyn Any { - self + pub fn is_ready(&self) -> bool { + matches!(self, Self::Ready(_)) } - fn status(&self) -> u16 { - self.status + pub fn unwrap_ready(self) -> anyhow::Result { + match self { + Self::Ready(res) => res, + Self::Pending(_) | Self::Consumed => { + panic!("unwrap_ready called on a pending 
HostFutureIncomingResponse") + } + } } +} - fn headers(&self) -> Option { - self.headers +impl std::future::Future for HostFutureIncomingResponse { + type Output = anyhow::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + let s = self.get_mut(); + match s { + Self::Pending(ref mut handle) => match Pin::new(handle).poll(cx) { + task::Poll::Pending => task::Poll::Pending, + task::Poll::Ready(r) => { + *s = Self::Ready(r); + task::Poll::Ready(Ok(())) + } + }, + + Self::Consumed | Self::Ready(_) => task::Poll::Ready(Ok(())), + } } +} - fn set_headers(&mut self, headers: u32) { - self.headers = Some(headers); - } +#[async_trait::async_trait] +pub trait TableHttpExt { + fn push_outgoing_response(&mut self, request: HostOutgoingRequest) -> Result; + fn get_outgoing_request(&self, id: u32) -> Result<&HostOutgoingRequest, TableError>; + fn get_outgoing_request_mut(&mut self, id: u32) + -> Result<&mut HostOutgoingRequest, TableError>; + fn delete_outgoing_request(&mut self, id: u32) -> Result; + + fn push_incoming_response(&mut self, response: HostIncomingResponse) + -> Result; + fn get_incoming_response(&self, id: u32) -> Result<&HostIncomingResponse, TableError>; + fn get_incoming_response_mut( + &mut self, + id: u32, + ) -> Result<&mut HostIncomingResponse, TableError>; + fn delete_incoming_response(&mut self, id: u32) -> Result; - fn body(&self) -> Option { - self.body - } + fn push_fields(&mut self, fields: HostFields) -> Result; + fn get_fields(&mut self, id: u32) -> Result<&mut FieldMap, TableError>; + fn delete_fields(&mut self, id: u32) -> Result; - fn set_body(&mut self, body: u32) { - self.body = Some(body); - } + fn push_future_incoming_response( + &mut self, + response: HostFutureIncomingResponse, + ) -> Result; + fn get_future_incoming_response( + &self, + id: u32, + ) -> Result<&HostFutureIncomingResponse, TableError>; + fn get_future_incoming_response_mut( + &mut self, + id: u32, + ) -> Result<&mut 
HostFutureIncomingResponse, TableError>; + fn delete_future_incoming_response( + &mut self, + id: u32, + ) -> Result; - fn trailers(&self) -> Option { - self.trailers - } + fn push_incoming_body(&mut self, body: HostIncomingBody) -> Result; + fn get_incoming_body(&mut self, id: IncomingBody) -> Result<&mut HostIncomingBody, TableError>; + fn delete_incoming_body(&mut self, id: IncomingBody) -> Result; - fn set_trailers(&mut self, trailers: u32) { - self.trailers = Some(trailers); - } -} + fn push_outgoing_body(&mut self, body: HostOutgoingBody) -> Result; + fn get_outgoing_body(&mut self, id: OutgoingBody) -> Result<&mut HostOutgoingBody, TableError>; + fn delete_outgoing_body(&mut self, id: OutgoingBody) -> Result; -#[derive(Clone, Debug)] -pub struct ActiveFuture { - request_id: OutgoingRequest, - options: Option, - response_id: Option, - pollable_id: Option, + fn push_future_trailers( + &mut self, + trailers: HostFutureTrailers, + ) -> Result; + fn get_future_trailers( + &mut self, + id: FutureTrailers, + ) -> Result<&mut HostFutureTrailers, TableError>; + fn delete_future_trailers( + &mut self, + id: FutureTrailers, + ) -> Result; } -impl ActiveFuture { - pub fn new(request_id: OutgoingRequest, options: Option) -> Self { - Self { - request_id, - options, - response_id: None, - pollable_id: None, - } - } - - pub fn request_id(&self) -> u32 { - self.request_id - } - - pub fn options(&self) -> Option { - self.options - } - - pub fn response_id(&self) -> Option { - self.response_id +#[async_trait::async_trait] +impl TableHttpExt for Table { + fn push_outgoing_response(&mut self, request: HostOutgoingRequest) -> Result { + self.push(Box::new(request)) } - - pub fn set_response_id(&mut self, response_id: u32) { - self.response_id = Some(response_id); + fn get_outgoing_request(&self, id: u32) -> Result<&HostOutgoingRequest, TableError> { + self.get::(id) } - - pub fn pollable_id(&self) -> Option { - self.pollable_id + fn get_outgoing_request_mut( + &mut self, + id: 
u32, + ) -> Result<&mut HostOutgoingRequest, TableError> { + self.get_mut::(id) } - - pub fn set_pollable_id(&mut self, pollable_id: u32) { - self.pollable_id = Some(pollable_id); + fn delete_outgoing_request(&mut self, id: u32) -> Result { + let req = self.delete::(id)?; + Ok(req) } -} - -#[derive(Clone, Debug)] -pub struct ActiveFields(HashMap>>); -impl ActiveFields { - pub fn new() -> Self { - Self(FieldsMap::new()) + fn push_incoming_response( + &mut self, + response: HostIncomingResponse, + ) -> Result { + self.push(Box::new(response)) } -} - -pub trait HttpFields: Send + Sync { - fn as_any(&self) -> &dyn Any; -} - -impl HttpFields for ActiveFields { - fn as_any(&self) -> &dyn Any { - self + fn get_incoming_response(&self, id: u32) -> Result<&HostIncomingResponse, TableError> { + self.get::(id) } -} - -impl Deref for ActiveFields { - type Target = FieldsMap; - fn deref(&self) -> &FieldsMap { - &self.0 + fn get_incoming_response_mut( + &mut self, + id: u32, + ) -> Result<&mut HostIncomingResponse, TableError> { + self.get_mut::(id) } -} - -impl DerefMut for ActiveFields { - fn deref_mut(&mut self) -> &mut FieldsMap { - &mut self.0 + fn delete_incoming_response(&mut self, id: u32) -> Result { + let resp = self.delete::(id)?; + Ok(resp) } -} - -#[derive(Clone, Debug)] -pub struct Stream { - input_id: u32, - output_id: u32, - parent_id: u32, -} -impl Stream { - pub fn new(input_id: u32, output_id: u32, parent_id: u32) -> Self { - Self { - input_id, - output_id, - parent_id, + fn push_fields(&mut self, fields: HostFields) -> Result { + match fields { + HostFields::Ref { parent, .. } => self.push_child(Box::new(fields), parent), + HostFields::Owned { .. 
} => self.push(Box::new(fields)), } } + fn get_fields(&mut self, id: u32) -> Result<&mut FieldMap, TableError> { + let fields = self.get_mut::(id)?; + if let HostFields::Ref { parent, get_fields } = *fields { + let entry = self.get_any_mut(parent)?; + return Ok(get_fields(entry)); + } - pub fn incoming(&self) -> IncomingStream { - self.input_id + match self.get_mut::(id)? { + HostFields::Owned { fields } => Ok(fields), + // NB: ideally the `if let` above would go here instead. That makes + // the borrow-checker unhappy. Unclear why. If you, dear reader, can + // refactor this to remove the `unreachable!` please do. + HostFields::Ref { .. } => unreachable!(), + } } - - pub fn outgoing(&self) -> OutgoingStream { - self.output_id + fn delete_fields(&mut self, id: u32) -> Result { + let fields = self.delete::(id)?; + Ok(fields) } - pub fn parent_id(&self) -> u32 { - self.parent_id - } -} - -#[async_trait::async_trait] -pub trait TableHttpExt { - fn push_request(&mut self, request: Box) -> Result; - fn get_request(&self, id: u32) -> Result<&(dyn HttpRequest), TableError>; - fn get_request_mut(&mut self, id: u32) -> Result<&mut Box, TableError>; - fn delete_request(&mut self, id: u32) -> Result<(), TableError>; - - fn push_response(&mut self, response: Box) -> Result; - fn get_response(&self, id: u32) -> Result<&dyn HttpResponse, TableError>; - fn get_response_mut(&mut self, id: u32) -> Result<&mut Box, TableError>; - fn delete_response(&mut self, id: u32) -> Result<(), TableError>; - - fn push_future(&mut self, future: Box) -> Result; - fn get_future(&self, id: u32) -> Result<&ActiveFuture, TableError>; - fn get_future_mut(&mut self, id: u32) -> Result<&mut Box, TableError>; - fn delete_future(&mut self, id: u32) -> Result<(), TableError>; - - fn push_fields(&mut self, fields: Box) -> Result; - fn get_fields(&self, id: u32) -> Result<&ActiveFields, TableError>; - fn get_fields_mut(&mut self, id: u32) -> Result<&mut Box, TableError>; - fn delete_fields(&mut self, id: 
u32) -> Result<(), TableError>; - - async fn push_stream( + fn push_future_incoming_response( &mut self, - content: Bytes, - parent: u32, - ) -> Result<(u32, Stream), TableError>; - fn get_stream(&self, id: u32) -> Result<&Stream, TableError>; - fn get_stream_mut(&mut self, id: u32) -> Result<&mut Box, TableError>; - fn delete_stream(&mut self, id: u32) -> Result<(), TableError>; -} - -#[async_trait::async_trait] -impl TableHttpExt for Table { - fn push_request(&mut self, request: Box) -> Result { - self.push(Box::new(request)) - } - fn get_request(&self, id: u32) -> Result<&dyn HttpRequest, TableError> { - self.get::>(id).map(|f| f.as_ref()) - } - fn get_request_mut(&mut self, id: u32) -> Result<&mut Box, TableError> { - self.get_mut::>(id) - } - fn delete_request(&mut self, id: u32) -> Result<(), TableError> { - self.delete::>(id).map(|_old| ()) - } - - fn push_response(&mut self, response: Box) -> Result { + response: HostFutureIncomingResponse, + ) -> Result { self.push(Box::new(response)) } - fn get_response(&self, id: u32) -> Result<&dyn HttpResponse, TableError> { - self.get::>(id).map(|f| f.as_ref()) + fn get_future_incoming_response( + &self, + id: u32, + ) -> Result<&HostFutureIncomingResponse, TableError> { + self.get::(id) } - fn get_response_mut(&mut self, id: u32) -> Result<&mut Box, TableError> { - self.get_mut::>(id) + fn get_future_incoming_response_mut( + &mut self, + id: u32, + ) -> Result<&mut HostFutureIncomingResponse, TableError> { + self.get_mut::(id) } - fn delete_response(&mut self, id: u32) -> Result<(), TableError> { - self.delete::>(id).map(|_old| ()) + fn delete_future_incoming_response( + &mut self, + id: u32, + ) -> Result { + self.delete(id) } - fn push_future(&mut self, future: Box) -> Result { - self.push(Box::new(future)) - } - fn get_future(&self, id: u32) -> Result<&ActiveFuture, TableError> { - self.get::>(id).map(|f| f.as_ref()) + fn push_incoming_body(&mut self, body: HostIncomingBody) -> Result { + self.push(Box::new(body)) 
} - fn get_future_mut(&mut self, id: u32) -> Result<&mut Box, TableError> { - self.get_mut::>(id) - } - fn delete_future(&mut self, id: u32) -> Result<(), TableError> { - self.delete::>(id).map(|_old| ()) + + fn get_incoming_body(&mut self, id: IncomingBody) -> Result<&mut HostIncomingBody, TableError> { + self.get_mut(id) } - fn push_fields(&mut self, fields: Box) -> Result { - self.push(Box::new(fields)) + fn delete_incoming_body(&mut self, id: IncomingBody) -> Result { + self.delete(id) } - fn get_fields(&self, id: u32) -> Result<&ActiveFields, TableError> { - self.get::>(id).map(|f| f.as_ref()) + + fn push_outgoing_body(&mut self, body: HostOutgoingBody) -> Result { + self.push(Box::new(body)) } - fn get_fields_mut(&mut self, id: u32) -> Result<&mut Box, TableError> { - self.get_mut::>(id) + + fn get_outgoing_body(&mut self, id: OutgoingBody) -> Result<&mut HostOutgoingBody, TableError> { + self.get_mut(id) } - fn delete_fields(&mut self, id: u32) -> Result<(), TableError> { - self.delete::>(id).map(|_old| ()) + + fn delete_outgoing_body(&mut self, id: OutgoingBody) -> Result { + self.delete(id) } - async fn push_stream( + fn push_future_trailers( &mut self, - mut content: Bytes, - parent: u32, - ) -> Result<(u32, Stream), TableError> { - tracing::debug!("preparing http body stream"); - let (a, b) = tokio::io::duplex(MAX_BUF_SIZE); - let (_, write_stream) = tokio::io::split(a); - let (read_stream, _) = tokio::io::split(b); - let input_stream = AsyncReadStream::new(read_stream); - // TODO: more informed budget here - let mut output_stream = AsyncWriteStream::new(4096, write_stream); - - while !content.is_empty() { - let permit = output_stream - .write_ready() - .await - .map_err(|_| TableError::NotPresent)?; - - let len = content.len().min(permit); - let chunk = content.split_to(len); - - output_stream - .write(chunk) - .map_err(|_| TableError::NotPresent)?; - } - output_stream.flush().map_err(|_| TableError::NotPresent)?; - let _readiness = 
tokio::time::timeout( - std::time::Duration::from_millis(10), - output_stream.write_ready(), - ) - .await; - - let input_stream = Box::new(input_stream); - let output_id = self.push_output_stream(Box::new(output_stream))?; - let input_id = self.push_input_stream(input_stream)?; - let stream = Stream::new(input_id, output_id, parent); - let cloned_stream = stream.clone(); - let stream_id = self.push(Box::new(Box::new(stream)))?; - tracing::trace!( - "http body stream details ( id: {:?}, input: {:?}, output: {:?} )", - stream_id, - input_id, - output_id - ); - Ok((stream_id, cloned_stream)) + trailers: HostFutureTrailers, + ) -> Result { + self.push(Box::new(trailers)) } - fn get_stream(&self, id: u32) -> Result<&Stream, TableError> { - self.get::>(id).map(|f| f.as_ref()) - } - fn get_stream_mut(&mut self, id: u32) -> Result<&mut Box, TableError> { - self.get_mut::>(id) - } - fn delete_stream(&mut self, id: u32) -> Result<(), TableError> { - let stream = self.get_stream_mut(id)?; - let input_stream = stream.incoming(); - let output_stream = stream.outgoing(); - self.delete::>(id).map(|_old| ())?; - self.delete::>(input_stream) - .map(|_old| ())?; - self.delete::>(output_stream) - .map(|_old| ()) - } -} -#[cfg(test)] -mod test { - use super::*; + fn get_future_trailers( + &mut self, + id: FutureTrailers, + ) -> Result<&mut HostFutureTrailers, TableError> { + self.get_mut(id) + } - #[test] - fn instantiate() { - WasiHttpCtx::new(); + fn delete_future_trailers( + &mut self, + id: FutureTrailers, + ) -> Result { + self.delete(id) } } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index c044cd1bdde9..1b2326f3553b 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -1,191 +1,156 @@ use crate::bindings::http::types::{ - Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse, - IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, ResponseOutparam, + 
Error, Fields, FutureIncomingResponse, FutureTrailers, Headers, IncomingBody, IncomingRequest, + IncomingResponse, Method, OutgoingBody, OutgoingRequest, OutgoingResponse, ResponseOutparam, Scheme, StatusCode, Trailers, }; -use crate::http_impl::WasiHttpViewExt; -use crate::types::{ActiveFields, ActiveRequest, HttpRequest, TableHttpExt}; +use crate::body::{HostFutureTrailers, HostFutureTrailersState}; +use crate::types::FieldMap; use crate::WasiHttpView; -use anyhow::{anyhow, bail, Context}; -use bytes::Bytes; -use wasmtime_wasi::preview2::{bindings::poll::poll::Pollable, HostPollable, TablePollableExt}; - -#[async_trait::async_trait] -impl crate::bindings::http::types::Host for T { - async fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> { - self.table_mut() +use crate::{ + body::{HostIncomingBodyBuilder, HostOutgoingBody}, + types::{ + HostFields, HostFutureIncomingResponse, HostIncomingResponse, HostOutgoingRequest, + TableHttpExt, + }, +}; +use anyhow::{anyhow, Context}; +use std::any::Any; +use wasmtime_wasi::preview2::{ + bindings::io::streams::{InputStream, OutputStream}, + bindings::poll::poll::Pollable, + HostPollable, PollableFuture, TablePollableExt, TableStreamExt, +}; + +impl crate::bindings::http::types::Host for T { + fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> { + self.table() .delete_fields(fields) .context("[drop_fields] deleting fields")?; Ok(()) } - async fn new_fields(&mut self, entries: Vec<(String, String)>) -> wasmtime::Result { - let mut map = ActiveFields::new(); - for (key, value) in entries { - map.insert(key, vec![value.clone().into_bytes()]); + fn new_fields(&mut self, entries: Vec<(String, Vec)>) -> wasmtime::Result { + let mut map = hyper::HeaderMap::new(); + + for (header, value) in entries { + let header = hyper::header::HeaderName::from_bytes(header.as_bytes())?; + let value = hyper::header::HeaderValue::from_bytes(&value)?; + map.append(header, value); } let id = self - .table_mut() - 
.push_fields(Box::new(map)) + .table() + .push_fields(HostFields::Owned { fields: map }) .context("[new_fields] pushing fields")?; Ok(id) } - async fn fields_get(&mut self, fields: Fields, name: String) -> wasmtime::Result>> { + fn fields_get(&mut self, fields: Fields, name: String) -> wasmtime::Result>> { let res = self - .table_mut() + .table() .get_fields(fields) .context("[fields_get] getting fields")? - .get(&name) - .ok_or_else(|| anyhow!("key not found: {name}"))? - .clone(); + .get_all(hyper::header::HeaderName::from_bytes(name.as_bytes())?) + .into_iter() + .map(|val| val.as_bytes().to_owned()) + .collect(); Ok(res) } - async fn fields_set( + fn fields_set( &mut self, fields: Fields, name: String, - value: Vec>, + values: Vec>, ) -> wasmtime::Result<()> { - match self.table_mut().get_fields_mut(fields) { - Ok(m) => { - m.insert(name, value.clone()); - Ok(()) - } - Err(_) => bail!("fields not found"), + let m = self.table().get_fields(fields)?; + + let header = hyper::header::HeaderName::from_bytes(name.as_bytes())?; + + m.remove(&header); + for value in values { + let value = hyper::header::HeaderValue::from_bytes(&value)?; + m.append(&header, value); } + + Ok(()) } - async fn fields_delete(&mut self, fields: Fields, name: String) -> wasmtime::Result<()> { - match self.table_mut().get_fields_mut(fields) { - Ok(m) => m.remove(&name), - Err(_) => None, - }; + fn fields_delete(&mut self, fields: Fields, name: String) -> wasmtime::Result<()> { + let m = self.table().get_fields(fields)?; + let header = hyper::header::HeaderName::from_bytes(name.as_bytes())?; + m.remove(header); Ok(()) } - async fn fields_append( + fn fields_append( &mut self, fields: Fields, name: String, value: Vec, ) -> wasmtime::Result<()> { let m = self - .table_mut() - .get_fields_mut(fields) + .table() + .get_fields(fields) .context("[fields_append] getting mutable fields")?; - match m.get_mut(&name) { - Some(v) => v.push(value), - None => { - let mut vec = std::vec::Vec::new(); - 
vec.push(value); - m.insert(name, vec); - } - }; + let header = hyper::header::HeaderName::from_bytes(name.as_bytes())?; + let value = hyper::header::HeaderValue::from_bytes(&value)?; + m.append(header, value); Ok(()) } - async fn fields_entries(&mut self, fields: Fields) -> wasmtime::Result)>> { - let field_map = match self.table().get_fields(fields) { - Ok(m) => m.iter(), - Err(_) => bail!("fields not found."), - }; - let mut result = Vec::new(); - for (name, value) in field_map { - result.push((name.clone(), value[0].clone())); - } + fn fields_entries(&mut self, fields: Fields) -> wasmtime::Result)>> { + let fields = self.table().get_fields(fields)?; + let result = fields + .iter() + .map(|(name, value)| (name.as_str().to_owned(), value.as_bytes().to_owned())) + .collect(); Ok(result) } - async fn fields_clone(&mut self, fields: Fields) -> wasmtime::Result { - let table = self.table_mut(); - let m = table + fn fields_clone(&mut self, fields: Fields) -> wasmtime::Result { + let fields = self + .table() .get_fields(fields) - .context("[fields_clone] getting fields")?; - let id = table - .push_fields(Box::new(m.clone())) + .context("[fields_clone] getting fields")? 
+ .clone(); + let id = self + .table() + .push_fields(HostFields::Owned { fields }) .context("[fields_clone] pushing fields")?; Ok(id) } - async fn finish_incoming_stream( - &mut self, - stream_id: IncomingStream, - ) -> wasmtime::Result> { - for (_, stream) in self.http_ctx().streams.iter() { - if stream_id == stream.incoming() { - let response = self - .table() - .get_response(stream.parent_id()) - .context("[finish_incoming_stream] get trailers from response")?; - return Ok(response.trailers()); - } - } - bail!("unknown stream!") + fn drop_incoming_request(&mut self, _request: IncomingRequest) -> wasmtime::Result<()> { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn finish_outgoing_stream( - &mut self, - _s: OutgoingStream, - _trailers: Option, - ) -> wasmtime::Result<()> { - bail!("unimplemented: finish_outgoing_stream") - } - async fn drop_incoming_request(&mut self, _request: IncomingRequest) -> wasmtime::Result<()> { - bail!("unimplemented: drop_incoming_request") - } - async fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { - let r = self - .table_mut() - .get_request(request) - .context("[drop_outgoing_request] getting fields")?; - - // Cleanup dependent resources - let body = r.body(); - let headers = r.headers(); - if let Some(b) = body { - self.table_mut().delete_stream(b).ok(); - } - if let Some(h) = headers { - self.table_mut().delete_fields(h).ok(); - } - - self.table_mut() - .delete_request(request) - .context("[drop_outgoing_request] deleting request")?; - + fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { + self.table().delete_outgoing_request(request)?; Ok(()) } - async fn incoming_request_method( - &mut self, - _request: IncomingRequest, - ) -> wasmtime::Result { - bail!("unimplemented: incoming_request_method") + fn incoming_request_method(&mut self, _request: IncomingRequest) -> wasmtime::Result { + todo!("we haven't implemented the 
server side of wasi-http yet") } - async fn incoming_request_path_with_query( + fn incoming_request_path_with_query( &mut self, _request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_path") + todo!("we haven't implemented the server side of wasi-http yet") } - async fn incoming_request_scheme( + fn incoming_request_scheme( &mut self, _request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_scheme") + todo!("we haven't implemented the server side of wasi-http yet") } - async fn incoming_request_authority( + fn incoming_request_authority( &mut self, _request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_authority") + todo!("we haven't implemented the server side of wasi-http yet") } - async fn incoming_request_headers( - &mut self, - _request: IncomingRequest, - ) -> wasmtime::Result { - bail!("unimplemented: incoming_request_headers") + fn incoming_request_headers(&mut self, _request: IncomingRequest) -> wasmtime::Result { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn incoming_request_consume( + fn incoming_request_consume( &mut self, _request: IncomingRequest, - ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_consume") + ) -> wasmtime::Result> { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn new_outgoing_request( + fn new_outgoing_request( &mut self, method: Method, path_with_query: Option, @@ -193,507 +158,305 @@ impl crate::bindings::http::types::Host for T authority: Option, headers: Headers, ) -> wasmtime::Result { - let mut req = ActiveRequest::new(); - req.path_with_query = path_with_query.unwrap_or("".to_string()); - req.authority = authority.unwrap_or("".to_string()); - req.method = method; - req.headers = Some(headers); - req.scheme = scheme; + let headers = self.table().get_fields(headers)?.clone(); + + let req = HostOutgoingRequest { + path_with_query: 
path_with_query.unwrap_or("".to_string()), + authority: authority.unwrap_or("".to_string()), + method, + headers, + scheme, + body: None, + }; let id = self - .table_mut() - .push_request(Box::new(req)) + .table() + .push_outgoing_response(req) .context("[new_outgoing_request] pushing request")?; Ok(id) } - async fn outgoing_request_write( + fn outgoing_request_write( &mut self, request: OutgoingRequest, - ) -> wasmtime::Result> { + ) -> wasmtime::Result> { let req = self .table() - .get_request(request) + .get_outgoing_request_mut(request) .context("[outgoing_request_write] getting request")?; - let stream_id = if let Some(stream_id) = req.body() { - stream_id - } else { - let (new, stream) = self - .table_mut() - .push_stream(Bytes::new(), request) - .await - .expect("[outgoing_request_write] valid output stream"); - self.http_ctx_mut().streams.insert(new, stream); - let req = self - .table_mut() - .get_request_mut(request) - .expect("[outgoing_request_write] request to be found"); - req.set_body(new); - new - }; - let stream = self - .table() - .get_stream(stream_id) - .context("[outgoing_request_write] getting stream")?; - Ok(Ok(stream.outgoing())) + + if req.body.is_some() { + return Ok(Err(())); + } + + let (host_body, hyper_body) = HostOutgoingBody::new(); + + req.body = Some(hyper_body); + + // The output stream will necessarily outlive the request, because we could be still + // writing to the stream after `outgoing-handler.handle` is called. 
+ let outgoing_body = self.table().push_outgoing_body(host_body)?; + + Ok(Ok(outgoing_body)) } - async fn drop_response_outparam( - &mut self, - _response: ResponseOutparam, - ) -> wasmtime::Result<()> { - bail!("unimplemented: drop_response_outparam") + fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn set_response_outparam( + fn set_response_outparam( &mut self, _outparam: ResponseOutparam, _response: Result, - ) -> wasmtime::Result> { - bail!("unimplemented: set_response_outparam") + ) -> wasmtime::Result<()> { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { - let r = self - .table() - .get_response(response) - .context("[drop_incoming_response] getting response")?; - - // Cleanup dependent resources - let body = r.body(); - let headers = r.headers(); - if let Some(id) = body { - let stream = self - .table() - .get_stream(id) - .context("[drop_incoming_response] getting stream")?; - let incoming_id = stream.incoming(); - if let Some(trailers) = self.finish_incoming_stream(incoming_id).await? 
{ - self.table_mut() - .delete_fields(trailers) - .context("[drop_incoming_response] deleting trailers") - .unwrap_or_else(|_| ()); - } - self.table_mut().delete_stream(id).ok(); - } - if let Some(h) = headers { - self.table_mut().delete_fields(h).ok(); - } - - self.table_mut() - .delete_response(response) + fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { + self.table() + .delete_incoming_response(response) .context("[drop_incoming_response] deleting response")?; Ok(()) } - async fn drop_outgoing_response( - &mut self, - _response: OutgoingResponse, - ) -> wasmtime::Result<()> { - bail!("unimplemented: drop_outgoing_response") + fn drop_outgoing_response(&mut self, _response: OutgoingResponse) -> wasmtime::Result<()> { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn incoming_response_status( + fn incoming_response_status( &mut self, response: IncomingResponse, ) -> wasmtime::Result { let r = self .table() - .get_response(response) + .get_incoming_response(response) .context("[incoming_response_status] getting response")?; - Ok(r.status()) + Ok(r.status) } - async fn incoming_response_headers( + fn incoming_response_headers( &mut self, response: IncomingResponse, ) -> wasmtime::Result { - let r = self + let _ = self .table() - .get_response(response) + .get_incoming_response_mut(response) .context("[incoming_response_headers] getting response")?; - Ok(r.headers().unwrap_or(0 as Headers)) + + fn get_fields(elem: &mut dyn Any) -> &mut FieldMap { + &mut elem.downcast_mut::().unwrap().headers + } + + let id = self.table().push_fields(HostFields::Ref { + parent: response, + get_fields, + })?; + + Ok(id) } - async fn incoming_response_consume( + fn incoming_response_consume( &mut self, response: IncomingResponse, - ) -> wasmtime::Result> { - let table = self.table_mut(); + ) -> wasmtime::Result> { + let table = self.table(); let r = table - .get_response(response) + 
.get_incoming_response_mut(response) .context("[incoming_response_consume] getting response")?; - Ok(Ok(r - .body() - .map(|id| { - table - .get_stream(id) - .map(|stream| stream.incoming()) - .expect("[incoming_response_consume] response body stream") - }) - .unwrap_or(0 as IncomingStream))) - } - async fn new_outgoing_response( + + match r.body.take() { + Some(builder) => { + let id = self.table().push_incoming_body(builder.build())?; + Ok(Ok(id)) + } + + None => Ok(Err(())), + } + } + fn drop_future_trailers(&mut self, id: FutureTrailers) -> wasmtime::Result<()> { + self.table() + .delete_future_trailers(id) + .context("[drop future-trailers] deleting future-trailers")?; + Ok(()) + } + + fn future_trailers_subscribe(&mut self, index: FutureTrailers) -> wasmtime::Result { + // Eagerly force errors about the validity of the index. + let _ = self.table().get_future_trailers(index)?; + + fn make_future(elem: &mut dyn Any) -> PollableFuture { + Box::pin(elem.downcast_mut::().unwrap().ready()) + } + + let id = self + .table() + .push_host_pollable(HostPollable::TableEntry { index, make_future })?; + + Ok(id) + } + + fn future_trailers_get( + &mut self, + id: FutureTrailers, + ) -> wasmtime::Result>> { + let trailers = self.table().get_future_trailers(id)?; + match &trailers.state { + HostFutureTrailersState::Waiting(_) => return Ok(None), + HostFutureTrailersState::Done(Err(e)) => return Ok(Some(Err(e.clone()))), + HostFutureTrailersState::Done(Ok(_)) => {} + } + + fn get_fields(elem: &mut dyn Any) -> &mut FieldMap { + let trailers = elem.downcast_mut::().unwrap(); + match &mut trailers.state { + HostFutureTrailersState::Done(Ok(e)) => e, + _ => unreachable!(), + } + } + + let hdrs = self.table().push_fields(HostFields::Ref { + parent: id, + get_fields, + })?; + + Ok(Some(Ok(hdrs))) + } + + fn new_outgoing_response( &mut self, _status_code: StatusCode, _headers: Headers, ) -> wasmtime::Result { - bail!("unimplemented: new_outgoing_response") + todo!("we haven't 
implemented the server side of wasi-http yet") } - async fn outgoing_response_write( + fn outgoing_response_write( &mut self, _response: OutgoingResponse, - ) -> wasmtime::Result> { - bail!("unimplemented: outgoing_response_write") + ) -> wasmtime::Result> { + todo!("we haven't implemented the server side of wasi-http yet") } - async fn drop_future_incoming_response( + fn drop_future_incoming_response( &mut self, - future: FutureIncomingResponse, + id: FutureIncomingResponse, ) -> wasmtime::Result<()> { - self.table_mut() - .delete_future(future) - .context("[drop_future_incoming_response] deleting future")?; + let _ = self.table().delete_future_incoming_response(id)?; Ok(()) } - async fn future_incoming_response_get( + fn future_incoming_response_get( &mut self, - future: FutureIncomingResponse, - ) -> wasmtime::Result>> { - let f = self - .table() - .get_future(future) - .context("[future_incoming_response_get] getting future")?; - Ok(match f.pollable_id() { - Some(_) => { - let result = match f.response_id() { - Some(id) => Ok(id), - None => { - let response = self.handle_async(f.request_id(), f.options()).await; - match response { - Ok(id) => { - tracing::debug!( - "including response id to future incoming response" - ); - let future_mut = self.table_mut().get_future_mut(future)?; - future_mut.set_response_id(id); - tracing::trace!( - "future incoming response details {:?}", - *future_mut - ); - } - _ => {} - } - response - } - }; - Some(result) - } - None => None, - }) + id: FutureIncomingResponse, + ) -> wasmtime::Result, ()>>> { + let resp = self.table().get_future_incoming_response_mut(id)?; + + match resp { + HostFutureIncomingResponse::Pending(_) => return Ok(None), + HostFutureIncomingResponse::Consumed => return Ok(Some(Err(()))), + HostFutureIncomingResponse::Ready(_) => {} + } + + let resp = + match std::mem::replace(resp, HostFutureIncomingResponse::Consumed).unwrap_ready() { + Err(e) => { + // Trapping if it's not possible to downcast to an 
wasi-http error + let e = e.downcast::()?; + return Ok(Some(Ok(Err(e)))); + } + + Ok(resp) => resp, + }; + + let (parts, body) = resp.resp.into_parts(); + + let resp = self.table().push_incoming_response(HostIncomingResponse { + status: parts.status.as_u16(), + headers: FieldMap::from(parts.headers), + body: Some(HostIncomingBodyBuilder { + body, + between_bytes_timeout: resp.between_bytes_timeout, + }), + worker: resp.worker, + })?; + + Ok(Some(Ok(Ok(resp)))) } - async fn listen_to_future_incoming_response( + fn listen_to_future_incoming_response( &mut self, - future: FutureIncomingResponse, + id: FutureIncomingResponse, ) -> wasmtime::Result { - let f = self - .table() - .get_future(future) - .context("[listen_to_future_incoming_response] getting future")?; - Ok(match f.pollable_id() { - Some(pollable_id) => pollable_id, - None => { - tracing::debug!("including pollable id to future incoming response"); - let pollable = - HostPollable::Closure(Box::new(|| Box::pin(futures::future::ready(Ok(()))))); - let pollable_id = self - .table_mut() - .push_host_pollable(pollable) - .context("[listen_to_future_incoming_response] pushing host pollable")?; - let f = self - .table_mut() - .get_future_mut(future) - .context("[listen_to_future_incoming_response] getting future")?; - f.set_pollable_id(pollable_id); - tracing::trace!("future incoming response details {:?}", *f); - pollable_id - } - }) - } -} + let _ = self.table().get_future_incoming_response(id)?; -#[cfg(feature = "sync")] -pub mod sync { - use crate::bindings::http::types::{ - Error as AsyncError, Host as AsyncHost, Method as AsyncMethod, Scheme as AsyncScheme, - }; - use crate::bindings::sync::http::types::{ - Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse, - IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, - ResponseOutparam, Scheme, StatusCode, Trailers, - }; - use crate::http_impl::WasiHttpViewExt; - use crate::WasiHttpView; - use 
wasmtime_wasi::preview2::{bindings::poll::poll::Pollable, in_tokio}; - - // same boilerplate everywhere, converting between two identical types with different - // definition sites. one day wasmtime-wit-bindgen will make all this unnecessary - impl From for Error { - fn from(other: AsyncError) -> Self { - match other { - AsyncError::InvalidUrl(v) => Self::InvalidUrl(v), - AsyncError::ProtocolError(v) => Self::ProtocolError(v), - AsyncError::TimeoutError(v) => Self::TimeoutError(v), - AsyncError::UnexpectedError(v) => Self::UnexpectedError(v), - } + fn make_future<'a>(elem: &'a mut dyn Any) -> PollableFuture<'a> { + Box::pin( + elem.downcast_mut::() + .expect("parent resource is HostFutureIncomingResponse"), + ) } + + let pollable = self.table().push_host_pollable(HostPollable::TableEntry { + index: id, + make_future, + })?; + + Ok(pollable) } - impl From for AsyncError { - fn from(other: Error) -> Self { - match other { - Error::InvalidUrl(v) => Self::InvalidUrl(v), - Error::ProtocolError(v) => Self::ProtocolError(v), - Error::TimeoutError(v) => Self::TimeoutError(v), - Error::UnexpectedError(v) => Self::UnexpectedError(v), - } + fn incoming_body_stream( + &mut self, + id: IncomingBody, + ) -> wasmtime::Result> { + let body = self.table().get_incoming_body(id)?; + + if let Some(stream) = body.stream.take() { + let stream = self.table().push_input_stream_child(Box::new(stream), id)?; + return Ok(Ok(stream)); } + + Ok(Err(())) } - impl From for Method { - fn from(other: AsyncMethod) -> Self { - match other { - AsyncMethod::Connect => Self::Connect, - AsyncMethod::Delete => Self::Delete, - AsyncMethod::Get => Self::Get, - AsyncMethod::Head => Self::Head, - AsyncMethod::Options => Self::Options, - AsyncMethod::Patch => Self::Patch, - AsyncMethod::Post => Self::Post, - AsyncMethod::Put => Self::Put, - AsyncMethod::Trace => Self::Trace, - AsyncMethod::Other(v) => Self::Other(v), - } - } + fn incoming_body_finish(&mut self, id: IncomingBody) -> wasmtime::Result { + let 
body = self.table().delete_incoming_body(id)?; + let trailers = self + .table() + .push_future_trailers(body.into_future_trailers())?; + Ok(trailers) } - impl From for AsyncMethod { - fn from(other: Method) -> Self { - match other { - Method::Connect => Self::Connect, - Method::Delete => Self::Delete, - Method::Get => Self::Get, - Method::Head => Self::Head, - Method::Options => Self::Options, - Method::Patch => Self::Patch, - Method::Post => Self::Post, - Method::Put => Self::Put, - Method::Trace => Self::Trace, - Method::Other(v) => Self::Other(v), - } - } + fn drop_incoming_body(&mut self, id: IncomingBody) -> wasmtime::Result<()> { + let _ = self.table().delete_incoming_body(id)?; + Ok(()) } - impl From for Scheme { - fn from(other: AsyncScheme) -> Self { - match other { - AsyncScheme::Http => Self::Http, - AsyncScheme::Https => Self::Https, - AsyncScheme::Other(v) => Self::Other(v), - } + fn outgoing_body_write( + &mut self, + id: OutgoingBody, + ) -> wasmtime::Result> { + let body = self.table().get_outgoing_body(id)?; + if let Some(stream) = body.body_output_stream.take() { + let id = self.table().push_output_stream_child(stream, id)?; + Ok(Ok(id)) + } else { + Ok(Err(())) } } - impl From for AsyncScheme { - fn from(other: Scheme) -> Self { - match other { - Scheme::Http => Self::Http, - Scheme::Https => Self::Https, - Scheme::Other(v) => Self::Other(v), - } + fn outgoing_body_write_trailers( + &mut self, + id: OutgoingBody, + ts: Trailers, + ) -> wasmtime::Result<()> { + let mut body = self.table().delete_outgoing_body(id)?; + let trailers = self.table().get_fields(ts)?.clone(); + + match body + .trailers_sender + .take() + // Should be unreachable - this is the only place we take the trailers sender, + // at the end of the HostOutgoingBody's lifetime + .ok_or_else(|| anyhow!("trailers_sender missing"))? + .send(trailers.into()) + { + Ok(()) => {} + Err(_) => {} // Ignoring failure: receiver died sending body, but we can't report that + // here. 
} + + Ok(()) } - impl crate::bindings::sync::http::types::Host for T { - fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::drop_fields(self, fields).await }) - } - fn new_fields(&mut self, entries: Vec<(String, String)>) -> wasmtime::Result { - in_tokio(async { AsyncHost::new_fields(self, entries).await }) - } - fn fields_get(&mut self, fields: Fields, name: String) -> wasmtime::Result>> { - in_tokio(async { AsyncHost::fields_get(self, fields, name).await }) - } - fn fields_set( - &mut self, - fields: Fields, - name: String, - value: Vec>, - ) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::fields_set(self, fields, name, value).await }) - } - fn fields_delete(&mut self, fields: Fields, name: String) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::fields_delete(self, fields, name).await }) - } - fn fields_append( - &mut self, - fields: Fields, - name: String, - value: Vec, - ) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::fields_append(self, fields, name, value).await }) - } - fn fields_entries(&mut self, fields: Fields) -> wasmtime::Result)>> { - in_tokio(async { AsyncHost::fields_entries(self, fields).await }) - } - fn fields_clone(&mut self, fields: Fields) -> wasmtime::Result { - in_tokio(async { AsyncHost::fields_clone(self, fields).await }) - } - fn finish_incoming_stream( - &mut self, - stream_id: IncomingStream, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::finish_incoming_stream(self, stream_id).await }) - } - fn finish_outgoing_stream( - &mut self, - stream: OutgoingStream, - trailers: Option, - ) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::finish_outgoing_stream(self, stream, trailers).await }) - } - fn drop_incoming_request(&mut self, request: IncomingRequest) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::drop_incoming_request(self, request).await }) - } - fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { - 
in_tokio(async { AsyncHost::drop_outgoing_request(self, request).await }) - } - fn incoming_request_method( - &mut self, - request: IncomingRequest, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::incoming_request_method(self, request).await }) - .map(Method::from) - } - fn incoming_request_path_with_query( - &mut self, - request: IncomingRequest, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::incoming_request_path_with_query(self, request).await }) - } - fn incoming_request_scheme( - &mut self, - request: IncomingRequest, - ) -> wasmtime::Result> { - Ok( - in_tokio(async { AsyncHost::incoming_request_scheme(self, request).await })? - .map(Scheme::from), - ) - } - fn incoming_request_authority( - &mut self, - request: IncomingRequest, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::incoming_request_authority(self, request).await }) - } - fn incoming_request_headers( - &mut self, - request: IncomingRequest, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::incoming_request_headers(self, request).await }) - } - fn incoming_request_consume( - &mut self, - request: IncomingRequest, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::incoming_request_consume(self, request).await }) - } - fn new_outgoing_request( - &mut self, - method: Method, - path_with_query: Option, - scheme: Option, - authority: Option, - headers: Headers, - ) -> wasmtime::Result { - in_tokio(async { - AsyncHost::new_outgoing_request( - self, - method.into(), - path_with_query, - scheme.map(AsyncScheme::from), - authority, - headers, - ) - .await - }) - } - fn outgoing_request_write( - &mut self, - request: OutgoingRequest, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::outgoing_request_write(self, request).await }) - } - fn drop_response_outparam(&mut self, response: ResponseOutparam) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::drop_response_outparam(self, response).await }) - } - fn set_response_outparam( - &mut self, - outparam: 
ResponseOutparam, - response: Result, - ) -> wasmtime::Result> { - in_tokio(async { - AsyncHost::set_response_outparam(self, outparam, response.map_err(AsyncError::from)) - .await - }) - } - fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::drop_incoming_response(self, response).await }) - } - fn drop_outgoing_response(&mut self, response: OutgoingResponse) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::drop_outgoing_response(self, response).await }) - } - fn incoming_response_status( - &mut self, - response: IncomingResponse, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::incoming_response_status(self, response).await }) - } - fn incoming_response_headers( - &mut self, - response: IncomingResponse, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::incoming_response_headers(self, response).await }) - } - fn incoming_response_consume( - &mut self, - response: IncomingResponse, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::incoming_response_consume(self, response).await }) - } - fn new_outgoing_response( - &mut self, - status_code: StatusCode, - headers: Headers, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::new_outgoing_response(self, status_code, headers).await }) - } - fn outgoing_response_write( - &mut self, - response: OutgoingResponse, - ) -> wasmtime::Result> { - in_tokio(async { AsyncHost::outgoing_response_write(self, response).await }) - } - fn drop_future_incoming_response( - &mut self, - future: FutureIncomingResponse, - ) -> wasmtime::Result<()> { - in_tokio(async { AsyncHost::drop_future_incoming_response(self, future).await }) - } - fn future_incoming_response_get( - &mut self, - future: FutureIncomingResponse, - ) -> wasmtime::Result>> { - Ok( - in_tokio(async { AsyncHost::future_incoming_response_get(self, future).await })? 
- .map(|v| v.map_err(Error::from)), - ) - } - fn listen_to_future_incoming_response( - &mut self, - future: FutureIncomingResponse, - ) -> wasmtime::Result { - in_tokio(async { AsyncHost::listen_to_future_incoming_response(self, future).await }) - } + fn drop_outgoing_body(&mut self, id: OutgoingBody) -> wasmtime::Result<()> { + let _ = self.table().delete_outgoing_body(id)?; + Ok(()) } } diff --git a/crates/wasi-http/wit/deps/http/incoming-handler.wit b/crates/wasi-http/wit/deps/http/incoming-handler.wit index d0e270465593..ad8a43f8ccf0 100644 --- a/crates/wasi-http/wit/deps/http/incoming-handler.wit +++ b/crates/wasi-http/wit/deps/http/incoming-handler.wit @@ -12,13 +12,13 @@ interface incoming-handler { // The `handle` function takes an outparam instead of returning its response // so that the component may stream its response while streaming any other // request or response bodies. The callee MUST write a response to the - // `response-out` and then finish the response before returning. The `handle` + // `response-outparam` and then finish the response before returning. The `handle` // function is allowed to continue execution after finishing the response's // output stream. While this post-response execution is taken off the // critical path, since there is no return value, there is no way to report // its success or failure. 
handle: func( - request: incoming-request, - response-out: response-outparam + request: /* own */ incoming-request, + response-out: /* own */ response-outparam ) } diff --git a/crates/wasi-http/wit/deps/http/outgoing-handler.wit b/crates/wasi-http/wit/deps/http/outgoing-handler.wit index 06c8e469f95b..3e03327d742b 100644 --- a/crates/wasi-http/wit/deps/http/outgoing-handler.wit +++ b/crates/wasi-http/wit/deps/http/outgoing-handler.wit @@ -8,11 +8,20 @@ interface outgoing-handler { use types.{outgoing-request, request-options, future-incoming-response} + // FIXME: we would want to use the types.error here but there is a + // wasmtime-wit-bindgen bug that prevents us from using the same error in + // the two different interfaces, right now... + variant error { + invalid(string) + } + // The parameter and result types of the `handle` function allow the caller // to concurrently stream the bodies of the outgoing request and the incoming // response. + // Consumes the outgoing-request. Gives an error if the outgoing-request + // is invalid or cannot be satisfied by this handler. handle: func( - request: outgoing-request, + request: /* own */ outgoing-request, options: option - ) -> future-incoming-response + ) -> result } diff --git a/crates/wasi-http/wit/deps/http/types.wit b/crates/wasi-http/wit/deps/http/types.wit index 7b7b015529c0..821e15f96213 100644 --- a/crates/wasi-http/wit/deps/http/types.wit +++ b/crates/wasi-http/wit/deps/http/types.wit @@ -41,30 +41,28 @@ interface types { // fields = u32` type alias can be replaced by a proper `resource fields` // definition containing all the functions using the method syntactic sugar. 
type fields = u32 - drop-fields: func(fields: fields) - new-fields: func(entries: list>) -> fields - fields-get: func(fields: fields, name: string) -> list> - fields-set: func(fields: fields, name: string, value: list>) - fields-delete: func(fields: fields, name: string) - fields-append: func(fields: fields, name: string, value: list) - fields-entries: func(fields: fields) -> list>> - fields-clone: func(fields: fields) -> fields + drop-fields: func(fields: /* own */ fields) + // Multiple values for a header are multiple entries in the list with the + // same key. + new-fields: func(entries: list>>) -> fields + // Values off wire are not necessarily well formed, so they are given by + // list instead of string. + fields-get: func(fields: /* borrow */ fields, name: string) -> list> + // Values off wire are not necessarily well formed, so they are given by + // list instead of string. + fields-set: func(fields: /* borrow */ fields, name: string, value: list>) + fields-delete: func(fields: /* borrow */ fields, name: string) + fields-append: func(fields: /* borrow */ fields, name: string, value: list) + + // Values off wire are not necessarily well formed, so they are given by + // list instead of string. + fields-entries: func(fields: /* borrow */ fields) -> list>> + // Deep copy of all contents in a fields. + fields-clone: func(fields: /* borrow */ fields) -> fields type headers = fields type trailers = fields - // The following block defines stream types which corresponds to the HTTP - // standard Contents and Trailers. With Preview3, all of these fields can be - // replaced by a stream>. In the interim, we need to - // build on separate resource types defined by `wasi:io/streams`. The - // `finish-` functions emulate the stream's result value and MUST be called - // exactly once after the final read/write from/to the stream before dropping - // the stream. 
- type incoming-stream = input-stream - type outgoing-stream = output-stream - finish-incoming-stream: func(s: incoming-stream) -> option - finish-outgoing-stream: func(s: outgoing-stream, trailers: option) - // The following block defines the `incoming-request` and `outgoing-request` // resource types that correspond to HTTP standard Requests. Soon, when // resource types are added, the `u32` type aliases can be replaced by @@ -74,23 +72,30 @@ interface types { // above). The `consume` and `write` methods may only be called once (and // return failure thereafter). type incoming-request = u32 + drop-incoming-request: func(request: /* own */ incoming-request) + incoming-request-method: func(request: /* borrow */ incoming-request) -> method + incoming-request-path-with-query: func(request: /* borrow */ incoming-request) -> option + incoming-request-scheme: func(request: /* borrow */ incoming-request) -> option + incoming-request-authority: func(request: /* borrow */ incoming-request) -> option + + incoming-request-headers: func(request: /* borrow */ incoming-request) -> /* child */ headers + // Will return the input-stream child at most once. If called more than + // once, subsequent calls will return error. 
+ incoming-request-consume: func(request: /* borrow */ incoming-request) -> result< /* child */ input-stream> + type outgoing-request = u32 - drop-incoming-request: func(request: incoming-request) - drop-outgoing-request: func(request: outgoing-request) - incoming-request-method: func(request: incoming-request) -> method - incoming-request-path-with-query: func(request: incoming-request) -> option - incoming-request-scheme: func(request: incoming-request) -> option - incoming-request-authority: func(request: incoming-request) -> option - incoming-request-headers: func(request: incoming-request) -> headers - incoming-request-consume: func(request: incoming-request) -> result + drop-outgoing-request: func(request: /* own */ outgoing-request) new-outgoing-request: func( method: method, path-with-query: option, scheme: option, authority: option, - headers: headers + headers: /* borrow */ headers ) -> outgoing-request - outgoing-request-write: func(request: outgoing-request) -> result + + // Will return the outgoing-body child at most once. If called more than + // once, subsequent calls will return error. + outgoing-request-write: func(request: /* borrow */ outgoing-request) -> result< /* child */ outgoing-body> // Additional optional parameters that can be set when making a request. record request-options { @@ -115,8 +120,8 @@ interface types { // (the `wasi:http/handler` interface used for both incoming and outgoing can // simply return a `stream`). type response-outparam = u32 - drop-response-outparam: func(response: response-outparam) - set-response-outparam: func(param: response-outparam, response: result) -> result + drop-response-outparam: func(response: /* own */ response-outparam) + set-response-outparam: func(param: /* own */ response-outparam, response: result< /* own */ outgoing-response, error>) // This type corresponds to the HTTP standard Status Code. 
type status-code = u16 @@ -129,27 +134,72 @@ interface types { // type (that uses the single `stream` type mentioned above). The `consume` and // `write` methods may only be called once (and return failure thereafter). type incoming-response = u32 + drop-incoming-response: func(response: /* own */ incoming-response) + incoming-response-status: func(response: /* borrow */ incoming-response) -> status-code + incoming-response-headers: func(response: /* borrow */ incoming-response) -> /* child */ headers + // May be called at most once. returns error if called additional times. + // TODO: make incoming-request-consume work the same way, giving a child + // incoming-body. + incoming-response-consume: func(response: /* borrow */ incoming-response) -> result + + type incoming-body = u32 + drop-incoming-body: func(this: /* own */ incoming-body) + + // returned input-stream is a child - the implementation may trap if + // incoming-body is dropped (or consumed by call to + // incoming-body-finish) before the input-stream is dropped. + // May be called at most once. returns error if called additional times. + incoming-body-stream: func(this: /* borrow */ incoming-body) -> + result + // takes ownership of incoming-body. this will trap if the + // incoming-body-stream child is still alive! + incoming-body-finish: func(this: /* own */ incoming-body) -> + /* transitive child of the incoming-response of incoming-body */ future-trailers + + type future-trailers = u32 + drop-future-trailers: func(this: /* own */ future-trailers) + /// Pollable that resolves when the body has been fully read, and the trailers + /// are ready to be consumed. + future-trailers-subscribe: func(this: /* borrow */ future-trailers) -> /* child */ pollable + + /// Retrieve reference to trailers, if they are ready. 
+ future-trailers-get: func(response: /* borrow */ future-trailers) -> option> + type outgoing-response = u32 - drop-incoming-response: func(response: incoming-response) - drop-outgoing-response: func(response: outgoing-response) - incoming-response-status: func(response: incoming-response) -> status-code - incoming-response-headers: func(response: incoming-response) -> headers - incoming-response-consume: func(response: incoming-response) -> result + drop-outgoing-response: func(response: /* own */ outgoing-response) new-outgoing-response: func( status-code: status-code, - headers: headers + headers: /* borrow */ headers ) -> outgoing-response - outgoing-response-write: func(response: outgoing-response) -> result - // The following block defines a special resource type used by the - // `wasi:http/outgoing-handler` interface to emulate - // `future>` in advance of Preview3. Given a - // `future-incoming-response`, the client can call the non-blocking `get` - // method to get the result if it is available. If the result is not available, - // the client can call `listen` to get a `pollable` that can be passed to - // `io.poll.poll-oneoff`. + /// Will give the child outgoing-body at most once. subsequent calls will + /// return an error. + outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result + + type outgoing-body = u32 + drop-outgoing-body: func(this: /* own */ outgoing-body) + /// Will give the child output-stream at most once. subsequent calls will + /// return an error. + outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result + /// Write trailers as the way to finish an outgoing-body. To finish an + /// outgoing-body without writing trailers, use drop-outgoing-body.
+ outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers) + + /// The following block defines a special resource type used by the + /// `wasi:http/outgoing-handler` interface to emulate + /// `future>` in advance of Preview3. Given a + /// `future-incoming-response`, the client can call the non-blocking `get` + /// method to get the result if it is available. If the result is not available, + /// the client can call `listen` to get a `pollable` that can be passed to + /// `io.poll.poll-oneoff`. type future-incoming-response = u32 - drop-future-incoming-response: func(f: future-incoming-response) - future-incoming-response-get: func(f: future-incoming-response) -> option> - listen-to-future-incoming-response: func(f: future-incoming-response) -> pollable + drop-future-incoming-response: func(f: /* own */ future-incoming-response) + /// option indicates readiness. + /// outer result indicates you are allowed to get the + /// incoming-response-or-error at most once. subsequent calls after ready + /// will return an error here. + /// inner result indicates whether the incoming-response was available, or an + /// error occurred.
+ future-incoming-response-get: func(f: /* borrow */ future-incoming-response) -> option>> + listen-to-future-incoming-response: func(f: /* borrow */ future-incoming-response) -> /* child */ pollable } diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 2e0a3e95e114..6d9ad182dcec 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -31,6 +31,7 @@ mod stdio; mod stream; mod table; mod tcp; +mod write_stream; pub use self::clocks::{HostMonotonicClock, HostWallClock}; pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView}; @@ -151,7 +152,7 @@ pub(crate) static RUNTIME: once_cell::sync::Lazy = .unwrap() }); -pub(crate) struct AbortOnDropJoinHandle(tokio::task::JoinHandle); +pub struct AbortOnDropJoinHandle(tokio::task::JoinHandle); impl Drop for AbortOnDropJoinHandle { fn drop(&mut self) { self.0.abort() @@ -188,7 +189,7 @@ impl std::future::Future for AbortOnDropJoinHandle { } } -pub(crate) fn spawn(f: F) -> AbortOnDropJoinHandle +pub fn spawn(f: F) -> AbortOnDropJoinHandle where F: std::future::Future + Send + 'static, G: Send + 'static, diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index a01215c11323..69749db9d43c 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -10,9 +10,10 @@ use crate::preview2::{HostInputStream, HostOutputStream, OutputStreamError, StreamState}; use anyhow::{anyhow, Error}; use bytes::Bytes; -use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; +pub use crate::preview2::write_stream::AsyncWriteStream; + #[derive(Debug)] pub struct MemoryInputPipe { buffer: std::io::Cursor, @@ -221,198 +222,6 @@ impl HostInputStream for AsyncReadStream { } } -#[derive(Debug)] -struct WorkerState { - alive: bool, - items: std::collections::VecDeque, - write_budget: usize, - flush_pending: bool, - error: Option, -} - -impl WorkerState { - fn check_error(&mut self) -> Result<(), OutputStreamError> { - if let Some(e) = 
self.error.take() { - return Err(OutputStreamError::LastOperationFailed(e)); - } - if !self.alive { - return Err(OutputStreamError::Closed); - } - Ok(()) - } -} - -struct Worker { - state: Mutex, - new_work: tokio::sync::Notify, - write_ready_changed: tokio::sync::Notify, -} - -enum Job { - Flush, - Write(Bytes), -} - -enum WriteStatus<'a> { - Done(Result), - Pending(tokio::sync::futures::Notified<'a>), -} - -impl Worker { - fn new(write_budget: usize) -> Self { - Self { - state: Mutex::new(WorkerState { - alive: true, - items: std::collections::VecDeque::new(), - write_budget, - flush_pending: false, - error: None, - }), - new_work: tokio::sync::Notify::new(), - write_ready_changed: tokio::sync::Notify::new(), - } - } - fn check_write(&self) -> WriteStatus<'_> { - let mut state = self.state(); - if let Err(e) = state.check_error() { - return WriteStatus::Done(Err(e)); - } - - if state.flush_pending || state.write_budget == 0 { - return WriteStatus::Pending(self.write_ready_changed.notified()); - } - - WriteStatus::Done(Ok(state.write_budget)) - } - fn state(&self) -> std::sync::MutexGuard { - self.state.lock().unwrap() - } - fn pop(&self) -> Option { - let mut state = self.state(); - if state.items.is_empty() { - if state.flush_pending { - return Some(Job::Flush); - } - } else if let Some(bytes) = state.items.pop_front() { - return Some(Job::Write(bytes)); - } - - None - } - fn report_error(&self, e: std::io::Error) { - { - let mut state = self.state(); - state.alive = false; - state.error = Some(e.into()); - state.flush_pending = false; - } - self.write_ready_changed.notify_waiters(); - } - async fn work(&self, mut writer: T) { - use tokio::io::AsyncWriteExt; - loop { - let notified = self.new_work.notified(); - while let Some(job) = self.pop() { - match job { - Job::Flush => { - if let Err(e) = writer.flush().await { - self.report_error(e); - return; - } - - tracing::debug!("worker marking flush complete"); - self.state().flush_pending = false; - } - - 
Job::Write(mut bytes) => { - tracing::debug!("worker writing: {bytes:?}"); - let len = bytes.len(); - match writer.write_all_buf(&mut bytes).await { - Err(e) => { - self.report_error(e); - return; - } - Ok(_) => { - self.state().write_budget += len; - } - } - } - } - - self.write_ready_changed.notify_waiters(); - } - - notified.await; - } - } -} - -/// Provides a [`HostOutputStream`] impl from a [`tokio::io::AsyncWrite`] impl -pub struct AsyncWriteStream { - worker: Arc, - _join_handle: crate::preview2::AbortOnDropJoinHandle<()>, -} - -impl AsyncWriteStream { - /// Create a [`AsyncWriteStream`]. In order to use the [`HostOutputStream`] impl - /// provided by this struct, the argument must impl [`tokio::io::AsyncWrite`]. - pub fn new( - write_budget: usize, - writer: T, - ) -> Self { - let worker = Arc::new(Worker::new(write_budget)); - - let w = Arc::clone(&worker); - let join_handle = crate::preview2::spawn(async move { w.work(writer).await }); - - AsyncWriteStream { - worker, - _join_handle: join_handle, - } - } -} - -#[async_trait::async_trait] -impl HostOutputStream for AsyncWriteStream { - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { - let mut state = self.worker.state(); - state.check_error()?; - if state.flush_pending { - return Err(OutputStreamError::Trap(anyhow!( - "write not permitted while flush pending" - ))); - } - match state.write_budget.checked_sub(bytes.len()) { - Some(remaining_budget) => { - state.write_budget = remaining_budget; - state.items.push_back(bytes); - } - None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))), - } - drop(state); - self.worker.new_work.notify_waiters(); - Ok(()) - } - fn flush(&mut self) -> Result<(), OutputStreamError> { - let mut state = self.worker.state(); - state.check_error()?; - - state.flush_pending = true; - self.worker.new_work.notify_waiters(); - - Ok(()) - } - - async fn write_ready(&mut self) -> Result { - loop { - match self.worker.check_write() { - 
WriteStatus::Done(r) => return r, - WriteStatus::Pending(notifier) => notifier.await, - } - } - } -} - /// An output stream that consumes all input written to it, and is always ready. pub struct SinkOutputStream; diff --git a/crates/wasi/src/preview2/table.rs b/crates/wasi/src/preview2/table.rs index f4efd8005ed1..de9b50a6f772 100644 --- a/crates/wasi/src/preview2/table.rs +++ b/crates/wasi/src/preview2/table.rs @@ -174,6 +174,15 @@ impl Table { } } + /// Get a mutable reference to the underlying untyped cell for an entry in the table. + pub fn get_any_mut(&mut self, key: u32) -> Result<&mut dyn Any, TableError> { + if let Some(r) = self.map.get_mut(&key) { + Ok(&mut *r.entry) + } else { + Err(TableError::NotPresent) + } + } + /// Get an immutable reference to a resource of a given type at a given index. Multiple /// immutable references can be borrowed at any given time. Borrow failure /// results in a trapping error. @@ -203,7 +212,7 @@ impl Table { /// remove or replace the entry based on its contents. The methods available are a subset of /// [`std::collections::hash_map::OccupiedEntry`] - it does not give access to the key, it /// restricts replacing the entry to items of the same type, and it does not allow for deletion. 
- pub fn entry(&mut self, index: u32) -> Result { + pub fn entry<'a>(&'a mut self, index: u32) -> Result, TableError> { if self.map.contains_key(&index) { Ok(OccupiedEntry { table: self, index }) } else { diff --git a/crates/wasi/src/preview2/write_stream.rs b/crates/wasi/src/preview2/write_stream.rs new file mode 100644 index 000000000000..bf154aec0720 --- /dev/null +++ b/crates/wasi/src/preview2/write_stream.rs @@ -0,0 +1,196 @@ +use crate::preview2::{HostOutputStream, OutputStreamError}; +use anyhow::anyhow; +use bytes::Bytes; +use std::sync::{Arc, Mutex}; + +#[derive(Debug)] +struct WorkerState { + alive: bool, + items: std::collections::VecDeque, + write_budget: usize, + flush_pending: bool, + error: Option, +} + +impl WorkerState { + fn check_error(&mut self) -> Result<(), OutputStreamError> { + if let Some(e) = self.error.take() { + return Err(OutputStreamError::LastOperationFailed(e)); + } + if !self.alive { + return Err(OutputStreamError::Closed); + } + Ok(()) + } +} + +struct Worker { + state: Mutex, + new_work: tokio::sync::Notify, + write_ready_changed: tokio::sync::Notify, +} + +enum Job { + Flush, + Write(Bytes), +} + +enum WriteStatus<'a> { + Done(Result), + Pending(tokio::sync::futures::Notified<'a>), +} + +impl Worker { + fn new(write_budget: usize) -> Self { + Self { + state: Mutex::new(WorkerState { + alive: true, + items: std::collections::VecDeque::new(), + write_budget, + flush_pending: false, + error: None, + }), + new_work: tokio::sync::Notify::new(), + write_ready_changed: tokio::sync::Notify::new(), + } + } + fn check_write(&self) -> WriteStatus<'_> { + let mut state = self.state(); + if let Err(e) = state.check_error() { + return WriteStatus::Done(Err(e)); + } + + if state.flush_pending || state.write_budget == 0 { + return WriteStatus::Pending(self.write_ready_changed.notified()); + } + + WriteStatus::Done(Ok(state.write_budget)) + } + fn state(&self) -> std::sync::MutexGuard { + self.state.lock().unwrap() + } + fn pop(&self) -> Option { 
+ let mut state = self.state(); + if state.items.is_empty() { + if state.flush_pending { + return Some(Job::Flush); + } + } else if let Some(bytes) = state.items.pop_front() { + return Some(Job::Write(bytes)); + } + + None + } + fn report_error(&self, e: std::io::Error) { + { + let mut state = self.state(); + state.alive = false; + state.error = Some(e.into()); + state.flush_pending = false; + } + self.write_ready_changed.notify_waiters(); + } + async fn work(&self, mut writer: T) { + use tokio::io::AsyncWriteExt; + loop { + let notified = self.new_work.notified(); + while let Some(job) = self.pop() { + match job { + Job::Flush => { + if let Err(e) = writer.flush().await { + self.report_error(e); + return; + } + + tracing::debug!("worker marking flush complete"); + self.state().flush_pending = false; + } + + Job::Write(mut bytes) => { + tracing::debug!("worker writing: {bytes:?}"); + let len = bytes.len(); + match writer.write_all_buf(&mut bytes).await { + Err(e) => { + self.report_error(e); + return; + } + Ok(_) => { + self.state().write_budget += len; + } + } + } + } + + self.write_ready_changed.notify_waiters(); + } + + notified.await; + } + } +} + +/// Provides a [`HostOutputStream`] impl from a [`tokio::io::AsyncWrite`] impl +pub struct AsyncWriteStream { + worker: Arc, + _join_handle: crate::preview2::AbortOnDropJoinHandle<()>, +} + +impl AsyncWriteStream { + /// Create a [`AsyncWriteStream`]. In order to use the [`HostOutputStream`] impl + /// provided by this struct, the argument must impl [`tokio::io::AsyncWrite`]. 
+ pub fn new( + write_budget: usize, + writer: T, + ) -> Self { + let worker = Arc::new(Worker::new(write_budget)); + + let w = Arc::clone(&worker); + let join_handle = crate::preview2::spawn(async move { w.work(writer).await }); + + AsyncWriteStream { + worker, + _join_handle: join_handle, + } + } +} + +#[async_trait::async_trait] +impl HostOutputStream for AsyncWriteStream { + fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { + let mut state = self.worker.state(); + state.check_error()?; + if state.flush_pending { + return Err(OutputStreamError::Trap(anyhow!( + "write not permitted while flush pending" + ))); + } + match state.write_budget.checked_sub(bytes.len()) { + Some(remaining_budget) => { + state.write_budget = remaining_budget; + state.items.push_back(bytes); + } + None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))), + } + drop(state); + self.worker.new_work.notify_waiters(); + Ok(()) + } + fn flush(&mut self) -> Result<(), OutputStreamError> { + let mut state = self.worker.state(); + state.check_error()?; + + state.flush_pending = true; + self.worker.new_work.notify_waiters(); + + Ok(()) + } + + async fn write_ready(&mut self) -> Result { + loop { + match self.worker.check_write() { + WriteStatus::Done(r) => return r, + WriteStatus::Pending(notifier) => notifier.await, + } + } + } +} diff --git a/crates/wasi/wit/deps/http/incoming-handler.wit b/crates/wasi/wit/deps/http/incoming-handler.wit index d0e270465593..ad8a43f8ccf0 100644 --- a/crates/wasi/wit/deps/http/incoming-handler.wit +++ b/crates/wasi/wit/deps/http/incoming-handler.wit @@ -12,13 +12,13 @@ interface incoming-handler { // The `handle` function takes an outparam instead of returning its response // so that the component may stream its response while streaming any other // request or response bodies. The callee MUST write a response to the - // `response-out` and then finish the response before returning. 
The `handle` + // `response-outparam` and then finish the response before returning. The `handle` // function is allowed to continue execution after finishing the response's // output stream. While this post-response execution is taken off the // critical path, since there is no return value, there is no way to report // its success or failure. handle: func( - request: incoming-request, - response-out: response-outparam + request: /* own */ incoming-request, + response-out: /* own */ response-outparam ) } diff --git a/crates/wasi/wit/deps/http/outgoing-handler.wit b/crates/wasi/wit/deps/http/outgoing-handler.wit index 06c8e469f95b..3e03327d742b 100644 --- a/crates/wasi/wit/deps/http/outgoing-handler.wit +++ b/crates/wasi/wit/deps/http/outgoing-handler.wit @@ -8,11 +8,20 @@ interface outgoing-handler { use types.{outgoing-request, request-options, future-incoming-response} + // FIXME: we would want to use the types.error here but there is a + // wasmtime-wit-bindgen bug that prevents us from using the same error in + // the two different interfaces, right now... + variant error { + invalid(string) + } + // The parameter and result types of the `handle` function allow the caller // to concurrently stream the bodies of the outgoing request and the incoming // response. + // Consumes the outgoing-request. Gives an error if the outgoing-request + // is invalid or cannot be satisfied by this handler. handle: func( - request: outgoing-request, + request: /* own */ outgoing-request, options: option - ) -> future-incoming-response + ) -> result } diff --git a/crates/wasi/wit/deps/http/types.wit b/crates/wasi/wit/deps/http/types.wit index 7b7b015529c0..821e15f96213 100644 --- a/crates/wasi/wit/deps/http/types.wit +++ b/crates/wasi/wit/deps/http/types.wit @@ -41,30 +41,28 @@ interface types { // fields = u32` type alias can be replaced by a proper `resource fields` // definition containing all the functions using the method syntactic sugar. 
type fields = u32 - drop-fields: func(fields: fields) - new-fields: func(entries: list>) -> fields - fields-get: func(fields: fields, name: string) -> list> - fields-set: func(fields: fields, name: string, value: list>) - fields-delete: func(fields: fields, name: string) - fields-append: func(fields: fields, name: string, value: list) - fields-entries: func(fields: fields) -> list>> - fields-clone: func(fields: fields) -> fields + drop-fields: func(fields: /* own */ fields) + // Multiple values for a header are multiple entries in the list with the + // same key. + new-fields: func(entries: list>>) -> fields + // Values off wire are not necessarily well formed, so they are given by + // list instead of string. + fields-get: func(fields: /* borrow */ fields, name: string) -> list> + // Values off wire are not necessarily well formed, so they are given by + // list instead of string. + fields-set: func(fields: /* borrow */ fields, name: string, value: list>) + fields-delete: func(fields: /* borrow */ fields, name: string) + fields-append: func(fields: /* borrow */ fields, name: string, value: list) + + // Values off wire are not necessarily well formed, so they are given by + // list instead of string. + fields-entries: func(fields: /* borrow */ fields) -> list>> + // Deep copy of all contents in a fields. + fields-clone: func(fields: /* borrow */ fields) -> fields type headers = fields type trailers = fields - // The following block defines stream types which corresponds to the HTTP - // standard Contents and Trailers. With Preview3, all of these fields can be - // replaced by a stream>. In the interim, we need to - // build on separate resource types defined by `wasi:io/streams`. The - // `finish-` functions emulate the stream's result value and MUST be called - // exactly once after the final read/write from/to the stream before dropping - // the stream. 
- type incoming-stream = input-stream - type outgoing-stream = output-stream - finish-incoming-stream: func(s: incoming-stream) -> option - finish-outgoing-stream: func(s: outgoing-stream, trailers: option) - // The following block defines the `incoming-request` and `outgoing-request` // resource types that correspond to HTTP standard Requests. Soon, when // resource types are added, the `u32` type aliases can be replaced by @@ -74,23 +72,30 @@ interface types { // above). The `consume` and `write` methods may only be called once (and // return failure thereafter). type incoming-request = u32 + drop-incoming-request: func(request: /* own */ incoming-request) + incoming-request-method: func(request: /* borrow */ incoming-request) -> method + incoming-request-path-with-query: func(request: /* borrow */ incoming-request) -> option + incoming-request-scheme: func(request: /* borrow */ incoming-request) -> option + incoming-request-authority: func(request: /* borrow */ incoming-request) -> option + + incoming-request-headers: func(request: /* borrow */ incoming-request) -> /* child */ headers + // Will return the input-stream child at most once. If called more than + // once, subsequent calls will return error. 
+ incoming-request-consume: func(request: /* borrow */ incoming-request) -> result< /* child */ input-stream> + type outgoing-request = u32 - drop-incoming-request: func(request: incoming-request) - drop-outgoing-request: func(request: outgoing-request) - incoming-request-method: func(request: incoming-request) -> method - incoming-request-path-with-query: func(request: incoming-request) -> option - incoming-request-scheme: func(request: incoming-request) -> option - incoming-request-authority: func(request: incoming-request) -> option - incoming-request-headers: func(request: incoming-request) -> headers - incoming-request-consume: func(request: incoming-request) -> result + drop-outgoing-request: func(request: /* own */ outgoing-request) new-outgoing-request: func( method: method, path-with-query: option, scheme: option, authority: option, - headers: headers + headers: /* borrow */ headers ) -> outgoing-request - outgoing-request-write: func(request: outgoing-request) -> result + + // Will return the outgoing-body child at most once. If called more than + // once, subsequent calls will return error. + outgoing-request-write: func(request: /* borrow */ outgoing-request) -> result< /* child */ outgoing-body> // Additional optional parameters that can be set when making a request. record request-options { @@ -115,8 +120,8 @@ interface types { // (the `wasi:http/handler` interface used for both incoming and outgoing can // simply return a `stream`). type response-outparam = u32 - drop-response-outparam: func(response: response-outparam) - set-response-outparam: func(param: response-outparam, response: result) -> result + drop-response-outparam: func(response: /* own */ response-outparam) + set-response-outparam: func(param: /* own */ response-outparam, response: result< /* own */ outgoing-response, error>) // This type corresponds to the HTTP standard Status Code. 
type status-code = u16 @@ -129,27 +134,72 @@ interface types { // type (that uses the single `stream` type mentioned above). The `consume` and // `write` methods may only be called once (and return failure thereafter). type incoming-response = u32 + drop-incoming-response: func(response: /* own */ incoming-response) + incoming-response-status: func(response: /* borrow */ incoming-response) -> status-code + incoming-response-headers: func(response: /* borrow */ incoming-response) -> /* child */ headers + // May be called at most once. returns error if called additional times. + // TODO: make incoming-request-consume work the same way, giving a child + // incoming-body. + incoming-response-consume: func(response: /* borrow */ incoming-response) -> result + + type incoming-body = u32 + drop-incoming-body: func(this: /* own */ incoming-body) + + // returned input-stream is a child - the implementation may trap if + // incoming-body is dropped (or consumed by call to + // incoming-body-finish) before the input-stream is dropped. + // May be called at most once. returns error if called additional times. + incoming-body-stream: func(this: /* borrow */ incoming-body) -> + result + // takes ownership of incoming-body. this will trap if the + // incoming-body-stream child is still alive! + incoming-body-finish: func(this: /* own */ incoming-body) -> + /* transitive child of the incoming-response of incoming-body */ future-trailers + + type future-trailers = u32 + drop-future-trailers: func(this: /* own */ future-trailers) + /// Pollable that resolves when the body has been fully read, and the trailers + /// are ready to be consumed. + future-trailers-subscribe: func(this: /* borrow */ future-trailers) -> /* child */ pollable + + /// Retrieve reference to trailers, if they are ready. 
+ future-trailers-get: func(response: /* borrow */ future-trailers) -> option> + type outgoing-response = u32 - drop-incoming-response: func(response: incoming-response) - drop-outgoing-response: func(response: outgoing-response) - incoming-response-status: func(response: incoming-response) -> status-code - incoming-response-headers: func(response: incoming-response) -> headers - incoming-response-consume: func(response: incoming-response) -> result + drop-outgoing-response: func(response: /* own */ outgoing-response) new-outgoing-response: func( status-code: status-code, - headers: headers + headers: /* borrow */ headers ) -> outgoing-response - outgoing-response-write: func(response: outgoing-response) -> result - // The following block defines a special resource type used by the - // `wasi:http/outgoing-handler` interface to emulate - // `future>` in advance of Preview3. Given a - // `future-incoming-response`, the client can call the non-blocking `get` - // method to get the result if it is available. If the result is not available, - // the client can call `listen` to get a `pollable` that can be passed to - // `io.poll.poll-oneoff`. + /// Will give the child outgoing-body at most once. subsequent calls will + /// return an error. + outgoing-response-write: func(this: /* borrow */ outgoing-response) -> result + + type outgoing-body = u32 + drop-outgoing-body: func(this: /* own */ outgoing-body) + /// Will give the child output-stream at most once. subsequent calls will + /// return an error. + outgoing-body-write: func(this: /* borrow */ outgoing-body) -> result + /// Write trailers as the way to finish an outgoing-body. To finish an + /// outgoing-body without writing trailers, use drop-outgoing-body. 
+ outgoing-body-write-trailers: func(this: /* own */ outgoing-body, trailers: /* own */ trailers) + + /// The following block defines a special resource type used by the + /// `wasi:http/outgoing-handler` interface to emulate + /// `future>` in advance of Preview3. Given a + /// `future-incoming-response`, the client can call the non-blocking `get` + /// method to get the result if it is available. If the result is not available, + /// the client can call `listen` to get a `pollable` that can be passed to + /// `io.poll.poll-oneoff`. type future-incoming-response = u32 - drop-future-incoming-response: func(f: future-incoming-response) - future-incoming-response-get: func(f: future-incoming-response) -> option> - listen-to-future-incoming-response: func(f: future-incoming-response) -> pollable + drop-future-incoming-response: func(f: /* own */ future-incoming-response) + /// option indicates readiness. + /// outer result indicates you are allowed to get the + /// incoming-response-or-error at most once. subsequent calls after ready + /// will return an error here. + /// inner result indicates whether the incoming-response was available, or an + /// error occurred. 
+ future-incoming-response-get: func(f: /* borrow */ future-incoming-response) -> option>> + listen-to-future-incoming-response: func(f: /* borrow */ future-incoming-response) -> /* child */ pollable } diff --git a/src/commands/run.rs b/src/commands/run.rs index 1c1bfe281799..01b04809b10b 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -848,22 +848,22 @@ impl RunCommand { } if self.common.wasi.http == Some(true) { - #[cfg(not(feature = "wasi-http"))] + #[cfg(not(all(feature = "wasi-http", feature = "component-model")))] { bail!("Cannot enable wasi-http when the binary is not compiled with this feature."); } - #[cfg(feature = "wasi-http")] + #[cfg(all(feature = "wasi-http", feature = "component-model"))] { match linker { - CliLinker::Core(linker) => { - wasmtime_wasi_http::sync::add_to_linker(linker)?; + CliLinker::Core(_) => { + bail!("Cannot enable wasi-http for core wasm modules"); } - #[cfg(feature = "component-model")] CliLinker::Component(linker) => { - wasmtime_wasi_http::proxy::sync::add_to_linker(linker)?; + wasmtime_wasi_http::proxy::add_to_linker(linker)?; } } - store.data_mut().wasi_http = Some(Arc::new(WasiHttpCtx::new())); + + store.data_mut().wasi_http = Some(Arc::new(WasiHttpCtx {})); } } @@ -998,13 +998,13 @@ impl preview2::preview1::WasiPreview1View for Host { #[cfg(feature = "wasi-http")] impl wasmtime_wasi_http::types::WasiHttpView for Host { - fn http_ctx(&self) -> &WasiHttpCtx { - self.wasi_http.as_ref().unwrap() + fn ctx(&mut self) -> &mut WasiHttpCtx { + let ctx = self.wasi_http.as_mut().unwrap(); + Arc::get_mut(ctx).expect("preview2 is not compatible with threads") } - fn http_ctx_mut(&mut self) -> &mut WasiHttpCtx { - let ctx = self.wasi_http.as_mut().unwrap(); - Arc::get_mut(ctx).expect("wasi-http is not compatible with threads") + fn table(&mut self) -> &mut preview2::Table { + Arc::get_mut(&mut self.preview2_table).expect("preview2 is not compatible with threads") } } diff --git a/tests/all/cli_tests.rs 
b/tests/all/cli_tests.rs index de5f2cb06a03..4f7dd4968dc0 100644 --- a/tests/all/cli_tests.rs +++ b/tests/all/cli_tests.rs @@ -791,6 +791,7 @@ fn run_basic_component() -> Result<()> { } #[cfg(feature = "wasi-http")] +#[ignore = "needs to be ported to components"] #[test] fn run_wasi_http_module() -> Result<()> { let output = run_wasmtime_for_output(