Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasmtime-wasi-http docs #8347

Merged
merged 14 commits into from
Apr 12, 2024
259 changes: 143 additions & 116 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Implementation of the `wasi:http/types` interface's various body types.

use crate::{bindings::http::types, types::FieldMap};
use anyhow::anyhow;
use bytes::Bytes;
Expand All @@ -14,9 +16,74 @@ use wasmtime_wasi::{
HostInputStream, HostOutputStream, StreamError, Subscribe,
};

/// Common type for incoming bodies.
pub type HyperIncomingBody = BoxBody<Bytes, types::ErrorCode>;

/// Small wrapper around `BoxBody` which adds a timeout to every frame.
/// Common type for outgoing bodies.
pub type HyperOutgoingBody = BoxBody<Bytes, types::ErrorCode>;

/// The concrete type behind a `was:http/types/incoming-body` resource.
pub struct HostIncomingBody {
body: IncomingBodyState,
/// An optional worker task to keep alive while this body is being read.
/// This ensures that if the parent of this body is dropped before the body
/// then the backing data behind this worker is kept alive.
worker: Option<AbortOnDropJoinHandle<()>>,
}

impl HostIncomingBody {
/// Create a new `HostIncomingBody` with the given `body` and a per-frame timeout
pub fn new(body: HyperIncomingBody, between_bytes_timeout: Duration) -> HostIncomingBody {
let body = BodyWithTimeout::new(body, between_bytes_timeout);
HostIncomingBody {
body: IncomingBodyState::Start(body),
worker: None,
}
}

/// Retain a worker task that needs to be kept alive while this body is being read.
pub fn retain_worker(&mut self, worker: AbortOnDropJoinHandle<()>) {
assert!(self.worker.is_none());
self.worker = Some(worker);
}

/// Try taking the stream of this body, if it's available.
pub fn take_stream(&mut self) -> Option<HostIncomingBodyStream> {
match &mut self.body {
IncomingBodyState::Start(_) => {}
IncomingBodyState::InBodyStream(_) => return None,
}
let (tx, rx) = oneshot::channel();
let body = match mem::replace(&mut self.body, IncomingBodyState::InBodyStream(rx)) {
IncomingBodyState::Start(b) => b,
IncomingBodyState::InBodyStream(_) => unreachable!(),
};
Some(HostIncomingBodyStream {
state: IncomingBodyStreamState::Open { body, tx },
buffer: Bytes::new(),
error: None,
})
}

/// Convert this body into a `HostFutureTrailers` resource.
pub fn into_future_trailers(self) -> HostFutureTrailers {
HostFutureTrailers::Waiting(self)
}
}

/// Internal state of a [`HostIncomingBody`].
enum IncomingBodyState {
/// The body is stored here meaning that within `HostIncomingBody` the
/// `take_stream` method can be called for example.
Start(BodyWithTimeout),

/// The body is within a `HostIncomingBodyStream` meaning that it's not
/// currently owned here. The body will be sent back over this channel when
/// it's done, however.
InBodyStream(oneshot::Receiver<StreamEnd>),
}

/// Small wrapper around [`HyperIncomingBody`] which adds a timeout to every frame.
struct BodyWithTimeout {
/// Underlying stream that frames are coming from.
inner: HyperIncomingBody,
Expand Down Expand Up @@ -77,25 +144,6 @@ impl Body for BodyWithTimeout {
}
}

pub struct HostIncomingBody {
body: IncomingBodyState,
/// An optional worker task to keep alive while this body is being read.
/// This ensures that if the parent of this body is dropped before the body
/// then the backing data behind this worker is kept alive.
worker: Option<AbortOnDropJoinHandle<()>>,
}

enum IncomingBodyState {
/// The body is stored here meaning that within `HostIncomingBody` the
/// `take_stream` method can be called for example.
Start(BodyWithTimeout),

/// The body is within a `HostIncomingBodyStream` meaning that it's not
/// currently owned here. The body will be sent back over this channel when
/// it's done, however.
InBodyStream(oneshot::Receiver<StreamEnd>),
}

