Skip to content

Commit

Permalink
Update stdio on Unix to fall back to worker threads (bytecodealliance…
Browse files Browse the repository at this point in the history
…#6833)

Not all file descriptors can get registered with epoll, for example
files on Linux or `/dev/null` on macOS return errors. In these
situations the fallback of the worker thread implementation is used
instead.
  • Loading branch information
alexcrichton authored and eduardomourar committed Aug 18, 2023
1 parent 24940ed commit 23a3f15
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 28 deletions.
77 changes: 49 additions & 28 deletions crates/wasi/src/preview2/stdio/unix.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::worker_thread_stdin;
use crate::preview2::{pipe::AsyncReadStream, HostInputStream, StreamState};
use anyhow::Error;
use bytes::Bytes;
Expand All @@ -16,26 +17,38 @@ use tokio::io::{AsyncRead, ReadBuf};
static STDIN: OnceLock<Stdin> = OnceLock::new();

#[derive(Clone)]
pub struct Stdin(Arc<Mutex<AsyncReadStream>>);
pub enum Stdin {
// The process's standard input can be successfully registered with `epoll`,
// so it's tracked by a native async stream.
Async(Arc<Mutex<AsyncReadStream>>),

// The process's stdin can't be registered with epoll, for example it's a
// file on Linux or `/dev/null` on macOS. The fallback implementation of a
// worker thread is used in these situations.
Blocking(worker_thread_stdin::Stdin),
}

pub fn stdin() -> Stdin {
fn init_stdin() -> AsyncReadStream {
fn init_stdin() -> anyhow::Result<AsyncReadStream> {
use crate::preview2::RUNTIME;
match tokio::runtime::Handle::try_current() {
Ok(_) => AsyncReadStream::new(InnerStdin::new().unwrap()),
Ok(_) => Ok(AsyncReadStream::new(InnerStdin::new()?)),
Err(_) => {
let _enter = RUNTIME.enter();
RUNTIME.block_on(async { AsyncReadStream::new(InnerStdin::new().unwrap()) })
RUNTIME.block_on(async { Ok(AsyncReadStream::new(InnerStdin::new()?)) })
}
}
}

let handle = STDIN
.get_or_init(|| Stdin(Arc::new(Mutex::new(init_stdin()))))
.get_or_init(|| match init_stdin() {
Ok(stream) => Stdin::Async(Arc::new(Mutex::new(stream))),
Err(_) => Stdin::Blocking(worker_thread_stdin::stdin()),
})
.clone();

{
let mut guard = handle.0.lock().unwrap();
if let Stdin::Async(stream) = &handle {
let mut guard = stream.lock().unwrap();

// The backing task exited. This can happen in two cases:
//
Expand All @@ -45,7 +58,7 @@ pub fn stdin() -> Stdin {
// As we can't tell the difference between these two, we assume the latter and restart the
// task.
if guard.join_handle.is_finished() {
*guard = init_stdin();
*guard = init_stdin().unwrap();
}
}

Expand All @@ -55,31 +68,39 @@ pub fn stdin() -> Stdin {
#[async_trait::async_trait]
impl crate::preview2::HostInputStream for Stdin {
fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> {
HostInputStream::read(&mut *self.0.lock().unwrap(), size)
match self {
Stdin::Async(s) => HostInputStream::read(&mut *s.lock().unwrap(), size),
Stdin::Blocking(s) => s.read(size),
}
}

async fn ready(&mut self) -> Result<(), Error> {
// Custom Future impl takes the std mutex in each invocation of poll.
// Required so we don't have to use a tokio mutex, which we can't take from
// inside a sync context in Self::read.
//
// Taking the lock, creating a fresh ready() future, polling it once, and
// then releasing the lock is acceptable here because the ready() future
// is only ever going to await on a single channel recv, plus some management
// of a state machine (for buffering).
struct Ready<'a> {
handle: &'a Stdin,
}
impl<'a> Future for Ready<'a> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut locked = self.handle.0.lock().unwrap();
let fut = locked.ready();
tokio::pin!(fut);
fut.poll(cx)
match self {
Stdin::Async(handle) => {
// Custom Future impl takes the std mutex in each invocation of poll.
// Required so we don't have to use a tokio mutex, which we can't take from
// inside a sync context in Self::read.
//
// Taking the lock, creating a fresh ready() future, polling it once, and
// then releasing the lock is acceptable here because the ready() future
// is only ever going to await on a single channel recv, plus some management
// of a state machine (for buffering).
struct Ready<'a> {
handle: &'a Arc<Mutex<AsyncReadStream>>,
}
impl<'a> Future for Ready<'a> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut locked = self.handle.lock().unwrap();
let fut = locked.ready();
tokio::pin!(fut);
fut.poll(cx)
}
}
Ready { handle }.await
}
Stdin::Blocking(s) => s.ready().await,
}
Ready { handle: self }.await
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/wasi/src/preview2/stdio/worker_thread_stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fn create() -> GlobalStdin {
}

/// Only public interface is the [`HostInputStream`] impl.
#[derive(Clone)]
pub struct Stdin;
impl Stdin {
// Private! Only required internally.
Expand Down

0 comments on commit 23a3f15

Please sign in to comment.