From 5359a8c2c59c3c89a0b0a0584255791f87314e69 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 18 Sep 2023 22:46:21 -0500 Subject: [PATCH] Force usage of a worker thread for stdin on all platforms (#7058) This commit is a follow-up to #6833 to remove the `unix` module for handling stdio which sets stdin to nonblocking mode. I've just now discovered that on macOS at least configuring `O_NONBLOCK` for stdin affects the stdout/stderr descriptors too. This program for example will panic: fn main() { unsafe { let r = libc::fcntl( libc::STDIN_FILENO, libc::F_SETFL, libc::fcntl(libc::STDIN_FILENO, libc::F_GETFL) | libc::O_NONBLOCK, ); assert_eq!(r, 0); } loop { println!("hello"); } } It was originally assumed that updating the flags for stdin wouldn't affect anything else except Wasmtime, but because this looks to not be the case this commit removes the logic of registering stdin raw with Tokio and instead unconditionally using the worker thread solution which should work in all situations. --- crates/wasi/src/preview2/stdio.rs | 7 -- crates/wasi/src/preview2/stdio/unix.rs | 160 ------------------------- 2 files changed, 167 deletions(-) delete mode 100644 crates/wasi/src/preview2/stdio/unix.rs diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index ca8453a3b999..f385329c1a37 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -8,14 +8,7 @@ use crate::preview2::{HostOutputStream, OutputStreamError, WasiView}; use bytes::Bytes; use is_terminal::IsTerminal; -#[cfg(unix)] -mod unix; -#[cfg(unix)] -pub use self::unix::{stdin, Stdin}; - -#[allow(dead_code)] mod worker_thread_stdin; -#[cfg(windows)] pub use self::worker_thread_stdin::{stdin, Stdin}; // blocking-write-and-flush must accept 4k. It doesn't seem likely that we need to diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs deleted file mode 100644 index e43e64b8d6dc..000000000000 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ /dev/null @@ -1,160 +0,0 @@ -use super::worker_thread_stdin; -use crate::preview2::{pipe::AsyncReadStream, HostInputStream, StreamState}; -use anyhow::Error; -use bytes::Bytes; -use futures::ready; -use std::future::Future; -use std::io::{self, Read}; -use std::pin::Pin; -use std::sync::{Arc, Mutex, OnceLock}; -use std::task::{Context, Poll}; -use tokio::io::unix::AsyncFd; -use tokio::io::{AsyncRead, Interest, ReadBuf}; - -// We need a single global instance of the AsyncFd because creating -// this instance registers the process's stdin fd with epoll, which will -// return an error if an fd is registered more than once. -static STDIN: OnceLock = OnceLock::new(); - -#[derive(Clone)] -pub enum Stdin { - // The process's standard input can be successfully registered with `epoll`, - // so it's tracked by a native async stream. - Async(Arc>), - - // The process's stdin can't be registered with epoll, for example it's a - // file on Linux or `/dev/null` on macOS. The fallback implementation of a - // worker thread is used in these situations. - Blocking(worker_thread_stdin::Stdin), -} - -pub fn stdin() -> Stdin { - fn init_stdin() -> anyhow::Result { - use crate::preview2::RUNTIME; - match tokio::runtime::Handle::try_current() { - Ok(_) => Ok(AsyncReadStream::new(InnerStdin::new()?)), - Err(_) => { - let _enter = RUNTIME.enter(); - RUNTIME.block_on(async { Ok(AsyncReadStream::new(InnerStdin::new()?)) }) - } - } - } - - let handle = STDIN - .get_or_init(|| match init_stdin() { - Ok(stream) => Stdin::Async(Arc::new(Mutex::new(stream))), - Err(_) => Stdin::Blocking(worker_thread_stdin::stdin()), - }) - .clone(); - - if let Stdin::Async(stream) = &handle { - let mut guard = stream.lock().unwrap(); - - // The backing task exited. This can happen in two cases: - // - // 1. the task crashed - // 2. the runtime has exited and been restarted in the same process - // - // As we can't tell the difference between these two, we assume the latter and restart the - // task. - if guard.join_handle.is_finished() { - *guard = init_stdin().unwrap(); - } - } - - handle -} - -impl is_terminal::IsTerminal for Stdin { - fn is_terminal(&self) -> bool { - std::io::stdin().is_terminal() - } -} - -#[async_trait::async_trait] -impl crate::preview2::HostInputStream for Stdin { - fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { - match self { - Stdin::Async(s) => HostInputStream::read(&mut *s.lock().unwrap(), size), - Stdin::Blocking(s) => s.read(size), - } - } - - async fn ready(&mut self) -> Result<(), Error> { - match self { - Stdin::Async(handle) => { - // Custom Future impl takes the std mutex in each invocation of poll. - // Required so we don't have to use a tokio mutex, which we can't take from - // inside a sync context in Self::read. - // - // Taking the lock, creating a fresh ready() future, polling it once, and - // then releasing the lock is acceptable here because the ready() future - // is only ever going to await on a single channel recv, plus some management - // of a state machine (for buffering). - struct Ready<'a> { - handle: &'a Arc>, - } - impl<'a> Future for Ready<'a> { - type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut locked = self.handle.lock().unwrap(); - let fut = locked.ready(); - tokio::pin!(fut); - fut.poll(cx) - } - } - Ready { handle }.await - } - Stdin::Blocking(s) => s.ready().await, - } - } -} - -struct InnerStdin { - inner: AsyncFd, -} - -impl InnerStdin { - pub fn new() -> anyhow::Result { - use rustix::fs::OFlags; - use std::os::fd::AsRawFd; - - let stdin = std::io::stdin(); - - let borrowed_fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(stdin.as_raw_fd()) }; - let flags = rustix::fs::fcntl_getfl(borrowed_fd)?; - if !flags.contains(OFlags::NONBLOCK) { - rustix::fs::fcntl_setfl(borrowed_fd, flags.union(OFlags::NONBLOCK))?; - } - - Ok(Self { - inner: AsyncFd::with_interest(stdin, Interest::READABLE)?, - }) - } -} - -impl AsyncRead for InnerStdin { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - loop { - let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?; - - let unfilled = buf.initialize_unfilled(); - match guard.try_io(|inner| inner.get_mut().read(unfilled)) { - Ok(Ok(len)) => { - buf.advance(len); - return Poll::Ready(Ok(())); - } - Ok(Err(err)) => { - return Poll::Ready(Err(err)); - } - Err(_would_block) => { - continue; - } - } - } - } -}