/// Message sent when a `HostIncomingBodyStream` is done to the
/// `HostFutureTrailers` state.
enum StreamEnd {
Expand All @@ -108,48 +156,59 @@ enum StreamEnd {
Trailers(Option<FieldMap>),
}

impl HostIncomingBody {
pub fn new(body: HyperIncomingBody, between_bytes_timeout: Duration) -> HostIncomingBody {
let body = BodyWithTimeout::new(body, between_bytes_timeout);
HostIncomingBody {
body: IncomingBodyState::Start(body),
worker: None,
}
}

pub fn retain_worker(&mut self, worker: AbortOnDropJoinHandle<()>) {
assert!(self.worker.is_none());
self.worker = Some(worker);
}

pub fn take_stream(&mut self) -> Option<HostIncomingBodyStream> {
match &mut self.body {
IncomingBodyState::Start(_) => {}
IncomingBodyState::InBodyStream(_) => return None,
}
let (tx, rx) = oneshot::channel();
let body = match mem::replace(&mut self.body, IncomingBodyState::InBodyStream(rx)) {
IncomingBodyState::Start(b) => b,
IncomingBodyState::InBodyStream(_) => unreachable!(),
};
Some(HostIncomingBodyStream {
state: IncomingBodyStreamState::Open { body, tx },
buffer: Bytes::new(),
error: None,
})
}

pub fn into_future_trailers(self) -> HostFutureTrailers {
HostFutureTrailers::Waiting(self)
}
}

/// The concrete type behind the `wasi:io/streams/input-stream` resource returned
/// by `wasi:http/types/incoming-body`'s `stream` method.
pub struct HostIncomingBodyStream {
state: IncomingBodyStreamState,
buffer: Bytes,
error: Option<anyhow::Error>,
}

impl HostIncomingBodyStream {
fn record_frame(&mut self, frame: Option<Result<Frame<Bytes>, types::ErrorCode>>) {
match frame {
Some(Ok(frame)) => match frame.into_data() {
// A data frame was received, so queue up the buffered data for
// the next `read` call.
Ok(bytes) => {
assert!(self.buffer.is_empty());
self.buffer = bytes;
}

// Trailers were received meaning that this was the final frame.
// Throw away the body and send the trailers along the
// `tx` channel to make them available.
Err(trailers) => {
let trailers = trailers.into_trailers().unwrap();
let tx = match mem::replace(&mut self.state, IncomingBodyStreamState::Closed) {
IncomingBodyStreamState::Open { body: _, tx } => tx,
IncomingBodyStreamState::Closed => unreachable!(),
};

// NB: ignore send failures here because if this fails then
// no one was interested in the trailers.
let _ = tx.send(StreamEnd::Trailers(Some(trailers)));
}
},

// An error was received meaning that the stream is now done.
// Destroy the body to terminate the stream while enqueueing the
// error to get returned from the next call to `read`.
Some(Err(e)) => {
self.error = Some(e.into());
self.state = IncomingBodyStreamState::Closed;
}

// No more frames are going to be received again, so drop the `body`
// and the `tx` channel we'd send the body back onto because it's
// not needed as frames are done.
None => {
self.state = IncomingBodyStreamState::Closed;
}
}
}
}

enum IncomingBodyStreamState {
/// The body is currently open for reading and present here.
///
Expand Down Expand Up @@ -219,51 +278,6 @@ impl Subscribe for HostIncomingBodyStream {
}
}

impl HostIncomingBodyStream {
fn record_frame(&mut self, frame: Option<Result<Frame<Bytes>, types::ErrorCode>>) {
match frame {
Some(Ok(frame)) => match frame.into_data() {
// A data frame was received, so queue up the buffered data for
// the next `read` call.
Ok(bytes) => {
assert!(self.buffer.is_empty());
self.buffer = bytes;
}

// Trailers were received meaning that this was the final frame.
// Throw away the body and send the trailers along the
// `tx` channel to make them available.
Err(trailers) => {
let trailers = trailers.into_trailers().unwrap();
let tx = match mem::replace(&mut self.state, IncomingBodyStreamState::Closed) {
IncomingBodyStreamState::Open { body: _, tx } => tx,
IncomingBodyStreamState::Closed => unreachable!(),
};

// NB: ignore send failures here because if this fails then
// no one was interested in the trailers.
let _ = tx.send(StreamEnd::Trailers(Some(trailers)));
}
},

// An error was received meaning that the stream is now done.
// Destroy the body to terminate the stream while enqueueing the
// error to get returned from the next call to `read`.
Some(Err(e)) => {
self.error = Some(e.into());
self.state = IncomingBodyStreamState::Closed;
}

// No more frames are going to be received again, so drop the `body`
// and the `tx` channel we'd send the body back onto because it's
// not needed as frames are done.
None => {
self.state = IncomingBodyStreamState::Closed;
}
}
}
}

impl Drop for HostIncomingBodyStream {
fn drop(&mut self) {
// When a body stream is dropped, for whatever reason, attempt to send
Expand All @@ -278,6 +292,7 @@ impl Drop for HostIncomingBodyStream {
}
}

/// The concrete type behind a `wasi:http/types/future-trailers` resource.
pub enum HostFutureTrailers {
/// Trailers aren't here yet.
///
Expand Down Expand Up @@ -363,14 +378,6 @@ impl Subscribe for HostFutureTrailers {
}
}

pub type HyperOutgoingBody = BoxBody<Bytes, types::ErrorCode>;

pub enum FinishMessage {
Finished,
Trailers(hyper::HeaderMap),
Abort,
}

#[derive(Clone)]
struct WrittenState {
expected: u64,
Expand Down Expand Up @@ -401,14 +408,17 @@ impl WrittenState {
}
}

/// The concrete type behind a `wasi:http/types/outgoing-body` resource.
pub struct HostOutgoingBody {
pub body_output_stream: Option<Box<dyn HostOutputStream>>,
/// The output stream that the body is written to.
body_output_stream: Option<Box<dyn HostOutputStream>>,
context: StreamContext,
written: Option<WrittenState>,
finish_sender: Option<tokio::sync::oneshot::Sender<FinishMessage>>,
}

impl HostOutgoingBody {
/// Create a new `HostOutgoingBody`
pub fn new(context: StreamContext, size: Option<u64>) -> (Self, HyperOutgoingBody) {
let written = size.map(WrittenState::new);

Expand Down Expand Up @@ -478,7 +488,13 @@ impl HostOutgoingBody {
)
}

pub fn finish(mut self, ts: Option<FieldMap>) -> Result<(), types::ErrorCode> {
/// Take the output stream, if it's available.
pub fn take_output_stream(&mut self) -> Option<Box<dyn HostOutputStream>> {
self.body_output_stream.take()
}

/// Finish the body, optionally with trailers.
pub fn finish(mut self, trailers: Option<FieldMap>) -> Result<(), types::ErrorCode> {
// Make sure that the output stream has been dropped, so that the BodyImpl poll function
// will immediately pick up the finish sender.
drop(self.body_output_stream);
Expand All @@ -492,11 +508,11 @@ impl HostOutgoingBody {
let written = w.written();
if written != w.expected {
let _ = sender.send(FinishMessage::Abort);
return Err(self.context.as_body_error(written));
return Err(self.context.as_body_size_error(written));
}
}

let message = if let Some(ts) = ts {
let message = if let Some(ts) = trailers {
FinishMessage::Trailers(ts)
} else {
FinishMessage::Finished
Expand All @@ -508,6 +524,7 @@ impl HostOutgoingBody {
Ok(())
}

/// Abort the body.
pub fn abort(mut self) {
// Make sure that the output stream has been dropped, so that the BodyImpl poll function
// will immediately pick up the finish sender.
Expand All @@ -522,15 +539,25 @@ impl HostOutgoingBody {
}
}

/// Message sent to end the `[HostOutgoingBody]` stream.
enum FinishMessage {
Finished,
Trailers(hyper::HeaderMap),
Abort,
}

/// Whether the body is a request or response body.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamContext {
/// The body is a request body.
Request,
/// The body is a response body.
Response,
}

impl StreamContext {
/// Construct an http request or response body size error.
pub fn as_body_error(&self, size: u64) -> types::ErrorCode {
/// Construct the correct [`types::ErrorCode`] body size error.
pub fn as_body_size_error(&self, size: u64) -> types::ErrorCode {
match self {
StreamContext::Request => types::ErrorCode::HttpRequestBodySize(Some(size)),
StreamContext::Response => types::ErrorCode::HttpResponseBodySize(Some(size)),
Expand Down Expand Up @@ -578,7 +605,7 @@ impl HostOutputStream for BodyWriteStream {
let total = written.written();
return Err(StreamError::LastOperationFailed(anyhow!(self
.context
.as_body_error(total))));
.as_body_size_error(total))));
}
}

Expand Down
Loading