Skip to content

Commit

Permalink
Start to lift restriction of stdio only once
Browse files Browse the repository at this point in the history
This commit adds new `{Stdin,Stdout}Stream` traits which take over the
job of the stdio streams in `WasiCtxBuilder` and `WasiCtx`. These traits
bake in the ability to create a stream at any time to satisfy the API
of `wasi:cli`. The TTY functionality is folded into them as while I was
at it.

The implementation for stdin is relatively trivial since the stdin
implementation already handles multiple streams reading it. Built-in
impls of the `StdinStream` trait are also provided for helper types in
`preview2::pipe` which resulted in the implementation of
`MemoryInputPipe` being updated to support `Clone` where all clones read
the same original data.
  • Loading branch information
alexcrichton committed Sep 27, 2023
1 parent 7dca345 commit efab420
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 164 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ cap-net-ext = { workspace = true, optional = true }
cap-time-ext = { workspace = true, optional = true }
io-lifetimes = { workspace = true, optional = true }
fs-set-times = { workspace = true, optional = true }
is-terminal = { workspace = true, optional = true }
bitflags = { workspace = true, optional = true }
async-trait = { workspace = true, optional = true }
system-interface = { workspace = true, optional = true}
Expand Down Expand Up @@ -72,7 +71,6 @@ preview2 = [
'dep:cap-time-ext',
'dep:io-lifetimes',
'dep:fs-set-times',
'dep:is-terminal',
'dep:bitflags',
'dep:async-trait',
'dep:system-interface',
Expand Down
97 changes: 20 additions & 77 deletions crates/wasi/src/preview2/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
use super::clocks::host::{monotonic_clock, wall_clock};
use crate::preview2::{
bindings::cli::{terminal_input, terminal_output},
bindings::io::streams,
clocks::{self, HostMonotonicClock, HostWallClock},
filesystem::Dir,
pipe, random, stdio,
stdio::{HostTerminalInputState, HostTerminalOutputState},
stream::{HostInputStream, HostOutputStream, TableStreamExt},
DirPerms, FilePerms, IsATTY, Table,
stdio::{StdinStream, StdoutStream},
DirPerms, FilePerms, Table,
};
use cap_rand::{Rng, RngCore, SeedableRng};
use cap_std::ipnet::{self, IpNet};
use cap_std::net::Pool;
use cap_std::{ambient_authority, AmbientAuthority};
use std::mem;
use std::net::{Ipv4Addr, Ipv6Addr};
use wasmtime::component::Resource;

pub struct WasiCtxBuilder {
stdin: (Box<dyn HostInputStream>, IsATTY),
stdout: (Box<dyn HostOutputStream>, IsATTY),
stderr: (Box<dyn HostOutputStream>, IsATTY),
stdin: Box<dyn StdinStream>,
stdout: Box<dyn StdoutStream>,
stderr: Box<dyn StdoutStream>,
env: Vec<(String, String)>,
args: Vec<String>,
preopens: Vec<(Dir, String)>,
Expand Down Expand Up @@ -65,9 +61,9 @@ impl WasiCtxBuilder {
let insecure_random_seed =
cap_rand::thread_rng(cap_rand::ambient_authority()).gen::<u128>();
Self {
stdin: (Box::new(pipe::ClosedInputStream), IsATTY::No),
stdout: (Box::new(pipe::SinkOutputStream), IsATTY::No),
stderr: (Box::new(pipe::SinkOutputStream), IsATTY::No),
stdin: Box::new(pipe::ClosedInputStream),
stdout: Box::new(pipe::SinkOutputStream),
stderr: Box::new(pipe::SinkOutputStream),
env: Vec::new(),
args: Vec::new(),
preopens: Vec::new(),
Expand All @@ -81,52 +77,31 @@ impl WasiCtxBuilder {
}
}

pub fn stdin(&mut self, stdin: impl HostInputStream + 'static, isatty: IsATTY) -> &mut Self {
self.stdin = (Box::new(stdin), isatty);
pub fn stdin(&mut self, stdin: impl StdinStream + 'static) -> &mut Self {
self.stdin = Box::new(stdin);
self
}

pub fn stdout(&mut self, stdout: impl HostOutputStream + 'static, isatty: IsATTY) -> &mut Self {
self.stdout = (Box::new(stdout), isatty);
pub fn stdout(&mut self, stdout: impl StdoutStream + 'static) -> &mut Self {
self.stdout = Box::new(stdout);
self
}

pub fn stderr(&mut self, stderr: impl HostOutputStream + 'static, isatty: IsATTY) -> &mut Self {
self.stderr = (Box::new(stderr), isatty);
pub fn stderr(&mut self, stderr: impl StdoutStream + 'static) -> &mut Self {
self.stderr = Box::new(stderr);
self
}

pub fn inherit_stdin(&mut self) -> &mut Self {
use is_terminal::IsTerminal;
let inherited = stdio::stdin();
let isatty = if inherited.is_terminal() {
IsATTY::Yes
} else {
IsATTY::No
};
self.stdin(inherited, isatty)
self.stdin(stdio::stdin())
}

pub fn inherit_stdout(&mut self) -> &mut Self {
use is_terminal::IsTerminal;
let inherited = stdio::stdout();
let isatty = if inherited.is_terminal() {
IsATTY::Yes
} else {
IsATTY::No
};
self.stdout(inherited, isatty)
self.stdout(stdio::stdout())
}

pub fn inherit_stderr(&mut self) -> &mut Self {
use is_terminal::IsTerminal;
let inherited = stdio::stderr();
let isatty = if inherited.is_terminal() {
IsATTY::Yes
} else {
IsATTY::No
};
self.stderr(inherited, isatty)
self.stderr(stdio::stderr())
}

pub fn inherit_stdio(&mut self) -> &mut Self {
Expand Down Expand Up @@ -273,7 +248,6 @@ impl WasiCtxBuilder {
pub fn build(&mut self, table: &mut Table) -> Result<WasiCtx, anyhow::Error> {
assert!(!self.built);

use anyhow::Context;
let Self {
stdin,
stdout,
Expand All @@ -291,38 +265,10 @@ impl WasiCtxBuilder {
} = mem::replace(self, Self::new());
self.built = true;

let stdin_terminal = Some(if let IsATTY::Yes = stdin.1 {
Some(Resource::new_own(
table.push(Box::new(HostTerminalInputState))?,
))
} else {
None
});
let stdout_terminal = Some(if let IsATTY::Yes = stdout.1 {
Some(Resource::new_own(
table.push(Box::new(HostTerminalOutputState))?,
))
} else {
None
});
let stderr_terminal = Some(if let IsATTY::Yes = stderr.1 {
Some(Resource::new_own(
table.push(Box::new(HostTerminalOutputState))?,
))
} else {
None
});
let stdin = Some(table.push_input_stream(stdin.0).context("stdin")?);
let stdout = Some(table.push_output_stream(stdout.0).context("stdout")?);
let stderr = Some(table.push_output_stream(stderr.0).context("stderr")?);

Ok(WasiCtx {
stdin,
stdin_terminal,
stdout,
stdout_terminal,
stderr,
stderr_terminal,
env,
args,
preopens,
Expand Down Expand Up @@ -352,11 +298,8 @@ pub struct WasiCtx {
pub(crate) env: Vec<(String, String)>,
pub(crate) args: Vec<String>,
pub(crate) preopens: Vec<(Dir, String)>,
pub(crate) stdin: Option<Resource<streams::InputStream>>,
pub(crate) stdout: Option<Resource<streams::OutputStream>>,
pub(crate) stderr: Option<Resource<streams::OutputStream>>,
pub(crate) stdin_terminal: Option<Option<Resource<terminal_input::TerminalInput>>>,
pub(crate) stdout_terminal: Option<Option<Resource<terminal_output::TerminalOutput>>>,
pub(crate) stderr_terminal: Option<Option<Resource<terminal_output::TerminalOutput>>>,
pub(crate) stdin: Box<dyn StdinStream>,
pub(crate) stdout: Box<dyn StdoutStream>,
pub(crate) stderr: Box<dyn StdoutStream>,
pub(crate) pool: Pool,
}
28 changes: 16 additions & 12 deletions crates/wasi/src/preview2/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,46 @@
use crate::preview2::{HostInputStream, HostOutputStream, OutputStreamError, StreamState};
use anyhow::{anyhow, Error};
use bytes::Bytes;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

pub use crate::preview2::write_stream::AsyncWriteStream;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MemoryInputPipe {
buffer: std::io::Cursor<Bytes>,
buffer: Arc<Mutex<Bytes>>,
}

impl MemoryInputPipe {
pub fn new(bytes: Bytes) -> Self {
Self {
buffer: std::io::Cursor::new(bytes),
buffer: Arc::new(Mutex::new(bytes)),
}
}

pub fn is_empty(&self) -> bool {
self.buffer.get_ref().len() as u64 == self.buffer.position()
self.buffer.lock().unwrap().is_empty()
}
}

#[async_trait::async_trait]
impl HostInputStream for MemoryInputPipe {
fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> {
if self.is_empty() {
let mut buffer = self.buffer.lock().unwrap();
if buffer.is_empty() {
return Ok((Bytes::new(), StreamState::Closed));
}

let mut dest = bytes::BytesMut::zeroed(size);
let nbytes = std::io::Read::read(&mut self.buffer, dest.as_mut())?;
dest.truncate(nbytes);

let state = if self.is_empty() {
let size = size.min(buffer.len());
let read = buffer.split_to(size);
let state = if buffer.is_empty() {
StreamState::Closed
} else {
StreamState::Open
};
Ok((dest.freeze(), state))
Ok((read, state))
}

async fn ready(&mut self) -> Result<(), Error> {
Ok(())
}
Expand All @@ -57,7 +58,7 @@ impl HostInputStream for MemoryInputPipe {
#[derive(Debug, Clone)]
pub struct MemoryOutputPipe {
capacity: usize,
buffer: std::sync::Arc<std::sync::Mutex<bytes::BytesMut>>,
buffer: Arc<Mutex<bytes::BytesMut>>,
}

impl MemoryOutputPipe {
Expand Down Expand Up @@ -223,6 +224,7 @@ impl HostInputStream for AsyncReadStream {
}

/// An output stream that consumes all input written to it, and is always ready.
#[derive(Copy, Clone)]
pub struct SinkOutputStream;

#[async_trait::async_trait]
Expand All @@ -242,6 +244,7 @@ impl HostOutputStream for SinkOutputStream {
}

/// A stream that is ready immediately, but will always report that it's closed.
#[derive(Copy, Clone)]
pub struct ClosedInputStream;

#[async_trait::async_trait]
Expand All @@ -256,6 +259,7 @@ impl HostInputStream for ClosedInputStream {
}

/// An output stream that is always closed.
#[derive(Copy, Clone)]
pub struct ClosedOutputStream;

#[async_trait::async_trait]
Expand Down
Loading

0 comments on commit efab420

Please sign in to comment.