Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename poll-list to poll, poll-one to pollable.block, and introduce pollable.ready #7427

Merged
merged 9 commits into from
Oct 31, 2023
2 changes: 1 addition & 1 deletion crates/test-programs/src/bin/api_proxy_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ mod executor {

let mut ready = vec![false; wakers.len()];

for index in io::poll::poll_list(&pollables) {
for index in io::poll::poll(&pollables) {
ready[usize::try_from(index).unwrap()] = true;
}

Expand Down
6 changes: 2 additions & 4 deletions crates/test-programs/src/bin/api_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ wit_bindgen::generate!({
});

struct T;
use crate::wasi::io::poll;

static mut STATE: Vec<String> = Vec::new();

Expand All @@ -34,7 +33,7 @@ impl Guest for T {
for s in STATE.iter() {
let mut out = s.as_bytes();
while !out.is_empty() {
poll::poll_list(&[&pollable]);
pollable.block();
let n = match o.check_write() {
Ok(n) => n,
Err(_) => return Err(()),
Expand All @@ -52,8 +51,7 @@ impl Guest for T {
Ok(_) => {}
Err(_) => return Err(()),
}

poll::poll_list(&[&pollable]);
pollable.block();
match o.check_write() {
Ok(_) => {}
Err(_) => return Err(()),
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/src/bin/preview2_ip_name_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {
let addresses =
ip_name_lookup::resolve_addresses(&network, "example.com", None, false).unwrap();
let pollable = addresses.subscribe();
poll::poll_one(&pollable);
pollable.block();
assert!(addresses.resolve_next_address().is_ok());

let result = ip_name_lookup::resolve_addresses(&network, "a.b<&>", None, false);
Expand All @@ -20,7 +20,7 @@ fn main() {
let addresses = ip_name_lookup::resolve_addresses(&network, "github.com", None, false).unwrap();
let lookup = addresses.subscribe();
let timeout = monotonic_clock::subscribe_duration(1_000_000_000);
let ready = poll::poll_list(&[&lookup, &timeout]);
let ready = poll::poll(&[&lookup, &timeout]);
assert!(ready.len() > 0);
match ready[0] {
0 => loop {
Expand Down
18 changes: 12 additions & 6 deletions crates/test-programs/src/bin/preview2_sleep.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use test_programs::wasi::{clocks::monotonic_clock, io::poll};
use test_programs::wasi::clocks::monotonic_clock;

fn main() {
sleep_10ms();
Expand All @@ -9,19 +9,25 @@ fn main() {
fn sleep_10ms() {
let dur = 10_000_000;
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() + dur);
poll::poll_one(&p);
p.block();
let p = monotonic_clock::subscribe_duration(dur);
poll::poll_one(&p);
p.block();
}

fn sleep_0ms() {
let p = monotonic_clock::subscribe_instant(monotonic_clock::now());
poll::poll_one(&p);
p.block();
let p = monotonic_clock::subscribe_duration(0);
poll::poll_one(&p);
assert!(
p.ready(),
"timer subscription with duration 0 is ready immediately"
);
}

fn sleep_backwards_in_time() {
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() - 1);
poll::poll_one(&p);
assert!(
p.ready(),
"timer subscription for instant which has passed is ready immediately"
);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use test_programs::wasi::cli::stdin;
use test_programs::wasi::io::poll;
use test_programs::wasi::io::streams;

fn main() {
let stdin: streams::InputStream = stdin::get_stdin();
let stdin_pollable = stdin.subscribe();
let ready = poll::poll_list(&[&stdin_pollable]);
assert_eq!(ready, &[0]);
stdin_pollable.block();
assert!(stdin_pollable.ready(), "after blocking, pollable is ready");
drop(stdin_pollable);
drop(stdin);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use test_programs::wasi::cli::stdin;
use test_programs::wasi::io::poll;
use test_programs::wasi::io::streams;

fn main() {
let stdin: streams::InputStream = stdin::get_stdin();
let stdin_pollable = stdin.subscribe();
let ready = poll::poll_list(&[&stdin_pollable]);
assert_eq!(ready, &[0]);
stdin_pollable.block();
drop(stdin);
unreachable!("execution should have trapped in line above when stream dropped before pollable");
}
9 changes: 4 additions & 5 deletions crates/test-programs/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::wasi::http::{outgoing_handler, types as http_types};
use crate::wasi::io::poll;
use crate::wasi::io::streams;
use anyhow::{anyhow, Result};
use std::fmt;
Expand Down Expand Up @@ -72,7 +71,7 @@ pub fn request(

let pollable = request_body.subscribe();
while !buf.is_empty() {
poll::poll_list(&[&pollable]);
pollable.block();

let permit = match request_body.check_write() {
Ok(n) => n,
Expand All @@ -94,7 +93,7 @@ pub fn request(
_ => {}
}

poll::poll_list(&[&pollable]);
pollable.block();

match request_body.check_write() {
Ok(_) => {}
Expand All @@ -110,7 +109,7 @@ pub fn request(
Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?,
None => {
let pollable = future_response.subscribe();
let _ = poll::poll_list(&[&pollable]);
pollable.block();
future_response
.get()
.expect("incoming response available")
Expand Down Expand Up @@ -140,7 +139,7 @@ pub fn request(

let mut body = Vec::new();
loop {
poll::poll_list(&[&input_stream_pollable]);
input_stream_pollable.block();

let mut body_chunk = match input_stream.read(1024 * 1024) {
Ok(c) => c,
Expand Down
18 changes: 7 additions & 11 deletions crates/test-programs/src/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ use std::ops::Range;
const TIMEOUT_NS: u64 = 1_000_000_000;

impl Pollable {
pub fn wait(&self) {
poll::poll_one(self);
}

pub fn wait_until(&self, timeout: &Pollable) -> Result<(), ErrorCode> {
let ready = poll::poll_list(&[self, timeout]);
let ready = poll::poll(&[self, timeout]);
assert!(ready.len() > 0);
match ready[0] {
0 => Ok(()),
Expand All @@ -36,7 +32,7 @@ impl OutputStream {
let pollable = self.subscribe();

while !bytes.is_empty() {
pollable.wait();
pollable.block();

let permit = self.check_write()?;

Expand Down Expand Up @@ -75,7 +71,7 @@ impl TcpSocket {

loop {
match self.finish_bind() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -88,7 +84,7 @@ impl TcpSocket {

loop {
match self.finish_listen() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -105,7 +101,7 @@ impl TcpSocket {

loop {
match self.finish_connect() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -116,7 +112,7 @@ impl TcpSocket {

loop {
match self.accept() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -139,7 +135,7 @@ impl UdpSocket {

loop {
match self.finish_bind() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/wasi-http/wit/deps/http/types.wit
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ interface types {
/// transport layer of the HTTP protocol.
///
/// These timeouts are separate from any the user may use to bound a
/// blocking call to `wasi:io/poll.poll-list`.
/// blocking call to `wasi:io/poll.poll`.
resource request-options {
/// Construct a default `request-options` value.
constructor();
Expand Down
25 changes: 16 additions & 9 deletions crates/wasi-http/wit/deps/io/poll.wit
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@ package wasi:io@0.2.0-rc-2023-11-05;
/// A poll API intended to let users wait for I/O events on multiple handles
/// at once.
interface poll {
/// A "pollable" handle.
resource pollable;
/// `pollable` epresents a single I/O event which may be ready, or not.
resource pollable {

/// Return the readiness of a pollable. This function never blocks.
///
/// Returns `true` when the pollable is ready, and `false` otherwise.
ready: func() -> bool;

/// `block` returns immediately if the pollable is ready, and otherwise
/// blocks until ready.
///
/// This function is equivalent to calling `poll.poll` on a list
/// containing only this pollable.
block: func();
}

/// Poll for completion on a set of pollables.
///
Expand All @@ -24,11 +37,5 @@ interface poll {
/// do any I/O so it doesn't fail. If any of the I/O sources identified by
/// the pollables has an error, it is indicated by marking the source as
/// being reaedy for I/O.
poll-list: func(in: list<borrow<pollable>>) -> list<u32>;

/// Poll for completion on a single pollable.
///
/// This function is similar to `poll-list`, but operates on only a single
/// pollable. When it returns, the handle is ready for I/O.
poll-one: func(in: borrow<pollable>);
poll: func(in: list<borrow<pollable>>) -> list<u32>;
}
10 changes: 5 additions & 5 deletions crates/wasi-preview1-component-adapter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod bindings {
// can't support in these special core-wasm adapters.
// Instead, we manually define the bindings for these functions in
// terms of raw pointers.
skip: ["run", "get-environment", "poll-list"],
skip: ["run", "get-environment", "poll"],
});

#[cfg(feature = "reactor")]
Expand All @@ -54,7 +54,7 @@ pub mod bindings {
// can't support in these special core-wasm adapters.
// Instead, we manually define the bindings for these functions in
// terms of raw pointers.
skip: ["get-environment", "poll-list"],
skip: ["get-environment", "poll"],
});
}

Expand Down Expand Up @@ -1780,8 +1780,8 @@ pub unsafe extern "C" fn poll_oneoff(
#[link(wasm_import_module = "wasi:io/poll@0.2.0-rc-2023-11-05")]
#[allow(improper_ctypes)] // FIXME(bytecodealliance/wit-bindgen#684)
extern "C" {
#[link_name = "poll-list"]
fn poll_list_import(pollables: *const Pollable, len: usize, rval: *mut ReadyList);
#[link_name = "poll"]
fn poll_import(pollables: *const Pollable, len: usize, rval: *mut ReadyList);
}
let mut ready_list = ReadyList {
base: std::ptr::null(),
Expand All @@ -1794,7 +1794,7 @@ pub unsafe extern "C" fn poll_oneoff(
.checked_mul(size_of::<u32>())
.trapping_unwrap(),
|| {
poll_list_import(
poll_import(
pollables.pointer,
pollables.length,
&mut ready_list as *mut _,
Expand Down
6 changes: 5 additions & 1 deletion crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ fn subscribe_to_duration(
table: &mut crate::preview2::Table,
duration: tokio::time::Duration,
) -> anyhow::Result<Resource<Pollable>> {
let sleep = if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) {
let sleep = if duration.is_zero() {
table.push(Deadline::Past)?
} else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) {
// NB: this resource created here is not actually exposed to wasm, it's
// only an internal implementation detail used to match the signature
// expected by `subscribe`.
Expand Down Expand Up @@ -85,6 +87,7 @@ impl<T: WasiView> monotonic_clock::Host for T {
}

enum Deadline {
Past,
Instant(tokio::time::Instant),
Never,
}
Expand All @@ -93,6 +96,7 @@ enum Deadline {
impl Subscribe for Deadline {
async fn ready(&mut self) {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
Expand Down
5 changes: 3 additions & 2 deletions crates/wasi/src/preview2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ pub mod bindings {
"[method]output-stream.blocking-write-and-flush",
"[method]output-stream.blocking-write-zeroes-and-flush",
"[method]directory-entry-stream.read-directory-entry",
"poll-list",
"poll-one",
"poll",
"[method]pollable.block",
"[method]pollable.ready",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this method wouldn't be on this list since it shouldn't block, but I can also see how that complicates the implementation, so I'm fine either way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. The implementation, as it stands, shouldn't ever actually be Pending, and I think we will be able to keep an eye on that invariant going forwards.

],
},
trappable_error_type: {
Expand Down
Loading