diff --git a/crates/test-programs/src/bin/api_proxy_streaming.rs b/crates/test-programs/src/bin/api_proxy_streaming.rs index a5f59b4bb3d8..5f1c0560c8e5 100644 --- a/crates/test-programs/src/bin/api_proxy_streaming.rs +++ b/crates/test-programs/src/bin/api_proxy_streaming.rs @@ -350,7 +350,7 @@ mod executor { let mut ready = vec![false; wakers.len()]; - for index in io::poll::poll_list(&pollables) { + for index in io::poll::poll(&pollables) { ready[usize::try_from(index).unwrap()] = true; } diff --git a/crates/test-programs/src/bin/api_reactor.rs b/crates/test-programs/src/bin/api_reactor.rs index 5b700e9d2d1d..b8aff333917e 100644 --- a/crates/test-programs/src/bin/api_reactor.rs +++ b/crates/test-programs/src/bin/api_reactor.rs @@ -7,7 +7,6 @@ wit_bindgen::generate!({ }); struct T; -use crate::wasi::io::poll; static mut STATE: Vec = Vec::new(); @@ -34,7 +33,7 @@ impl Guest for T { for s in STATE.iter() { let mut out = s.as_bytes(); while !out.is_empty() { - poll::poll_list(&[&pollable]); + pollable.block(); let n = match o.check_write() { Ok(n) => n, Err(_) => return Err(()), @@ -52,8 +51,7 @@ impl Guest for T { Ok(_) => {} Err(_) => return Err(()), } - - poll::poll_list(&[&pollable]); + pollable.block(); match o.check_write() { Ok(_) => {} Err(_) => return Err(()), diff --git a/crates/test-programs/src/bin/preview2_ip_name_lookup.rs b/crates/test-programs/src/bin/preview2_ip_name_lookup.rs index fc0b25de85a4..29c851a44b6f 100644 --- a/crates/test-programs/src/bin/preview2_ip_name_lookup.rs +++ b/crates/test-programs/src/bin/preview2_ip_name_lookup.rs @@ -8,7 +8,7 @@ fn main() { let addresses = ip_name_lookup::resolve_addresses(&network, "example.com", None, false).unwrap(); let pollable = addresses.subscribe(); - poll::poll_one(&pollable); + pollable.block(); assert!(addresses.resolve_next_address().is_ok()); let result = ip_name_lookup::resolve_addresses(&network, "a.b<&>", None, false); @@ -20,7 +20,7 @@ fn main() { let addresses = ip_name_lookup::resolve_addresses(&network, "github.com", None, false).unwrap(); let lookup = addresses.subscribe(); let timeout = monotonic_clock::subscribe_duration(1_000_000_000); - let ready = poll::poll_list(&[&lookup, &timeout]); + let ready = poll::poll(&[&lookup, &timeout]); assert!(ready.len() > 0); match ready[0] { 0 => loop { diff --git a/crates/test-programs/src/bin/preview2_sleep.rs b/crates/test-programs/src/bin/preview2_sleep.rs index 15de238dc8cf..a6c8dae39840 100644 --- a/crates/test-programs/src/bin/preview2_sleep.rs +++ b/crates/test-programs/src/bin/preview2_sleep.rs @@ -1,4 +1,4 @@ -use test_programs::wasi::{clocks::monotonic_clock, io::poll}; +use test_programs::wasi::clocks::monotonic_clock; fn main() { sleep_10ms(); @@ -9,19 +9,25 @@ fn main() { fn sleep_10ms() { let dur = 10_000_000; let p = monotonic_clock::subscribe_instant(monotonic_clock::now() + dur); - poll::poll_one(&p); + p.block(); let p = monotonic_clock::subscribe_duration(dur); - poll::poll_one(&p); + p.block(); } fn sleep_0ms() { let p = monotonic_clock::subscribe_instant(monotonic_clock::now()); - poll::poll_one(&p); + p.block(); let p = monotonic_clock::subscribe_duration(0); - poll::poll_one(&p); + assert!( + p.ready(), + "timer subscription with duration 0 is ready immediately" + ); } fn sleep_backwards_in_time() { let p = monotonic_clock::subscribe_instant(monotonic_clock::now() - 1); - poll::poll_one(&p); + assert!( + p.ready(), + "timer subscription for instant which has passed is ready immediately" + ); } diff --git a/crates/test-programs/src/bin/preview2_stream_pollable_correct.rs b/crates/test-programs/src/bin/preview2_stream_pollable_correct.rs index dfb15573470e..044ebb336aa9 100644 --- a/crates/test-programs/src/bin/preview2_stream_pollable_correct.rs +++ b/crates/test-programs/src/bin/preview2_stream_pollable_correct.rs @@ -1,12 +1,11 @@ use test_programs::wasi::cli::stdin; -use test_programs::wasi::io::poll; use test_programs::wasi::io::streams; fn main() { let stdin: streams::InputStream = stdin::get_stdin(); let stdin_pollable = stdin.subscribe(); - let ready = poll::poll_list(&[&stdin_pollable]); - assert_eq!(ready, &[0]); + stdin_pollable.block(); + assert!(stdin_pollable.ready(), "after blocking, pollable is ready"); drop(stdin_pollable); drop(stdin); } diff --git a/crates/test-programs/src/bin/preview2_stream_pollable_traps.rs b/crates/test-programs/src/bin/preview2_stream_pollable_traps.rs index 13331fb02ee6..bcdb958b3433 100644 --- a/crates/test-programs/src/bin/preview2_stream_pollable_traps.rs +++ b/crates/test-programs/src/bin/preview2_stream_pollable_traps.rs @@ -1,12 +1,10 @@ use test_programs::wasi::cli::stdin; -use test_programs::wasi::io::poll; use test_programs::wasi::io::streams; fn main() { let stdin: streams::InputStream = stdin::get_stdin(); let stdin_pollable = stdin.subscribe(); - let ready = poll::poll_list(&[&stdin_pollable]); - assert_eq!(ready, &[0]); + stdin_pollable.block(); drop(stdin); unreachable!("execution should have trapped in line above when stream dropped before pollable"); } diff --git a/crates/test-programs/src/http.rs b/crates/test-programs/src/http.rs index 4637ec713570..1f16348f2462 100644 --- a/crates/test-programs/src/http.rs +++ b/crates/test-programs/src/http.rs @@ -1,5 +1,4 @@ use crate::wasi::http::{outgoing_handler, types as http_types}; -use crate::wasi::io::poll; use crate::wasi::io::streams; use anyhow::{anyhow, Result}; use std::fmt; @@ -72,7 +71,7 @@ pub fn request( let pollable = request_body.subscribe(); while !buf.is_empty() { - poll::poll_list(&[&pollable]); + pollable.block(); let permit = match request_body.check_write() { Ok(n) => n, @@ -94,7 +93,7 @@ pub fn request( _ => {} } - poll::poll_list(&[&pollable]); + pollable.block(); match request_body.check_write() { Ok(_) => {} @@ -110,7 +109,7 @@ pub fn request( Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?, None => { let pollable = future_response.subscribe(); - let _ = poll::poll_list(&[&pollable]); + pollable.block(); future_response .get() .expect("incoming response available") @@ -140,7 +139,7 @@ pub fn request( let mut body = Vec::new(); loop { - poll::poll_list(&[&input_stream_pollable]); + input_stream_pollable.block(); let mut body_chunk = match input_stream.read(1024 * 1024) { Ok(c) => c, diff --git a/crates/test-programs/src/sockets.rs b/crates/test-programs/src/sockets.rs index 74a65d74fa54..2d1a0c99d103 100644 --- a/crates/test-programs/src/sockets.rs +++ b/crates/test-programs/src/sockets.rs @@ -16,12 +16,8 @@ use std::ops::Range; const TIMEOUT_NS: u64 = 1_000_000_000; impl Pollable { - pub fn wait(&self) { - poll::poll_one(self); - } - pub fn wait_until(&self, timeout: &Pollable) -> Result<(), ErrorCode> { - let ready = poll::poll_list(&[self, timeout]); + let ready = poll::poll(&[self, timeout]); assert!(ready.len() > 0); match ready[0] { 0 => Ok(()), @@ -36,7 +32,7 @@ impl OutputStream { let pollable = self.subscribe(); while !bytes.is_empty() { - pollable.wait(); + pollable.block(); let permit = self.check_write()?; @@ -75,7 +71,7 @@ impl TcpSocket { loop { match self.finish_bind() { - Err(ErrorCode::WouldBlock) => sub.wait(), + Err(ErrorCode::WouldBlock) => sub.block(), result => return result, } } @@ -88,7 +84,7 @@ impl TcpSocket { loop { match self.finish_listen() { - Err(ErrorCode::WouldBlock) => sub.wait(), + Err(ErrorCode::WouldBlock) => sub.block(), result => return result, } } @@ -105,7 +101,7 @@ impl TcpSocket { loop { match self.finish_connect() { - Err(ErrorCode::WouldBlock) => sub.wait(), + Err(ErrorCode::WouldBlock) => sub.block(), result => return result, } } @@ -116,7 +112,7 @@ impl TcpSocket { loop { match self.accept() { - Err(ErrorCode::WouldBlock) => sub.wait(), + Err(ErrorCode::WouldBlock) => sub.block(), result => return result, } } @@ -139,7 +135,7 @@ impl UdpSocket { loop { match self.finish_bind() { - Err(ErrorCode::WouldBlock) => sub.wait(), + Err(ErrorCode::WouldBlock) => sub.block(), result => return result, } } diff --git a/crates/wasi-http/wit/deps/http/types.wit b/crates/wasi-http/wit/deps/http/types.wit index 88902bf8c9c8..9a0c8956b540 100644 --- a/crates/wasi-http/wit/deps/http/types.wit +++ b/crates/wasi-http/wit/deps/http/types.wit @@ -223,7 +223,7 @@ interface types { /// transport layer of the HTTP protocol. /// /// These timeouts are separate from any the user may use to bound a - /// blocking call to `wasi:io/poll.poll-list`. + /// blocking call to `wasi:io/poll.poll`. resource request-options { /// Construct a default `request-options` value. constructor(); diff --git a/crates/wasi-http/wit/deps/io/poll.wit b/crates/wasi-http/wit/deps/io/poll.wit index 52a4957dc496..d85028d6ca4c 100644 --- a/crates/wasi-http/wit/deps/io/poll.wit +++ b/crates/wasi-http/wit/deps/io/poll.wit @@ -3,8 +3,21 @@ package wasi:io@0.2.0-rc-2023-11-05; /// A poll API intended to let users wait for I/O events on multiple handles /// at once. interface poll { - /// A "pollable" handle. - resource pollable; + /// `pollable` epresents a single I/O event which may be ready, or not. + resource pollable { + + /// Return the readiness of a pollable. This function never blocks. + /// + /// Returns `true` when the pollable is ready, and `false` otherwise. + ready: func() -> bool; + + /// `block` returns immediately if the pollable is ready, and otherwise + /// blocks until ready. + /// + /// This function is equivalent to calling `poll.poll` on a list + /// containing only this pollable. + block: func(); + } /// Poll for completion on a set of pollables. /// @@ -24,11 +37,5 @@ interface poll { /// do any I/O so it doesn't fail. If any of the I/O sources identified by /// the pollables has an error, it is indicated by marking the source as /// being reaedy for I/O. - poll-list: func(in: list>) -> list; - - /// Poll for completion on a single pollable. - /// - /// This function is similar to `poll-list`, but operates on only a single - /// pollable. When it returns, the handle is ready for I/O. - poll-one: func(in: borrow); + poll: func(in: list>) -> list; } diff --git a/crates/wasi-preview1-component-adapter/src/lib.rs b/crates/wasi-preview1-component-adapter/src/lib.rs index 267a3ac6570f..66d56da23589 100644 --- a/crates/wasi-preview1-component-adapter/src/lib.rs +++ b/crates/wasi-preview1-component-adapter/src/lib.rs @@ -39,7 +39,7 @@ pub mod bindings { // can't support in these special core-wasm adapters. // Instead, we manually define the bindings for these functions in // terms of raw pointers. - skip: ["run", "get-environment", "poll-list"], + skip: ["run", "get-environment", "poll"], }); #[cfg(feature = "reactor")] @@ -54,7 +54,7 @@ pub mod bindings { // can't support in these special core-wasm adapters. // Instead, we manually define the bindings for these functions in // terms of raw pointers. - skip: ["get-environment", "poll-list"], + skip: ["get-environment", "poll"], }); } @@ -1780,8 +1780,8 @@ pub unsafe extern "C" fn poll_oneoff( #[link(wasm_import_module = "wasi:io/poll@0.2.0-rc-2023-11-05")] #[allow(improper_ctypes)] // FIXME(bytecodealliance/wit-bindgen#684) extern "C" { - #[link_name = "poll-list"] - fn poll_list_import(pollables: *const Pollable, len: usize, rval: *mut ReadyList); + #[link_name = "poll"] + fn poll_import(pollables: *const Pollable, len: usize, rval: *mut ReadyList); } let mut ready_list = ReadyList { base: std::ptr::null(), @@ -1794,7 +1794,7 @@ pub unsafe extern "C" fn poll_oneoff( .checked_mul(size_of::()) .trapping_unwrap(), || { - poll_list_import( + poll_import( pollables.pointer, pollables.length, &mut ready_list as *mut _, diff --git a/crates/wasi/src/preview2/host/clocks.rs b/crates/wasi/src/preview2/host/clocks.rs index 52942f365a12..0b4c7dd364a2 100644 --- a/crates/wasi/src/preview2/host/clocks.rs +++ b/crates/wasi/src/preview2/host/clocks.rs @@ -47,7 +47,9 @@ fn subscribe_to_duration( table: &mut crate::preview2::Table, duration: tokio::time::Duration, ) -> anyhow::Result> { - let sleep = if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) { + let sleep = if duration.is_zero() { + table.push(Deadline::Past)? + } else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) { // NB: this resource created here is not actually exposed to wasm, it's // only an internal implementation detail used to match the signature // expected by `subscribe`. @@ -85,6 +87,7 @@ impl monotonic_clock::Host for T { } enum Deadline { + Past, Instant(tokio::time::Instant), Never, } @@ -93,6 +96,7 @@ enum Deadline { impl Subscribe for Deadline { async fn ready(&mut self) { match self { + Deadline::Past => {} Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await, Deadline::Never => std::future::pending().await, } diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index f9d0977453b2..bcb11652739a 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -148,8 +148,9 @@ pub mod bindings { "[method]output-stream.blocking-write-and-flush", "[method]output-stream.blocking-write-zeroes-and-flush", "[method]directory-entry-stream.read-directory-entry", - "poll-list", - "poll-one", + "poll", + "[method]pollable.block", + "[method]pollable.ready", ], }, trappable_error_type: { diff --git a/crates/wasi/src/preview2/poll.rs b/crates/wasi/src/preview2/poll.rs index ebb2820693f9..8e2b5385ffe1 100644 --- a/crates/wasi/src/preview2/poll.rs +++ b/crates/wasi/src/preview2/poll.rs @@ -16,7 +16,7 @@ pub type ClosureFuture = Box PollableFuture<'static> + Send + Sync + /// A pollable is not the same thing as a Rust Future: the same pollable may be used to /// repeatedly check for readiness of a given condition, e.g. if a stream is readable /// or writable. So, rather than containing a Future, which can only become Ready once, a -/// Pollable contains a way to create a Future in each call to `poll_list`. +/// Pollable contains a way to create a Future in each call to `poll`. pub struct Pollable { index: u32, make_future: MakeFuture, @@ -65,7 +65,7 @@ where #[async_trait::async_trait] impl poll::Host for T { - async fn poll_list(&mut self, pollables: Vec>) -> Result> { + async fn poll(&mut self, pollables: Vec>) -> Result> { type ReadylistIndex = u32; let table = self.table_mut(); @@ -116,19 +116,27 @@ impl poll::Host for T { Ok(PollList { futures }.await) } +} - async fn poll_one(&mut self, pollable: Resource) -> Result<()> { +#[async_trait::async_trait] +impl crate::preview2::bindings::io::poll::HostPollable for T { + async fn block(&mut self, pollable: Resource) -> Result<()> { let table = self.table_mut(); - let pollable = table.get(&pollable)?; let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); ready.await; Ok(()) } -} - -#[async_trait::async_trait] -impl crate::preview2::bindings::io::poll::HostPollable for T { + async fn ready(&mut self, pollable: Resource) -> Result { + let table = self.table_mut(); + let pollable = table.get(&pollable)?; + let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); + futures::pin_mut!(ready); + Ok(matches!( + futures::future::poll_immediate(ready).await, + Some(()) + )) + } fn drop(&mut self, pollable: Resource) -> Result<()> { let pollable = self.table_mut().delete(pollable)?; if let Some(delete) = pollable.remove_index_on_delete { @@ -148,16 +156,18 @@ pub mod sync { use wasmtime::component::Resource; impl poll::Host for T { - fn poll_list(&mut self, pollables: Vec>) -> Result> { - in_tokio(async { async_poll::Host::poll_list(self, pollables).await }) - } - - fn poll_one(&mut self, pollable: Resource) -> Result<()> { - in_tokio(async { async_poll::Host::poll_one(self, pollable).await }) + fn poll(&mut self, pollables: Vec>) -> Result> { + in_tokio(async { async_poll::Host::poll(self, pollables).await }) } } impl crate::preview2::bindings::sync_io::io::poll::HostPollable for T { + fn ready(&mut self, pollable: Resource) -> Result { + in_tokio(async { async_poll::HostPollable::ready(self, pollable).await }) + } + fn block(&mut self, pollable: Resource) -> Result<()> { + in_tokio(async { async_poll::HostPollable::block(self, pollable).await }) + } fn drop(&mut self, pollable: Resource) -> Result<()> { async_poll::HostPollable::drop(self, pollable) } diff --git a/crates/wasi/src/preview2/preview1.rs b/crates/wasi/src/preview2/preview1.rs index 4b81723051ec..cc976f1ddc05 100644 --- a/crates/wasi/src/preview2/preview1.rs +++ b/crates/wasi/src/preview2/preview1.rs @@ -2156,7 +2156,7 @@ impl< pollables.push(p); } let ready: HashSet<_> = self - .poll_list(pollables) + .poll(pollables) .await .context("failed to call `poll-oneoff`") .map_err(types::Error::trap)? diff --git a/crates/wasi/wit/deps/http/types.wit b/crates/wasi/wit/deps/http/types.wit index 88902bf8c9c8..9a0c8956b540 100644 --- a/crates/wasi/wit/deps/http/types.wit +++ b/crates/wasi/wit/deps/http/types.wit @@ -223,7 +223,7 @@ interface types { /// transport layer of the HTTP protocol. /// /// These timeouts are separate from any the user may use to bound a - /// blocking call to `wasi:io/poll.poll-list`. + /// blocking call to `wasi:io/poll.poll`. resource request-options { /// Construct a default `request-options` value. constructor(); diff --git a/crates/wasi/wit/deps/io/poll.wit b/crates/wasi/wit/deps/io/poll.wit index 52a4957dc496..d85028d6ca4c 100644 --- a/crates/wasi/wit/deps/io/poll.wit +++ b/crates/wasi/wit/deps/io/poll.wit @@ -3,8 +3,21 @@ package wasi:io@0.2.0-rc-2023-11-05; /// A poll API intended to let users wait for I/O events on multiple handles /// at once. interface poll { - /// A "pollable" handle. - resource pollable; + /// `pollable` epresents a single I/O event which may be ready, or not. + resource pollable { + + /// Return the readiness of a pollable. This function never blocks. + /// + /// Returns `true` when the pollable is ready, and `false` otherwise. + ready: func() -> bool; + + /// `block` returns immediately if the pollable is ready, and otherwise + /// blocks until ready. + /// + /// This function is equivalent to calling `poll.poll` on a list + /// containing only this pollable. + block: func(); + } /// Poll for completion on a set of pollables. /// @@ -24,11 +37,5 @@ interface poll { /// do any I/O so it doesn't fail. If any of the I/O sources identified by /// the pollables has an error, it is indicated by marking the source as /// being reaedy for I/O. - poll-list: func(in: list>) -> list; - - /// Poll for completion on a single pollable. - /// - /// This function is similar to `poll-list`, but operates on only a single - /// pollable. When it returns, the handle is ready for I/O. - poll-one: func(in: borrow); + poll: func(in: list>) -> list; }