From efab420627d755fff4f060eb130a896ee5f9b173 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 27 Sep 2023 13:50:32 -0700 Subject: [PATCH] Start to lift restriction of stdio only once This commit adds new `{Stdin,Stdout}Stream` traits which take over the job of the stdio streams in `WasiCtxBuilder` and `WasiCtx`. These traits bake in the ability to create a stream at any time to satisfy the API of `wasi:cli`. The TTY functionality is folded into them as while I was at it. The implementation for stdin is relatively trivial since the stdin implementation already handles multiple streams reading it. Built-in impls of the `StdinStream` trait are also provided for helper types in `preview2::pipe` which resulted in the implementation of `MemoryInputPipe` being updated to support `Clone` where all clones read the same original data. --- Cargo.lock | 1 - crates/wasi/Cargo.toml | 2 - crates/wasi/src/preview2/ctx.rs | 97 ++------ crates/wasi/src/preview2/pipe.rs | 28 ++- crates/wasi/src/preview2/stdio.rs | 215 ++++++++++++------ .../src/preview2/stdio/worker_thread_stdin.rs | 11 +- 6 files changed, 190 insertions(+), 164 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 465622bf001a..5e101b39db29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3731,7 +3731,6 @@ dependencies = [ "futures", "io-extras", "io-lifetimes 2.0.2", - "is-terminal", "libc", "once_cell", "rustix 0.38.8", diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index 5bc3b63cf8b0..2ef0c8d67c30 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -33,7 +33,6 @@ cap-net-ext = { workspace = true, optional = true } cap-time-ext = { workspace = true, optional = true } io-lifetimes = { workspace = true, optional = true } fs-set-times = { workspace = true, optional = true } -is-terminal = { workspace = true, optional = true } bitflags = { workspace = true, optional = true } async-trait = { workspace = true, optional = true } system-interface = { workspace = true, optional = true} @@ -72,7 +71,6 @@ preview2 = [ 'dep:cap-time-ext', 'dep:io-lifetimes', 'dep:fs-set-times', - 'dep:is-terminal', 'dep:bitflags', 'dep:async-trait', 'dep:system-interface', diff --git a/crates/wasi/src/preview2/ctx.rs b/crates/wasi/src/preview2/ctx.rs index 90ed5678aafd..69adb83915fd 100644 --- a/crates/wasi/src/preview2/ctx.rs +++ b/crates/wasi/src/preview2/ctx.rs @@ -1,13 +1,10 @@ use super::clocks::host::{monotonic_clock, wall_clock}; use crate::preview2::{ - bindings::cli::{terminal_input, terminal_output}, - bindings::io::streams, clocks::{self, HostMonotonicClock, HostWallClock}, filesystem::Dir, pipe, random, stdio, - stdio::{HostTerminalInputState, HostTerminalOutputState}, - stream::{HostInputStream, HostOutputStream, TableStreamExt}, - DirPerms, FilePerms, IsATTY, Table, + stdio::{StdinStream, StdoutStream}, + DirPerms, FilePerms, Table, }; use cap_rand::{Rng, RngCore, SeedableRng}; use cap_std::ipnet::{self, IpNet}; @@ -15,12 +12,11 @@ use cap_std::net::Pool; use cap_std::{ambient_authority, AmbientAuthority}; use std::mem; use std::net::{Ipv4Addr, Ipv6Addr}; -use wasmtime::component::Resource; pub struct WasiCtxBuilder { - stdin: (Box, IsATTY), - stdout: (Box, IsATTY), - stderr: (Box, IsATTY), + stdin: Box, + stdout: Box, + stderr: Box, env: Vec<(String, String)>, args: Vec, preopens: Vec<(Dir, String)>, @@ -65,9 +61,9 @@ impl WasiCtxBuilder { let insecure_random_seed = cap_rand::thread_rng(cap_rand::ambient_authority()).gen::(); Self { - stdin: (Box::new(pipe::ClosedInputStream), IsATTY::No), - stdout: (Box::new(pipe::SinkOutputStream), IsATTY::No), - stderr: (Box::new(pipe::SinkOutputStream), IsATTY::No), + stdin: Box::new(pipe::ClosedInputStream), + stdout: Box::new(pipe::SinkOutputStream), + stderr: Box::new(pipe::SinkOutputStream), env: Vec::new(), args: Vec::new(), preopens: Vec::new(), @@ -81,52 +77,31 @@ impl WasiCtxBuilder { } } - pub fn stdin(&mut self, stdin: impl HostInputStream + 'static, isatty: IsATTY) -> &mut Self { - self.stdin = (Box::new(stdin), isatty); + pub fn stdin(&mut self, stdin: impl StdinStream + 'static) -> &mut Self { + self.stdin = Box::new(stdin); self } - pub fn stdout(&mut self, stdout: impl HostOutputStream + 'static, isatty: IsATTY) -> &mut Self { - self.stdout = (Box::new(stdout), isatty); + pub fn stdout(&mut self, stdout: impl StdoutStream + 'static) -> &mut Self { + self.stdout = Box::new(stdout); self } - pub fn stderr(&mut self, stderr: impl HostOutputStream + 'static, isatty: IsATTY) -> &mut Self { - self.stderr = (Box::new(stderr), isatty); + pub fn stderr(&mut self, stderr: impl StdoutStream + 'static) -> &mut Self { + self.stderr = Box::new(stderr); self } pub fn inherit_stdin(&mut self) -> &mut Self { - use is_terminal::IsTerminal; - let inherited = stdio::stdin(); - let isatty = if inherited.is_terminal() { - IsATTY::Yes - } else { - IsATTY::No - }; - self.stdin(inherited, isatty) + self.stdin(stdio::stdin()) } pub fn inherit_stdout(&mut self) -> &mut Self { - use is_terminal::IsTerminal; - let inherited = stdio::stdout(); - let isatty = if inherited.is_terminal() { - IsATTY::Yes - } else { - IsATTY::No - }; - self.stdout(inherited, isatty) + self.stdout(stdio::stdout()) } pub fn inherit_stderr(&mut self) -> &mut Self { - use is_terminal::IsTerminal; - let inherited = stdio::stderr(); - let isatty = if inherited.is_terminal() { - IsATTY::Yes - } else { - IsATTY::No - }; - self.stderr(inherited, isatty) + self.stderr(stdio::stderr()) } pub fn inherit_stdio(&mut self) -> &mut Self { @@ -273,7 +248,6 @@ impl WasiCtxBuilder { pub fn build(&mut self, table: &mut Table) -> Result { assert!(!self.built); - use anyhow::Context; let Self { stdin, stdout, @@ -291,38 +265,10 @@ impl WasiCtxBuilder { } = mem::replace(self, Self::new()); self.built = true; - let stdin_terminal = Some(if let IsATTY::Yes = stdin.1 { - Some(Resource::new_own( - table.push(Box::new(HostTerminalInputState))?, - )) - } else { - None - }); - let stdout_terminal = Some(if let IsATTY::Yes = stdout.1 { - Some(Resource::new_own( - table.push(Box::new(HostTerminalOutputState))?, - )) - } else { - None - }); - let stderr_terminal = Some(if let IsATTY::Yes = stderr.1 { - Some(Resource::new_own( - table.push(Box::new(HostTerminalOutputState))?, - )) - } else { - None - }); - let stdin = Some(table.push_input_stream(stdin.0).context("stdin")?); - let stdout = Some(table.push_output_stream(stdout.0).context("stdout")?); - let stderr = Some(table.push_output_stream(stderr.0).context("stderr")?); - Ok(WasiCtx { stdin, - stdin_terminal, stdout, - stdout_terminal, stderr, - stderr_terminal, env, args, preopens, @@ -352,11 +298,8 @@ pub struct WasiCtx { pub(crate) env: Vec<(String, String)>, pub(crate) args: Vec, pub(crate) preopens: Vec<(Dir, String)>, - pub(crate) stdin: Option>, - pub(crate) stdout: Option>, - pub(crate) stderr: Option>, - pub(crate) stdin_terminal: Option>>, - pub(crate) stdout_terminal: Option>>, - pub(crate) stderr_terminal: Option>>, + pub(crate) stdin: Box, + pub(crate) stdout: Box, + pub(crate) stderr: Box, pub(crate) pool: Pool, } diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index 69749db9d43c..135603ddf0a3 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -10,45 +10,46 @@ use crate::preview2::{HostInputStream, HostOutputStream, OutputStreamError, StreamState}; use anyhow::{anyhow, Error}; use bytes::Bytes; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; pub use crate::preview2::write_stream::AsyncWriteStream; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MemoryInputPipe { - buffer: std::io::Cursor, + buffer: Arc>, } impl MemoryInputPipe { pub fn new(bytes: Bytes) -> Self { Self { - buffer: std::io::Cursor::new(bytes), + buffer: Arc::new(Mutex::new(bytes)), } } pub fn is_empty(&self) -> bool { - self.buffer.get_ref().len() as u64 == self.buffer.position() + self.buffer.lock().unwrap().is_empty() } } #[async_trait::async_trait] impl HostInputStream for MemoryInputPipe { fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { - if self.is_empty() { + let mut buffer = self.buffer.lock().unwrap(); + if buffer.is_empty() { return Ok((Bytes::new(), StreamState::Closed)); } - let mut dest = bytes::BytesMut::zeroed(size); - let nbytes = std::io::Read::read(&mut self.buffer, dest.as_mut())?; - dest.truncate(nbytes); - - let state = if self.is_empty() { + let size = size.min(buffer.len()); + let read = buffer.split_to(size); + let state = if buffer.is_empty() { StreamState::Closed } else { StreamState::Open }; - Ok((dest.freeze(), state)) + Ok((read, state)) } + async fn ready(&mut self) -> Result<(), Error> { Ok(()) } @@ -57,7 +58,7 @@ impl HostInputStream for MemoryInputPipe { #[derive(Debug, Clone)] pub struct MemoryOutputPipe { capacity: usize, - buffer: std::sync::Arc>, + buffer: Arc>, } impl MemoryOutputPipe { @@ -223,6 +224,7 @@ impl HostInputStream for AsyncReadStream { } /// An output stream that consumes all input written to it, and is always ready. +#[derive(Copy, Clone)] pub struct SinkOutputStream; #[async_trait::async_trait] @@ -242,6 +244,7 @@ impl HostOutputStream for SinkOutputStream { } /// A stream that is ready immediately, but will always report that it's closed. +#[derive(Copy, Clone)] pub struct ClosedInputStream; #[async_trait::async_trait] @@ -256,6 +259,7 @@ impl HostInputStream for ClosedInputStream { } /// An output stream that is always closed. +#[derive(Copy, Clone)] pub struct ClosedOutputStream; #[async_trait::async_trait] diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index bcef7a0aefaf..40b3f149443e 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -3,13 +3,57 @@ use crate::preview2::bindings::cli::{ terminal_stdout, }; use crate::preview2::bindings::io::streams; -use crate::preview2::pipe::AsyncWriteStream; -use crate::preview2::{HostOutputStream, OutputStreamError, WasiView}; -use anyhow::bail; -use bytes::Bytes; -use is_terminal::IsTerminal; +use crate::preview2::pipe::{self, AsyncWriteStream}; +use crate::preview2::stream::TableStreamExt; +use crate::preview2::{HostInputStream, HostOutputStream, WasiView}; +use std::io::IsTerminal; use wasmtime::component::Resource; +/// A trait used to represent the standard input to a guest program. +/// +/// This is used to implement various WASI APIs via the method implementations +/// below. +/// +/// Built-in implementations are provided for [`Stdin`], +/// [`pipe::MemoryInputPipe`], and [`pipe::ClosedInputStream`]. +pub trait StdinStream: Send + Sync { + /// Creates a fresh stream which is reading stdin. + /// + /// Note that the returned stream must share state with all other streams + /// previously created. Guests may create multiple handles to the same stdin + /// and they should all be synchronized in their progress through the + /// program's input. + /// + /// Note that this means that if one handle becomes ready for reading they + /// all become ready for reading. Subsequently if one is read from it may + /// mean that all the others are no longer ready for reading. This is + /// basically a consequence of the way the WIT APIs are designed today. + fn stream(&self) -> Box; + + /// Returns whether this stream is backed by a TTY. + fn isatty(&self) -> bool; +} + +impl StdinStream for pipe::MemoryInputPipe { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + false + } +} + +impl StdinStream for pipe::ClosedInputStream { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + false + } +} + mod worker_thread_stdin; pub use self::worker_thread_stdin::{stdin, Stdin}; @@ -19,55 +63,91 @@ pub use self::worker_thread_stdin::{stdin, Stdin}; // and tokio's stdout/err. const STDIO_BUFFER_SIZE: usize = 4096; -pub struct Stdout(AsyncWriteStream); - -pub fn stdout() -> Stdout { - Stdout(AsyncWriteStream::new( - STDIO_BUFFER_SIZE, - tokio::io::stdout(), - )) +/// Similar to [`StdinStream`], except for output. +pub trait StdoutStream: Send + Sync { + /// Returns a fresh new stream which can write to this output stream. + /// + /// Note that all output streams should output to the same logical source. + /// This means that it's possible for each independent stream to acquire a + /// separate "permit" to write and then act on that permit. Note that + /// additionally at this time once a permit is "acquired" there's no way to + /// release it, for example you can wait for readiness and then never + /// actually write in WASI. This means that acquisition of a permit for one + /// stream cannot discount the size of a permit another stream could + /// obtain. + /// + /// Implementations must be able to handle this + fn stream(&self) -> Box; + + /// Returns whether this stream is backed by a TTY. + fn isatty(&self) -> bool; } -impl IsTerminal for Stdout { - fn is_terminal(&self) -> bool { - std::io::stdout().is_terminal() + +impl StdoutStream for pipe::MemoryOutputPipe { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + false } } -#[async_trait::async_trait] -impl HostOutputStream for Stdout { - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { - self.0.write(bytes) + +impl StdoutStream for pipe::SinkOutputStream { + fn stream(&self) -> Box { + Box::new(self.clone()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { - self.0.flush() + + fn isatty(&self) -> bool { + false } - async fn write_ready(&mut self) -> Result { - self.0.write_ready().await +} + +impl StdoutStream for pipe::ClosedOutputStream { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + false } } -pub struct Stderr(AsyncWriteStream); +pub struct Stdout; -pub fn stderr() -> Stderr { - Stderr(AsyncWriteStream::new( - STDIO_BUFFER_SIZE, - tokio::io::stderr(), - )) +pub fn stdout() -> Stdout { + Stdout } -impl IsTerminal for Stderr { - fn is_terminal(&self) -> bool { - std::io::stderr().is_terminal() + +impl StdoutStream for Stdout { + fn stream(&self) -> Box { + Box::new(AsyncWriteStream::new( + STDIO_BUFFER_SIZE, + tokio::io::stdout(), + )) } -} -#[async_trait::async_trait] -impl HostOutputStream for Stderr { - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { - self.0.write(bytes) + + fn isatty(&self) -> bool { + std::io::stdout().is_terminal() } - fn flush(&mut self) -> Result<(), OutputStreamError> { - self.0.flush() +} + +pub struct Stderr; + +pub fn stderr() -> Stderr { + Stderr +} + +impl StdoutStream for Stderr { + fn stream(&self) -> Box { + Box::new(AsyncWriteStream::new( + STDIO_BUFFER_SIZE, + tokio::io::stderr(), + )) } - async fn write_ready(&mut self) -> Result { - self.0.write_ready().await + + fn isatty(&self) -> bool { + std::io::stderr().is_terminal() } } @@ -79,48 +159,39 @@ pub enum IsATTY { impl stdin::Host for T { fn get_stdin(&mut self) -> Result, anyhow::Error> { - match self.ctx_mut().stdin.take() { - Some(stdin) => Ok(stdin), - None => bail!("stdin has already been consumed"), - } + let stream = self.ctx_mut().stdin.stream(); + Ok(self.table_mut().push_input_stream(stream)?) } } impl stdout::Host for T { fn get_stdout(&mut self) -> Result, anyhow::Error> { - match self.ctx_mut().stdout.take() { - Some(stdout) => Ok(stdout), - None => bail!("stdout has already been consumed"), - } + let stream = self.ctx_mut().stdout.stream(); + Ok(self.table_mut().push_output_stream(stream)?) } } impl stderr::Host for T { fn get_stderr(&mut self) -> Result, anyhow::Error> { - match self.ctx_mut().stderr.take() { - Some(stderr) => Ok(stderr), - None => { - bail!("stderr has already been consumed") - } - } + let stream = self.ctx_mut().stderr.stream(); + Ok(self.table_mut().push_output_stream(stream)?) } } -pub struct HostTerminalInputState; -pub struct HostTerminalOutputState; +pub struct HostTerminalInput; +pub struct HostTerminalOutput; impl terminal_input::Host for T {} impl crate::preview2::bindings::cli::terminal_input::HostTerminalInput for T { fn drop(&mut self, r: Resource) -> anyhow::Result<()> { - self.table_mut().delete::(r.rep())?; + self.table_mut().delete::(r.rep())?; Ok(()) } } impl terminal_output::Host for T {} impl crate::preview2::bindings::cli::terminal_output::HostTerminalOutput for T { fn drop(&mut self, r: Resource) -> anyhow::Result<()> { - self.table_mut() - .delete::(r.rep())?; + self.table_mut().delete::(r.rep())?; Ok(()) } } @@ -128,9 +199,11 @@ impl terminal_stdin::Host for T { fn get_terminal_stdin( &mut self, ) -> anyhow::Result>> { - match self.ctx_mut().stdin_terminal.take() { - Some(stdin_terminal) => Ok(stdin_terminal), - None => bail!("stdin terminal has already been consumed"), + if self.ctx().stdin.isatty() { + let fd = self.table_mut().push(Box::new(HostTerminalInput))?; + Ok(Some(Resource::new_own(fd))) + } else { + Ok(None) } } } @@ -138,9 +211,11 @@ impl terminal_stdout::Host for T { fn get_terminal_stdout( &mut self, ) -> anyhow::Result>> { - match self.ctx_mut().stdout_terminal.take() { - Some(stdout_terminal) => Ok(stdout_terminal), - None => bail!("stdout terminal has already been consumed"), + if self.ctx().stdout.isatty() { + let fd = self.table_mut().push(Box::new(HostTerminalOutput))?; + Ok(Some(Resource::new_own(fd))) + } else { + Ok(None) } } } @@ -148,9 +223,11 @@ impl terminal_stderr::Host for T { fn get_terminal_stderr( &mut self, ) -> anyhow::Result>> { - match self.ctx_mut().stderr_terminal.take() { - Some(stderr_terminal) => Ok(stderr_terminal), - None => bail!("stderr terminal has already been consumed"), + if self.ctx().stderr.isatty() { + let fd = self.table_mut().push(Box::new(HostTerminalOutput))?; + Ok(Some(Resource::new_own(fd))) + } else { + Ok(None) } } } diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index e1505dff844b..bf933ad8398e 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -23,10 +23,11 @@ //! This module is one that's likely to change over time though as new systems //! are encountered along with preexisting bugs. +use crate::preview2::stdio::StdinStream; use crate::preview2::{HostInputStream, StreamState}; use anyhow::Error; use bytes::{Bytes, BytesMut}; -use std::io::Read; +use std::io::{IsTerminal, Read}; use std::mem; use std::sync::{Condvar, Mutex, OnceLock}; use tokio::sync::Notify; @@ -103,8 +104,12 @@ pub fn stdin() -> Stdin { Stdin } -impl is_terminal::IsTerminal for Stdin { - fn is_terminal(&self) -> bool { +impl StdinStream for Stdin { + fn stream(&self) -> Box { + Box::new(Stdin) + } + + fn isatty(&self) -> bool { std::io::stdin().is_terminal() } }