From d1d3021271d3223848d049ca622b229aed2e2f82 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Sun, 19 May 2019 22:04:17 +0300 Subject: [PATCH 01/18] added polyfill for Atomics.wait and used it to notify futures --- crates/futures/src/lib.rs | 120 ++++++++++++++++++------------ crates/futures/src/polyfill.js | 130 +++++++++++++++++++++++++++++++++ crates/futures/src/polyfill.rs | 16 ++++ crates/js-sys/src/lib.rs | 14 +++- 4 files changed, 231 insertions(+), 49 deletions(-) create mode 100644 crates/futures/src/polyfill.js create mode 100644 crates/futures/src/polyfill.rs diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index e17eaaee30d..1c0f3464883 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -110,15 +110,18 @@ pub mod futures_0_3; use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Function, Promise}; +use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer}; use wasm_bindgen::prelude::*; +mod polyfill; + /// A Rust `Future` backed by a JavaScript `Promise`. /// /// This type is constructed with a JavaScript `Promise` object and translates @@ -252,6 +255,7 @@ fn _future_to_promise(future: Box>) resolve, reject, notified: Cell::new(State::Notified), + waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)), })); }); @@ -270,6 +274,9 @@ fn _future_to_promise(future: Box>) // JavaScript. We'll be invoking one of these at the end. resolve: Function, reject: Function, + + // Struct to wake a future + waker: Arc, } // The possible states our `Package` (future) can be in, tracked internally @@ -300,10 +307,68 @@ fn _future_to_promise(future: Box>) Waiting(Arc), } - // No shared memory right now, wasm is single threaded, no need to worry - // about this! - unsafe impl Send for Package {} - unsafe impl Sync for Package {} + struct Waker { + buffer: SharedArrayBuffer, + notified: AtomicBool, + }; + + impl Waker { + fn new(buffer: SharedArrayBuffer, notified: bool) -> Self { + Self { + buffer, + notified: AtomicBool::new(notified), + } + } + } + + impl Notify for Waker { + fn notify(&self, id: usize) { + if !self.notified.swap(true, Ordering::SeqCst) { + let array = Int32Array::new(&self.buffer); + let _ = Atomics::notify(&array, id as u32); + } + } + } + + fn poll_again(package: Arc, id: usize) { + let me = match package.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. 
We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } impl Package { // Move the future contained in `me` as far forward as we can. This will @@ -331,13 +396,14 @@ fn _future_to_promise(future: Box>) // our `Waiting` state, and resume the polling process State::Polling => { me.notified.set(State::Waiting(me.clone())); + poll_again(me.clone(), 0); break; } State::Waiting(_) => panic!("shouldn't see waiting state!"), } - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. Ok(Async::Ready(value)) => (value, &me.resolve), @@ -353,48 +419,6 @@ fn _future_to_promise(future: Box>) } } } - - impl Notify for Package { - fn notify(&self, _id: usize) { - let me = match self.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => return, - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => return, - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - // - // Note that the `Rc`/`RefCell` trick here is basically to just - // ensure that our `Closure` gets cleaned up appropriately. - let promise = Promise::resolve(&JsValue::undefined()); - let slot = Rc::new(RefCell::new(None)); - let slot2 = slot.clone(); - let closure = Closure::wrap(Box::new(move |_| { - let myself = slot2.borrow_mut().take(); - debug_assert!(myself.is_some()); - Package::poll(&me); - }) as Box); - promise.then(&closure); - *slot.borrow_mut() = Some(closure); - } - } } /// Converts a Rust `Future` on a local task queue. diff --git a/crates/futures/src/polyfill.js b/crates/futures/src/polyfill.js new file mode 100644 index 00000000000..ece6f96613d --- /dev/null +++ b/crates/futures/src/polyfill.js @@ -0,0 +1,130 @@ +/* + * The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async + */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. 
If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Author: Lars T Hansen, lhansen@mozilla.com + */ + +/* Polyfill for Atomics.waitAsync() for web browsers. + * + * Any kind of agent that is able to create a new Worker can use this polyfill. + * + * Load this file in all agents that will use Atomics.waitAsync. + * + * Agents that don't call Atomics.waitAsync need do nothing special. + * + * Any kind of agent can wake another agent that is sleeping in + * Atomics.waitAsync by just calling Atomics.wake for the location being slept + * on, as normal. + * + * The implementation is not completely faithful to the proposed semantics: in + * the case where an agent first asyncWaits and then waits on the same location: + * when it is woken, the two waits will be woken in order, while in the real + * semantics, the sync wait will be woken first. + * + * In this polyfill Atomics.waitAsync is not very fast. + */ + +/* Implementation: + * + * For every wait we fork off a Worker to perform the wait. Workers are reused + * when possible. The worker communicates with its parent using postMessage. + */ + +const helperCode = ` +onmessage = function (ev) { + try { + switch (ev.data[0]) { + case 'wait': { + let [_, ia, index, value, timeout] = ev.data; + let result = Atomics.wait(ia, index, value, timeout) + postMessage(['ok', result]); + break; + } + default: { + throw new Error("Wrong message sent to wait helper: " + ev.data.join(',')); + } + } + } catch (e) { + console.log("Exception in wait helper"); + postMessage(['error', 'Exception']); + } +} +`; + +const helpers = []; + +function allocHelper() { + if (helpers.length > 0) { + return helpers.pop(); + } + return new Worker("data:application/javascript," + encodeURIComponent(helperCode)); +} + +function freeHelper(h) { + helpers.push(h); +} + +// Atomics.waitAsync always returns a promise. Throws standard errors +// for parameter validation. The promise is resolved with a string as from +// Atomics.wait, or, in the case something went completely wrong, it is +// rejected with an error string. +export function waitAsync(ia, index, value, timeout = Infinity) { + if (typeof ia != "object" + || !(ia instanceof Int32Array) + || !(ia.buffer instanceof SharedArrayBuffer) + ) { + throw new TypeError("Expected shared memory"); + } + + // Range checking for the index. + + ia[index]; + + // Optimization, avoid the helper thread in this common case. + + if (Atomics.load(ia, index) !== value) { + return Promise.resolve("not-equal"); + } + + // General case, we must wait. + + return new Promise(function (resolve, reject) { + const h = allocHelper(); + h.onmessage = function (ev) { + // Free the helper early so that it can be reused if the resolution + // needs a helper. + freeHelper(h); + switch (ev.data[0]) { + case 'ok': + resolve(ev.data[1]); + break; + case 'error': + // Note, rejection is not in the spec, it is an artifact of the polyfill. + // The helper already printed an error to the console. + reject(ev.data[1]); + break; + } + }; + + // It's possible to do better here if the ia is already known to the + // helper. In that case we can communicate the other data through + // shared memory and wake the agent. And it is possible to make ia + // known to the helper by waking it with a special value so that it + // checks its messages, and then posting the ia to the helper. Some + // caching / decay scheme is useful no doubt, to improve performance + // and avoid leaks. 
+ // + // In the event we wake the helper directly, we can micro-wait here + // for a quick result. We'll need to restructure some code to make + // that work out properly, and some synchronization is necessary for + // the helper to know that we've picked up the result and no + // postMessage is necessary. + + h.postMessage(['wait', ia, index, value, timeout]); + }) +} diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs new file mode 100644 index 00000000000..cd77147be88 --- /dev/null +++ b/crates/futures/src/polyfill.rs @@ -0,0 +1,16 @@ +use js_sys::{Int32Array, Promise, SharedArrayBuffer}; +use wasm_bindgen::prelude::*; + +#[wasm_bindgen(module = "/src/polyfill.js")] +extern "C" { + #[wasm_bindgen(js_name = waitAsync)] + pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Promise; + + #[wasm_bindgen(js_name = waitAsync)] + pub fn wait_async_with_timeout( + indexed_array: Int32Array, + index: u32, + value: i32, + timeout: f64, + ) -> Promise; +} diff --git a/crates/js-sys/src/lib.rs b/crates/js-sys/src/lib.rs index b3e92161821..137bb51f5b6 100644 --- a/crates/js-sys/src/lib.rs +++ b/crates/js-sys/src/lib.rs @@ -495,6 +495,9 @@ extern "C" { pub fn slice_with_end(this: &SharedArrayBuffer, begin: u32, end: u32) -> SharedArrayBuffer; } +unsafe impl Send for SharedArrayBuffer {} +unsafe impl Sync for SharedArrayBuffer {} + // Array Iterator #[wasm_bindgen] extern "C" { @@ -598,10 +601,19 @@ pub mod Atomics { /// The static `Atomics.notify()` method notifies up some agents that /// are sleeping in the wait queue. /// Note: This operation works with a shared `Int32Array` only. + /// If `count` is not provided, notifies all the agents int the queue. /// /// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify) #[wasm_bindgen(js_namespace = Atomics, catch)] - pub fn notify(typed_array: &Int32Array, index: u32, count: u32) -> Result; + pub fn notify(typed_array: &Int32Array, index: u32) -> Result; + + /// Notifies up to `count` agents in the wait queue. + #[wasm_bindgen(js_namespace = Atomics, catch, js_name = notify)] + pub fn notify_with_count( + typed_array: &Int32Array, + index: u32, + count: u32, + ) -> Result; /// The static `Atomics.or()` method computes a bitwise OR with a given value /// at a given position in the array, and returns the old value at that position. From c01575c1bc4d9a1d24e80ab3426939cc4d7e4a93 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Sun, 19 May 2019 22:07:51 +0300 Subject: [PATCH 02/18] typo fixed in Atomics docs --- crates/js-sys/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/js-sys/src/lib.rs b/crates/js-sys/src/lib.rs index 137bb51f5b6..556f7dba44e 100644 --- a/crates/js-sys/src/lib.rs +++ b/crates/js-sys/src/lib.rs @@ -601,7 +601,7 @@ pub mod Atomics { /// The static `Atomics.notify()` method notifies up some agents that /// are sleeping in the wait queue. /// Note: This operation works with a shared `Int32Array` only. - /// If `count` is not provided, notifies all the agents int the queue. + /// If `count` is not provided, notifies all the agents in the queue. 
/// /// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify) #[wasm_bindgen(js_namespace = Atomics, catch)] From 2fdfe79574dc7023b61b6f717262738f72da29fa Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Wed, 29 May 2019 22:36:57 +0300 Subject: [PATCH 03/18] added polyfill implementation in rust --- crates/futures/Cargo.toml | 8 ++ crates/futures/src/lib.rs | 3 +- crates/futures/src/polyfill.rs | 202 ++++++++++++++++++++++++++++++--- 3 files changed, 199 insertions(+), 14 deletions(-) diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 23cd230d6b2..69b9a0f33d9 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -18,6 +18,14 @@ futures-util-preview = { version = "0.3.0-alpha.15", optional = true } futures-channel-preview = { version = "0.3.0-alpha.15", optional = true } lazy_static = { version = "1.3.0", optional = true } +[dependencies.web-sys] +path = "../web-sys" +version = "0.3.22" +features = [ + "MessageEvent", + "Worker", +] + [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = { path = '../test', version = '0.2.48' } diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 1c0f3464883..57313ec41ef 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -358,7 +358,8 @@ fn _future_to_promise(future: Box>) // // Note that the `Rc`/`RefCell` trick here is basically to just // ensure that our `Closure` gets cleaned up appropriately. - let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0); + let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0) + .expect("Should create a Promise"); let slot = Rc::new(RefCell::new(None)); let slot2 = slot.clone(); let closure = Closure::wrap(Box::new(move |_| { diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index cd77147be88..2899fe33f84 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -1,16 +1,192 @@ -use js_sys::{Int32Array, Promise, SharedArrayBuffer}; +/* + * The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async + * and ported to Rust + */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Author: Lars T Hansen, lhansen@mozilla.com + */ + +/* Polyfill for Atomics.waitAsync() for web browsers. + * + * Any kind of agent that is able to create a new Worker can use this polyfill. + * + * Load this file in all agents that will use Atomics.waitAsync. + * + * Agents that don't call Atomics.waitAsync need do nothing special. + * + * Any kind of agent can wake another agent that is sleeping in + * Atomics.waitAsync by just calling Atomics.wake for the location being slept + * on, as normal. + * + * The implementation is not completely faithful to the proposed semantics: in + * the case where an agent first asyncWaits and then waits on the same location: + * when it is woken, the two waits will be woken in order, while in the real + * semantics, the sync wait will be woken first. + * + * In this polyfill Atomics.waitAsync is not very fast. + */ + +/* Implementation: + * + * For every wait we fork off a Worker to perform the wait. Workers are reused + * when possible. The worker communicates with its parent using postMessage. 
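+ *
+ * The parent posts ['wait', ia, index, value, timeout] to the helper, which
+ * performs the blocking Atomics.wait call and replies with ['ok', result] on
+ * success or ['error', message] if the wait threw. Helpers are spawned from a
+ * data: URL containing HELPER_CODE and cached in the thread-local HELPERS
+ * free list so that repeated waits can reuse an existing Worker.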
+ */ + +use std::cell::RefCell; +use std::rc::Rc; + +use js_sys::{ + encode_uri_component, Array, Atomics, Error, Function, Int32Array, JsString, Promise, Reflect, + SharedArrayBuffer, +}; use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; +use web_sys::{MessageEvent, Worker}; + +const HELPER_CODE: &'static str = " +onmessage = function (ev) { + try { + switch (ev.data[0]) { + case 'wait': { + let [_, ia, index, value, timeout] = ev.data; + let result = Atomics.wait(ia, index, value, timeout); + postMessage(['ok', result]); + break; + } + default: { + throw new Error('Wrong message sent to wait helper: ' + ev.data.join(',')); + } + } + } catch (e) { + console.log('Exception in wait helper'); + postMessage(['error', 'Exception']); + } +} +"; + +thread_local! { + static HELPERS: RefCell>>> = RefCell::new(vec![]); +} + +fn alloc_helper() -> Rc> { + HELPERS.with(|helpers| { + if let Some(helper) = helpers.borrow_mut().pop() { + return helper; + } + + let mut initialization_string = "data:application/javascript,".to_owned(); + let encoded: String = encode_uri_component(HELPER_CODE).into(); + initialization_string.push_str(&encoded); + + return Rc::new(RefCell::new( + Worker::new(&initialization_string).expect("Should create a Worker"), + )); + }) +} + +fn free_helper(helper: &Rc>) { + HELPERS.with(move |helpers| { + helpers.borrow_mut().push(helper.clone()); + }); +} + +pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Result { + let timeout = 0.0; + wait_async_with_timeout(indexed_array, index, value, timeout) +} + +fn get_array_item(array: &JsValue, index: u32) -> JsValue { + Reflect::get(array, &JsValue::from(index)) + .expect(&format!("Array should contain the index {}", index)) +} + +// Atomics.waitAsync always returns a promise. Throws standard errors +// for parameter validation. The promise is resolved with a string as from +// Atomics.wait, or, in the case something went completely wrong, it is +// rejected with an error string. +pub fn wait_async_with_timeout( + indexed_array: Int32Array, + index: u32, + value: i32, + timeout: f64, +) -> Result { + if !indexed_array.buffer().has_type::() { + return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into()); + } + + // Optimization, avoid the helper thread in this common case. + if Atomics::load(&indexed_array, index)? != value { + return Ok(Promise::resolve(&JsString::from("not-equal"))); + } + + // General case, we must wait. + + Ok(Promise::new( + &mut Box::new(move |resolve: Function, reject: Function| { + let helper = alloc_helper(); + let helper_ref = helper.clone(); + + let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { + // Free the helper early so that it can be reused if the resolution + // needs a helper. + free_helper(&helper_ref); + match String::from( + get_array_item(&e.data(), 0) + .as_string() + .expect("data[0] should return a String"), + ) + .as_str() + { + "ok" => { + resolve + .call1(&JsValue::NULL, &get_array_item(&e.data(), 1)) + .expect("Should successfully call a resolve callback"); + } + "error" => { + // Note, rejection is not in the spec, it is an artifact of the polyfill. + // The helper already printed an error to the console. 
+ reject + .call1(&JsValue::NULL, &get_array_item(&e.data(), 1)) + .expect("Should successfully call a reject callback"); + } + // it's not specified in the proposal yet + _ => (), + } + }) as Box); + helper + .borrow() + .set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + + // It's possible to do better here if the ia is already known to the + // helper. In that case we can communicate the other data through + // shared memory and wake the agent. And it is possible to make ia + // known to the helper by waking it with a special value so that it + // checks its messages, and then posting the ia to the helper. Some + // caching / decay scheme is useful no doubt, to improve performance + // and avoid leaks. + // + // In the event we wake the helper directly, we can micro-wait here + // for a quick result. We'll need to restructure some code to make + // that work out properly, and some synchronization is necessary for + // the helper to know that we've picked up the result and no + // postMessage is necessary. + + let data = Array::of5( + &JsString::from("wait"), + &indexed_array, + &JsValue::from(index), + &JsValue::from(value), + &JsValue::from(timeout), + ); -#[wasm_bindgen(module = "/src/polyfill.js")] -extern "C" { - #[wasm_bindgen(js_name = waitAsync)] - pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Promise; - - #[wasm_bindgen(js_name = waitAsync)] - pub fn wait_async_with_timeout( - indexed_array: Int32Array, - index: u32, - value: i32, - timeout: f64, - ) -> Promise; + helper + .borrow() + .post_message(&data) + .expect("Should successfully post data to a Worker"); + }) as &mut dyn FnMut(Function, Function), + )) } From e466e1a6f15d36e39593a0605041998668bc9047 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Mon, 10 Jun 2019 21:10:06 +0300 Subject: [PATCH 04/18] moved threadsafe futures behind a flag --- crates/futures/src/lib.rs | 152 +++++++++++++++++++++++++++++---- crates/futures/src/polyfill.rs | 17 ++++ crates/futures/tests/tests.rs | 2 + 3 files changed, 156 insertions(+), 15 deletions(-) diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 57313ec41ef..2e412a97c77 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -110,18 +110,36 @@ pub mod futures_0_3; use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc; +#[cfg(target_feature = "atomics")] use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +#[cfg(target_feature = "atomics")] +use std::sync::Mutex; use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer}; +use js_sys::{Function, Promise}; +#[cfg(target_feature = "atomics")] +use js_sys::{Atomics, Int32Array, SharedArrayBuffer, WebAssembly}; use wasm_bindgen::prelude::*; +#[cfg(target_feature = "atomics")] +use wasm_bindgen::JsCast; +#[cfg(target_feature = "atomics")] mod polyfill; +macro_rules! console_log { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} + /// A Rust `Future` backed by a JavaScript `Promise`. 
/// /// This type is constructed with a JavaScript `Promise` object and translates @@ -255,7 +273,8 @@ fn _future_to_promise(future: Box>) resolve, reject, notified: Cell::new(State::Notified), - waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)), + #[cfg(target_feature = "atomics")] + waker: Arc::new(Waker::new(vec![0; 4], false)), })); }); @@ -275,6 +294,7 @@ fn _future_to_promise(future: Box>) resolve: Function, reject: Function, + #[cfg(target_feature = "atomics")] // Struct to wake a future waker: Arc, } @@ -307,38 +327,59 @@ fn _future_to_promise(future: Box>) Waiting(Arc), } + #[cfg(target_feature = "atomics")] struct Waker { - buffer: SharedArrayBuffer, + array: Vec, notified: AtomicBool, }; + #[cfg(target_feature = "atomics")] impl Waker { - fn new(buffer: SharedArrayBuffer, notified: bool) -> Self { - Self { - buffer, + fn new(array: Vec, notified: bool) -> Self { + Waker { + array, notified: AtomicBool::new(notified), } } } + #[cfg(target_feature = "atomics")] impl Notify for Waker { fn notify(&self, id: usize) { + console_log!("Waker notify"); if !self.notified.swap(true, Ordering::SeqCst) { - let array = Int32Array::new(&self.buffer); + console_log!("Waker, inside if"); + let memory_buffer = wasm_bindgen::memory() + .dyn_into::() + .expect("Should cast a memory to WebAssembly::Memory") + .buffer(); + + let array_location = self.array.as_ptr() as u32 / 4; + let array = Int32Array::new(&memory_buffer) + .subarray(array_location, array_location + self.array.len() as u32); + let _ = Atomics::notify(&array, id as u32); } } } + #[cfg(target_feature = "atomics")] fn poll_again(package: Arc, id: usize) { + console_log!("poll_again called"); let me = match package.notified.replace(State::Notified) { // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, + State::Waiting(me) => { + console_log!("poll_again Waiting"); + me + }, // we were already notified, and were just notified again; // having now coalesced the notifications we return as it's // still someone else's job to process this - State::Notified => return, + State::Notified => { + console_log!("poll_again Notified"); + return; + }, // the future was previously being polled, and we've just // switched it to the "you're notified" state. We don't have @@ -347,9 +388,21 @@ fn _future_to_promise(future: Box>) // continue polling. For us, though, there's nothing else to do, // so we bail out. // later see - State::Polling => return, + State::Polling => { + console_log!("poll_again Polling"); + return; + }, }; + let memory_buffer = wasm_bindgen::memory() + .dyn_into::() + .expect("Should cast a memory to WebAssembly::Memory") + .buffer(); + + let array_location = package.waker.array.as_ptr() as u32 / 4; + let array = Int32Array::new(&memory_buffer) + .subarray(array_location, array_location + package.waker.array.len() as u32); + // Use `Promise.then` on a resolved promise to place our execution // onto the next turn of the microtask queue, enqueueing our poll // operation. We don't currently poll immediately as it turns out @@ -358,7 +411,7 @@ fn _future_to_promise(future: Box>) // // Note that the `Rc`/`RefCell` trick here is basically to just // ensure that our `Closure` gets cleaned up appropriately. 
- let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0) + let promise = polyfill::wait_async(array, id as u32, 0) .expect("Should create a Promise"); let slot = Rc::new(RefCell::new(None)); let slot2 = slot.clone(); @@ -366,11 +419,18 @@ fn _future_to_promise(future: Box>) let myself = slot2.borrow_mut().take(); debug_assert!(myself.is_some()); Package::poll(&me); - }) as Box); + }) as Box); promise.then(&closure); *slot.borrow_mut() = Some(closure); } + // No shared memory right now, wasm is single threaded, no need to worry + // about this! + #[cfg(not(target_feature = "atomics"))] + unsafe impl Send for Package {} + #[cfg(not(target_feature = "atomics"))] + unsafe impl Sync for Package {} + impl Package { // Move the future contained in `me` as far forward as we can. This will // do as much synchronous work as possible to complete the future, @@ -386,7 +446,9 @@ fn _future_to_promise(future: Box>) match me.notified.replace(State::Polling) { // We received a notification while previously polling, or // this is the initial poll. We've got work to do below! - State::Notified => {} + State::Notified => { + console_log!("Package::poll Notified"); + } // We've gone through this loop once and no notification was // received while we were executing work. That means we got @@ -396,15 +458,31 @@ fn _future_to_promise(future: Box>) // When the notification comes in it'll notify our task, see // our `Waiting` state, and resume the polling process State::Polling => { + console_log!("Package::poll Polling"); + me.notified.set(State::Waiting(me.clone())); + + #[cfg(target_feature = "atomics")] poll_again(me.clone(), 0); + break; } - State::Waiting(_) => panic!("shouldn't see waiting state!"), + State::Waiting(_) => { + console_log!("Package::poll Waiting"); + + panic!("shouldn't see waiting state!") + }, } - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { + + #[cfg(target_feature = "atomics")] + let waker = &me.waker; + + #[cfg(not(target_feature = "atomics"))] + let waker = me; + + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(waker, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. Ok(Async::Ready(value)) => (value, &me.resolve), @@ -420,6 +498,50 @@ fn _future_to_promise(future: Box>) } } } + + #[cfg(not(target_feature = "atomics"))] + impl Notify for Package { + fn notify(&self, _id: usize) { + console_log!("Package::notify Waiting"); + let me = match self.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. 
We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = Promise::resolve(&JsValue::undefined()); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } + } } /// Converts a Rust `Future` on a local task queue. diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index 2899fe33f84..3fe46c7ff5f 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -47,12 +47,23 @@ use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{MessageEvent, Worker}; +macro_rules! console_log { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} + const HELPER_CODE: &'static str = " onmessage = function (ev) { try { switch (ev.data[0]) { case 'wait': { let [_, ia, index, value, timeout] = ev.data; + console.log('wait event inside a worker'); let result = Atomics.wait(ia, index, value, timeout); postMessage(['ok', result]); break; @@ -115,16 +126,20 @@ pub fn wait_async_with_timeout( timeout: f64, ) -> Result { if !indexed_array.buffer().has_type::() { + console_log!("polyfill, not a SharedArrayBuffer"); return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into()); } // Optimization, avoid the helper thread in this common case. if Atomics::load(&indexed_array, index)? != value { + console_log!("polyfill, not-equal"); return Ok(Promise::resolve(&JsString::from("not-equal"))); } // General case, we must wait. + console_log!("polyfill, general case"); + Ok(Promise::new( &mut Box::new(move |resolve: Function, reject: Function| { let helper = alloc_helper(); @@ -161,6 +176,8 @@ pub fn wait_async_with_timeout( .borrow() .set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + onmessage_callback.forget(); + // It's possible to do better here if the ia is already known to the // helper. In that case we can communicate the other data through // shared memory and wake the agent. 
And it is possible to make ia diff --git a/crates/futures/tests/tests.rs b/crates/futures/tests/tests.rs index 07a3a04a557..3fa5cda4df4 100755 --- a/crates/futures/tests/tests.rs +++ b/crates/futures/tests/tests.rs @@ -6,6 +6,8 @@ extern crate wasm_bindgen; extern crate wasm_bindgen_futures; extern crate wasm_bindgen_test; +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + use futures::unsync::oneshot; use futures::Future; use wasm_bindgen::prelude::*; From 16c6bdc966073c2296aa1c4acaa46397c3e28509 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Mon, 17 Jun 2019 20:25:25 +0300 Subject: [PATCH 05/18] moved threadsafe futures implementation to a separate file, made updates after review --- crates/futures/src/atomics.rs | 351 ++++++++++++++++++++++++++ crates/futures/src/lib.rs | 160 +----------- crates/futures/src/polyfill.js | 130 ---------- crates/futures/src/polyfill.rs | 28 +- crates/js-sys/src/lib.rs | 3 - examples/raytrace-parallel/Cargo.toml | 2 +- examples/raytrace-parallel/src/lib.rs | 2 +- 7 files changed, 378 insertions(+), 298 deletions(-) create mode 100644 crates/futures/src/atomics.rs delete mode 100644 crates/futures/src/polyfill.js diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs new file mode 100644 index 00000000000..a9644b2571f --- /dev/null +++ b/crates/futures/src/atomics.rs @@ -0,0 +1,351 @@ +use std::cell::{Cell, RefCell}; +use std::fmt; +use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; +use std::sync::Arc; + +use futures::executor::{self, Notify, Spawn}; +use futures::future; +use futures::prelude::*; +use futures::sync::oneshot; +use js_sys::{Atomics, Int32Array, WebAssembly, Function, Promise}; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; + +macro_rules! console_log { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + resolved: oneshot::Receiver, + rejected: oneshot::Receiver, + callbacks: Option<(Closure, Closure)>, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. These two callbacks + // will be connected to oneshot channels which feed back into our + // future. + // + // This may not be the speediest option today but it should work! 
+ let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let mut tx1 = Some(tx1); + let resolve = Closure::wrap(Box::new(move |val| { + drop(tx1.take().unwrap().send(val)); + }) as Box); + let mut tx2 = Some(tx2); + let reject = Closure::wrap(Box::new(move |val| { + drop(tx2.take().unwrap().send(val)); + }) as Box); + + js.then2(&resolve, &reject); + + JsFuture { + resolved: rx1, + rejected: rx2, + callbacks: Some((resolve, reject)), + } + } +} + +impl Future for JsFuture { + type Item = JsValue; + type Error = JsValue; + + fn poll(&mut self) -> Poll { + // Test if either our resolved or rejected side is finished yet. Note + // that they will return errors if they're disconnected which can't + // happen until we drop the `callbacks` field, which doesn't happen + // till we're done, so we dont need to handle that. + if let Ok(Async::Ready(val)) = self.resolved.poll() { + drop(self.callbacks.take()); + return Ok(val.into()); + } + if let Ok(Async::Ready(val)) = self.rejected.poll() { + drop(self.callbacks.take()); + return Err(val); + } + Ok(Async::NotReady) + } +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise + where + F: Future + 'static, +{ + _future_to_promise(Box::new(future)) +} + +// Implementation of actually transforming a future into a JavaScript `Promise`. +// +// The only primitive we have to work with here is `Promise::new`, which gives +// us two callbacks that we can use to either reject or resolve the promise. +// It's our job to ensure that one of those callbacks is called at the +// appropriate time. +// +// Now we know that JavaScript (in general) can't block and is largely +// notification/callback driven. That means that our future must either have +// synchronous computational work to do, or it's "scheduled a notification" to +// happen. These notifications are likely callbacks to get executed when things +// finish (like a different promise or something like `setTimeout`). The general +// idea here is thus to do as much synchronous work as we can and then otherwise +// translate notifications of a future's task into "let's poll the future!" +// +// This isn't necessarily the greatest future executor in the world, but it +// should get the job done for now hopefully. 
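+//
+// In this thread-safe variant the notification can arrive from another
+// thread: `Waker::notify` flips an `AtomicBool` and calls `atomic_notify` on
+// the waker's shared-memory word. Since the main thread is not allowed to
+// block in `Atomics.wait`, `poll_again` obtains a Promise from the `polyfill`
+// module, whose helper Worker performs the blocking wait and posts a message
+// back, and polling resumes from that Promise's `then` callback.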
+fn _future_to_promise(future: Box>) -> Promise { + let mut future = Some(executor::spawn(future)); + return Promise::new(&mut |resolve, reject| { + Package::poll(&Arc::new(Package { + spawn: RefCell::new(future.take().unwrap()), + resolve, + reject, + notified: Cell::new(State::Notified), + waker: Arc::new(Waker::default()), + })); + }); + + struct Package { + // Our "spawned future". This'll have everything we need to poll the + // future and continue to move it forward. + spawn: RefCell>>>, + + // The current state of this future, expressed in an enum below. This + // indicates whether we're currently polling the future, received a + // notification and need to keep polling, or if we're waiting for a + // notification to come in (and no one is polling). + notified: Cell, + + // Our two callbacks connected to the `Promise` that we returned to + // JavaScript. We'll be invoking one of these at the end. + resolve: Function, + reject: Function, + + // Struct to wake a future + waker: Arc, + } + + // The possible states our `Package` (future) can be in, tracked internally + // and used to guide what happens when polling a future. + enum State { + // This future is currently and actively being polled. Attempting to + // access the future will result in a runtime panic and is considered a + // bug. + Polling, + + // This future has been notified, while it was being polled. This marker + // is used in the `Notify` implementation below, and indicates that a + // notification was received that the future is ready to make progress. + // If seen, however, it probably means that the future is also currently + // being polled. + Notified, + + // The future is blocked, waiting for something to happen. Stored here + // is a self-reference to the future itself so we can pull it out in + // `Notify` and continue polling. + // + // Note that the self-reference here is an Arc-cycle that will leak + // memory unless the future completes, but currently that should be ok + // as we'll have to stick around anyway while the future is executing! + // + // This state is removed as soon as a notification comes in, so the leak + // should only be "temporary" + Waiting(Arc), + } + + struct Waker { + value: AtomicI32, + notified: AtomicBool, + }; + + impl Default for Waker { + fn default() -> Self { + Waker { + value: AtomicI32::new(0), + notified: AtomicBool::new(false), + } + } + } + + impl Notify for Waker { + fn notify(&self, id: usize) { + console_log!("Waker notify"); + if !self.notified.swap(true, Ordering::SeqCst) { + console_log!("Waker, inside if"); + let _ = unsafe { core::arch::wasm32::atomic_notify(&self.value as *const AtomicI32 as *mut i32, 0) }; + } + } + } + + fn poll_again(package: Arc) { + console_log!("poll_again called"); + let me = match package.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => { + console_log!("poll_again Waiting"); + me + } + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => { + console_log!("poll_again Notified"); + return; + } + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. 
+ // later see + State::Polling => { + console_log!("poll_again Polling"); + return; + } + }; + + let memory_buffer = wasm_bindgen::memory() + .dyn_into::() + .expect("Should cast a memory to WebAssembly::Memory") + .buffer(); + + let value_location = &package.waker.value as *const AtomicI32 as u32 / 4; + let array = Int32Array::new(&memory_buffer); + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + let promise = crate::polyfill::wait_async(array, value_location, 0).expect("Should create a Promise"); + let closure = Closure::once(Box::new(move |_| { + Package::poll(&me); + }) as Box); + promise.then(&closure); + closure.forget(); + } + + impl Package { + // Move the future contained in `me` as far forward as we can. This will + // do as much synchronous work as possible to complete the future, + // ensuring that when it blocks we're scheduled to get notified via some + // callback somewhere at some point (vague, right?) + // + // TODO: this probably shouldn't do as much synchronous work as possible + // as it can starve other computations. Rather it should instead + // yield every so often with something like `setTimeout` with the + // timeout set to zero. + fn poll(me: &Arc) { + loop { + match me.notified.replace(State::Polling) { + // We received a notification while previously polling, or + // this is the initial poll. We've got work to do below! + State::Notified => { + console_log!("Package::poll Notified"); + } + + // We've gone through this loop once and no notification was + // received while we were executing work. That means we got + // `NotReady` below and we're scheduled to receive a + // notification. Block ourselves and wait for later. + // + // When the notification comes in it'll notify our task, see + // our `Waiting` state, and resume the polling process + State::Polling => { + console_log!("Package::poll Polling"); + + me.notified.set(State::Waiting(me.clone())); + + poll_again(me.clone()); + + break; + } + + State::Waiting(_) => { + console_log!("Package::poll Waiting"); + + panic!("shouldn't see waiting state!") + } + } + + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { + // If the future is ready, immediately call the + // resolve/reject callback and then return as we're done. + Ok(Async::Ready(value)) => (value, &me.resolve), + Err(value) => (value, &me.reject), + + // Otherwise keep going in our loop, if we weren't notified + // we'll break out and start waiting. + Ok(Async::NotReady) => continue, + }; + + drop(f.call1(&JsValue::undefined(), &val)); + break; + } + } + } +} + +/// Converts a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +pub fn spawn_local(future: F) + where + F: Future + 'static, +{ + future_to_promise( + future + .map(|()| JsValue::undefined()) + .or_else(|()| future::ok::(JsValue::undefined())), + ); +} diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 2e412a97c77..becc6246e0a 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -101,44 +101,33 @@ //! } //! 
``` +#![feature(stdsimd)] + #![deny(missing_docs)] #[cfg(feature = "futures_0_3")] /// Contains a Futures 0.3 implementation of this crate. pub mod futures_0_3; +#[cfg(target_feature = "atomics")] +/// Contains a thread-safe version of this crate, with Futures 0.1 +pub mod atomics; + +#[cfg(target_feature = "atomics")] +/// Polyfill for `Atomics.waitAsync` function +mod polyfill; + use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc; -#[cfg(target_feature = "atomics")] -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -#[cfg(target_feature = "atomics")] -use std::sync::Mutex; use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; use js_sys::{Function, Promise}; -#[cfg(target_feature = "atomics")] -use js_sys::{Atomics, Int32Array, SharedArrayBuffer, WebAssembly}; use wasm_bindgen::prelude::*; -#[cfg(target_feature = "atomics")] -use wasm_bindgen::JsCast; - -#[cfg(target_feature = "atomics")] -mod polyfill; - -macro_rules! console_log { - ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) -} - -#[wasm_bindgen] -extern "C" { - #[wasm_bindgen(js_namespace = console)] - fn log(s: &str); -} /// A Rust `Future` backed by a JavaScript `Promise`. /// @@ -273,8 +262,6 @@ fn _future_to_promise(future: Box>) resolve, reject, notified: Cell::new(State::Notified), - #[cfg(target_feature = "atomics")] - waker: Arc::new(Waker::new(vec![0; 4], false)), })); }); @@ -293,10 +280,6 @@ fn _future_to_promise(future: Box>) // JavaScript. We'll be invoking one of these at the end. resolve: Function, reject: Function, - - #[cfg(target_feature = "atomics")] - // Struct to wake a future - waker: Arc, } // The possible states our `Package` (future) can be in, tracked internally @@ -327,108 +310,9 @@ fn _future_to_promise(future: Box>) Waiting(Arc), } - #[cfg(target_feature = "atomics")] - struct Waker { - array: Vec, - notified: AtomicBool, - }; - - #[cfg(target_feature = "atomics")] - impl Waker { - fn new(array: Vec, notified: bool) -> Self { - Waker { - array, - notified: AtomicBool::new(notified), - } - } - } - - #[cfg(target_feature = "atomics")] - impl Notify for Waker { - fn notify(&self, id: usize) { - console_log!("Waker notify"); - if !self.notified.swap(true, Ordering::SeqCst) { - console_log!("Waker, inside if"); - let memory_buffer = wasm_bindgen::memory() - .dyn_into::() - .expect("Should cast a memory to WebAssembly::Memory") - .buffer(); - - let array_location = self.array.as_ptr() as u32 / 4; - let array = Int32Array::new(&memory_buffer) - .subarray(array_location, array_location + self.array.len() as u32); - - let _ = Atomics::notify(&array, id as u32); - } - } - } - - #[cfg(target_feature = "atomics")] - fn poll_again(package: Arc, id: usize) { - console_log!("poll_again called"); - let me = match package.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => { - console_log!("poll_again Waiting"); - me - }, - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => { - console_log!("poll_again Notified"); - return; - }, - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. 
For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => { - console_log!("poll_again Polling"); - return; - }, - }; - - let memory_buffer = wasm_bindgen::memory() - .dyn_into::() - .expect("Should cast a memory to WebAssembly::Memory") - .buffer(); - - let array_location = package.waker.array.as_ptr() as u32 / 4; - let array = Int32Array::new(&memory_buffer) - .subarray(array_location, array_location + package.waker.array.len() as u32); - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - // - // Note that the `Rc`/`RefCell` trick here is basically to just - // ensure that our `Closure` gets cleaned up appropriately. - let promise = polyfill::wait_async(array, id as u32, 0) - .expect("Should create a Promise"); - let slot = Rc::new(RefCell::new(None)); - let slot2 = slot.clone(); - let closure = Closure::wrap(Box::new(move |_| { - let myself = slot2.borrow_mut().take(); - debug_assert!(myself.is_some()); - Package::poll(&me); - }) as Box); - promise.then(&closure); - *slot.borrow_mut() = Some(closure); - } - // No shared memory right now, wasm is single threaded, no need to worry // about this! - #[cfg(not(target_feature = "atomics"))] unsafe impl Send for Package {} - #[cfg(not(target_feature = "atomics"))] unsafe impl Sync for Package {} impl Package { @@ -446,9 +330,7 @@ fn _future_to_promise(future: Box>) match me.notified.replace(State::Polling) { // We received a notification while previously polling, or // this is the initial poll. We've got work to do below! - State::Notified => { - console_log!("Package::poll Notified"); - } + State::Notified => {} // We've gone through this loop once and no notification was // received while we were executing work. That means we got @@ -458,31 +340,17 @@ fn _future_to_promise(future: Box>) // When the notification comes in it'll notify our task, see // our `Waiting` state, and resume the polling process State::Polling => { - console_log!("Package::poll Polling"); - me.notified.set(State::Waiting(me.clone())); - #[cfg(target_feature = "atomics")] - poll_again(me.clone(), 0); - break; } State::Waiting(_) => { - console_log!("Package::poll Waiting"); - panic!("shouldn't see waiting state!") - }, + } } - - #[cfg(target_feature = "atomics")] - let waker = &me.waker; - - #[cfg(not(target_feature = "atomics"))] - let waker = me; - - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(waker, 0) { + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. 
Ok(Async::Ready(value)) => (value, &me.resolve), @@ -499,10 +367,8 @@ fn _future_to_promise(future: Box>) } } - #[cfg(not(target_feature = "atomics"))] impl Notify for Package { fn notify(&self, _id: usize) { - console_log!("Package::notify Waiting"); let me = match self.notified.replace(State::Notified) { // we need to schedule polling to resume, so keep going State::Waiting(me) => me, diff --git a/crates/futures/src/polyfill.js b/crates/futures/src/polyfill.js deleted file mode 100644 index ece6f96613d..00000000000 --- a/crates/futures/src/polyfill.js +++ /dev/null @@ -1,130 +0,0 @@ -/* - * The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async - */ - -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - * - * Author: Lars T Hansen, lhansen@mozilla.com - */ - -/* Polyfill for Atomics.waitAsync() for web browsers. - * - * Any kind of agent that is able to create a new Worker can use this polyfill. - * - * Load this file in all agents that will use Atomics.waitAsync. - * - * Agents that don't call Atomics.waitAsync need do nothing special. - * - * Any kind of agent can wake another agent that is sleeping in - * Atomics.waitAsync by just calling Atomics.wake for the location being slept - * on, as normal. - * - * The implementation is not completely faithful to the proposed semantics: in - * the case where an agent first asyncWaits and then waits on the same location: - * when it is woken, the two waits will be woken in order, while in the real - * semantics, the sync wait will be woken first. - * - * In this polyfill Atomics.waitAsync is not very fast. - */ - -/* Implementation: - * - * For every wait we fork off a Worker to perform the wait. Workers are reused - * when possible. The worker communicates with its parent using postMessage. - */ - -const helperCode = ` -onmessage = function (ev) { - try { - switch (ev.data[0]) { - case 'wait': { - let [_, ia, index, value, timeout] = ev.data; - let result = Atomics.wait(ia, index, value, timeout) - postMessage(['ok', result]); - break; - } - default: { - throw new Error("Wrong message sent to wait helper: " + ev.data.join(',')); - } - } - } catch (e) { - console.log("Exception in wait helper"); - postMessage(['error', 'Exception']); - } -} -`; - -const helpers = []; - -function allocHelper() { - if (helpers.length > 0) { - return helpers.pop(); - } - return new Worker("data:application/javascript," + encodeURIComponent(helperCode)); -} - -function freeHelper(h) { - helpers.push(h); -} - -// Atomics.waitAsync always returns a promise. Throws standard errors -// for parameter validation. The promise is resolved with a string as from -// Atomics.wait, or, in the case something went completely wrong, it is -// rejected with an error string. -export function waitAsync(ia, index, value, timeout = Infinity) { - if (typeof ia != "object" - || !(ia instanceof Int32Array) - || !(ia.buffer instanceof SharedArrayBuffer) - ) { - throw new TypeError("Expected shared memory"); - } - - // Range checking for the index. - - ia[index]; - - // Optimization, avoid the helper thread in this common case. - - if (Atomics.load(ia, index) !== value) { - return Promise.resolve("not-equal"); - } - - // General case, we must wait. 
- - return new Promise(function (resolve, reject) { - const h = allocHelper(); - h.onmessage = function (ev) { - // Free the helper early so that it can be reused if the resolution - // needs a helper. - freeHelper(h); - switch (ev.data[0]) { - case 'ok': - resolve(ev.data[1]); - break; - case 'error': - // Note, rejection is not in the spec, it is an artifact of the polyfill. - // The helper already printed an error to the console. - reject(ev.data[1]); - break; - } - }; - - // It's possible to do better here if the ia is already known to the - // helper. In that case we can communicate the other data through - // shared memory and wake the agent. And it is possible to make ia - // known to the helper by waking it with a special value so that it - // checks its messages, and then posting the ia to the helper. Some - // caching / decay scheme is useful no doubt, to improve performance - // and avoid leaks. - // - // In the event we wake the helper directly, we can micro-wait here - // for a quick result. We'll need to restructure some code to make - // that work out properly, and some synchronization is necessary for - // the helper to know that we've picked up the result and no - // postMessage is necessary. - - h.postMessage(['wait', ia, index, value, timeout]); - }) -} diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index 3fe46c7ff5f..ded6f50fbf1 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -80,10 +80,10 @@ onmessage = function (ev) { "; thread_local! { - static HELPERS: RefCell>>> = RefCell::new(vec![]); + static HELPERS: RefCell>> = RefCell::new(vec![]); } -fn alloc_helper() -> Rc> { +fn alloc_helper() -> Rc { HELPERS.with(|helpers| { if let Some(helper) = helpers.borrow_mut().pop() { return helper; @@ -93,20 +93,18 @@ fn alloc_helper() -> Rc> { let encoded: String = encode_uri_component(HELPER_CODE).into(); initialization_string.push_str(&encoded); - return Rc::new(RefCell::new( - Worker::new(&initialization_string).expect("Should create a Worker"), - )); + return Rc::new(Worker::new(&initialization_string).expect("Should create a Worker")); }) } -fn free_helper(helper: &Rc>) { +fn free_helper(helper: &Rc) { HELPERS.with(move |helpers| { helpers.borrow_mut().push(helper.clone()); }); } pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Result { - let timeout = 0.0; + let timeout = 0.1; wait_async_with_timeout(indexed_array, index, value, timeout) } @@ -141,11 +139,11 @@ pub fn wait_async_with_timeout( console_log!("polyfill, general case"); Ok(Promise::new( - &mut Box::new(move |resolve: Function, reject: Function| { + &mut move |resolve: Function, reject: Function| { let helper = alloc_helper(); let helper_ref = helper.clone(); - let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { + let onmessage_callback = Closure::once_into_js(Box::new(move |e: MessageEvent| { // Free the helper early so that it can be reused if the resolution // needs a helper. free_helper(&helper_ref); @@ -171,12 +169,11 @@ pub fn wait_async_with_timeout( // it's not specified in the proposal yet _ => (), } - }) as Box); - helper - .borrow() - .set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + }) + as Box); + helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); - onmessage_callback.forget(); + // onmessage_callback.forget(); // It's possible to do better here if the ia is already known to the // helper. 
In that case we can communicate the other data through @@ -201,9 +198,8 @@ pub fn wait_async_with_timeout( ); helper - .borrow() .post_message(&data) .expect("Should successfully post data to a Worker"); - }) as &mut dyn FnMut(Function, Function), + }, )) } diff --git a/crates/js-sys/src/lib.rs b/crates/js-sys/src/lib.rs index 556f7dba44e..83c27c22783 100644 --- a/crates/js-sys/src/lib.rs +++ b/crates/js-sys/src/lib.rs @@ -495,9 +495,6 @@ extern "C" { pub fn slice_with_end(this: &SharedArrayBuffer, begin: u32, end: u32) -> SharedArrayBuffer; } -unsafe impl Send for SharedArrayBuffer {} -unsafe impl Sync for SharedArrayBuffer {} - // Array Iterator #[wasm_bindgen] extern "C" { diff --git a/examples/raytrace-parallel/Cargo.toml b/examples/raytrace-parallel/Cargo.toml index 0a6dabae308..2521574db79 100644 --- a/examples/raytrace-parallel/Cargo.toml +++ b/examples/raytrace-parallel/Cargo.toml @@ -18,7 +18,7 @@ wasm-bindgen = { version = "0.2.48", features = ['serde-serialize'] } wasm-bindgen-futures = "0.3.25" [dependencies.web-sys] -version = "0.3.4" +version = "0.3.23" features = [ 'CanvasRenderingContext2d', 'ErrorEvent', diff --git a/examples/raytrace-parallel/src/lib.rs b/examples/raytrace-parallel/src/lib.rs index 8f99ed9236c..32d96d344be 100644 --- a/examples/raytrace-parallel/src/lib.rs +++ b/examples/raytrace-parallel/src/lib.rs @@ -92,7 +92,7 @@ impl Scene { .map(move |_data| image_data(base, len, width, height).into()); Ok(RenderingScene { - promise: wasm_bindgen_futures::future_to_promise(done), + promise: wasm_bindgen_futures::atomics::future_to_promise(done), base, len, height, From 06c783d5e3cab7242a6c3013c13652f5f899e303 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Mon, 17 Jun 2019 20:29:39 +0300 Subject: [PATCH 06/18] placed web-sys dependency behind a feature flag in wasm-bindgen-futures --- crates/futures/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 69b9a0f33d9..04726409716 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -18,9 +18,9 @@ futures-util-preview = { version = "0.3.0-alpha.15", optional = true } futures-channel-preview = { version = "0.3.0-alpha.15", optional = true } lazy_static = { version = "1.3.0", optional = true } -[dependencies.web-sys] +[target.'cfg(target_feature = "atomics")'.dependencies.web-sys] path = "../web-sys" -version = "0.3.22" +version = "0.3.23" features = [ "MessageEvent", "Worker", From 221dc732afd92b5e05b825b721bca2781b95f255 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Mon, 17 Jun 2019 21:08:06 +0300 Subject: [PATCH 07/18] updated default timeout and wait_async signature in wasm-bindgen-futures --- crates/futures/src/atomics.rs | 29 ++++++++++++++--------------- crates/futures/src/lib.rs | 6 +----- crates/futures/src/polyfill.rs | 23 +++++++++++++---------- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index a9644b2571f..a5d57eac316 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -7,9 +7,8 @@ use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Atomics, Int32Array, WebAssembly, Function, Promise}; +use js_sys::{Function, Promise}; use wasm_bindgen::prelude::*; -use wasm_bindgen::JsCast; macro_rules! 
console_log { ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) @@ -112,8 +111,8 @@ impl Future for JsFuture { /// resolve**. Instead it will be a leaked promise. This is an unfortunate /// limitation of wasm currently that's hoped to be fixed one day! pub fn future_to_promise(future: F) -> Promise - where - F: Future + 'static, +where + F: Future + 'static, { _future_to_promise(Box::new(future)) } @@ -210,11 +209,16 @@ fn _future_to_promise(future: Box>) } impl Notify for Waker { - fn notify(&self, id: usize) { + fn notify(&self, _id: usize) { console_log!("Waker notify"); if !self.notified.swap(true, Ordering::SeqCst) { console_log!("Waker, inside if"); - let _ = unsafe { core::arch::wasm32::atomic_notify(&self.value as *const AtomicI32 as *mut i32, 0) }; + let _ = unsafe { + core::arch::wasm32::atomic_notify( + &self.value as *const AtomicI32 as *mut i32, + 0, + ) + }; } } } @@ -249,20 +253,15 @@ fn _future_to_promise(future: Box>) } }; - let memory_buffer = wasm_bindgen::memory() - .dyn_into::() - .expect("Should cast a memory to WebAssembly::Memory") - .buffer(); - let value_location = &package.waker.value as *const AtomicI32 as u32 / 4; - let array = Int32Array::new(&memory_buffer); // Use `Promise.then` on a resolved promise to place our execution // onto the next turn of the microtask queue, enqueueing our poll // operation. We don't currently poll immediately as it turns out // `futures` crate adapters aren't compatible with it and it also // helps avoid blowing the stack by accident. - let promise = crate::polyfill::wait_async(array, value_location, 0).expect("Should create a Promise"); + let promise = + crate::polyfill::wait_async(value_location, 0).expect("Should create a Promise"); let closure = Closure::once(Box::new(move |_| { Package::poll(&me); }) as Box); @@ -340,8 +339,8 @@ fn _future_to_promise(future: Box>) /// /// This function has the same panic behavior as `future_to_promise`. pub fn spawn_local(future: F) - where - F: Future + 'static, +where + F: Future + 'static, { future_to_promise( future diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index becc6246e0a..e21c94ea584 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -102,7 +102,6 @@ //! 
``` #![feature(stdsimd)] - #![deny(missing_docs)] #[cfg(feature = "futures_0_3")] @@ -341,13 +340,10 @@ fn _future_to_promise(future: Box>) // our `Waiting` state, and resume the polling process State::Polling => { me.notified.set(State::Waiting(me.clone())); - break; } - State::Waiting(_) => { - panic!("shouldn't see waiting state!") - } + State::Waiting(_) => panic!("shouldn't see waiting state!"), } let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index ded6f50fbf1..b742a3c04f7 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -41,7 +41,7 @@ use std::rc::Rc; use js_sys::{ encode_uri_component, Array, Atomics, Error, Function, Int32Array, JsString, Promise, Reflect, - SharedArrayBuffer, + SharedArrayBuffer, WebAssembly, }; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; @@ -57,6 +57,8 @@ extern "C" { fn log(s: &str); } +const DEFAULT_TIMEOUT: f64 = 10.0; + const HELPER_CODE: &'static str = " onmessage = function (ev) { try { @@ -103,9 +105,8 @@ fn free_helper(helper: &Rc) { }); } -pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Result { - let timeout = 0.1; - wait_async_with_timeout(indexed_array, index, value, timeout) +pub fn wait_async(index: u32, value: i32) -> Result { + wait_async_with_timeout(index, value, DEFAULT_TIMEOUT) } fn get_array_item(array: &JsValue, index: u32) -> JsValue { @@ -117,12 +118,14 @@ fn get_array_item(array: &JsValue, index: u32) -> JsValue { // for parameter validation. The promise is resolved with a string as from // Atomics.wait, or, in the case something went completely wrong, it is // rejected with an error string. -pub fn wait_async_with_timeout( - indexed_array: Int32Array, - index: u32, - value: i32, - timeout: f64, -) -> Result { +pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result { + let memory_buffer = wasm_bindgen::memory() + .dyn_into::() + .expect("Should cast a memory to WebAssembly::Memory") + .buffer(); + + let indexed_array = Int32Array::new(&memory_buffer); + if !indexed_array.buffer().has_type::() { console_log!("polyfill, not a SharedArrayBuffer"); return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into()); From 6ab1a49a41aa003c9846e21626d43b4a8c77535a Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Thu, 27 Jun 2019 00:06:43 +0300 Subject: [PATCH 08/18] moved lib.rs to stable.rs in wasm-bindgen-futures, updated during review --- crates/futures/Cargo.toml | 3 +- crates/futures/src/atomics.rs | 30 +-- crates/futures/src/lib.rs | 334 ++------------------------ crates/futures/src/polyfill.rs | 58 ++--- crates/futures/src/stable.rs | 308 ++++++++++++++++++++++++ examples/raytrace-parallel/src/lib.rs | 2 +- 6 files changed, 350 insertions(+), 385 deletions(-) create mode 100644 crates/futures/src/stable.rs diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 04726409716..b07b78f1770 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -11,6 +11,7 @@ version = "0.3.25" edition = "2018" [dependencies] +cfg-if = "0.1.9" futures = "0.1.20" js-sys = { path = "../js-sys", version = '0.3.25' } wasm-bindgen = { path = "../..", version = '0.2.48' } @@ -20,7 +21,7 @@ lazy_static = { version = "1.3.0", optional = true } [target.'cfg(target_feature = "atomics")'.dependencies.web-sys] path = "../web-sys" -version = "0.3.23" +version = "0.3.24" features = [ "MessageEvent", "Worker", diff --git 
a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index a5d57eac316..61481707d96 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -10,16 +10,6 @@ use futures::sync::oneshot; use js_sys::{Function, Promise}; use wasm_bindgen::prelude::*; -macro_rules! console_log { - ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) -} - -#[wasm_bindgen] -extern "C" { - #[wasm_bindgen(js_namespace = console)] - fn log(s: &str); -} - /// A Rust `Future` backed by a JavaScript `Promise`. /// /// This type is constructed with a JavaScript `Promise` object and translates @@ -210,13 +200,11 @@ fn _future_to_promise(future: Box>) impl Notify for Waker { fn notify(&self, _id: usize) { - console_log!("Waker notify"); if !self.notified.swap(true, Ordering::SeqCst) { - console_log!("Waker, inside if"); let _ = unsafe { core::arch::wasm32::atomic_notify( &self.value as *const AtomicI32 as *mut i32, - 0, + std::u32::MAX, // number of threads to notify ) }; } @@ -224,11 +212,9 @@ fn _future_to_promise(future: Box>) } fn poll_again(package: Arc) { - console_log!("poll_again called"); let me = match package.notified.replace(State::Notified) { // we need to schedule polling to resume, so keep going State::Waiting(me) => { - console_log!("poll_again Waiting"); me } @@ -236,7 +222,6 @@ fn _future_to_promise(future: Box>) // having now coalesced the notifications we return as it's // still someone else's job to process this State::Notified => { - console_log!("poll_again Notified"); return; } @@ -248,20 +233,17 @@ fn _future_to_promise(future: Box>) // so we bail out. // later see State::Polling => { - console_log!("poll_again Polling"); return; } }; - let value_location = &package.waker.value as *const AtomicI32 as u32 / 4; - // Use `Promise.then` on a resolved promise to place our execution // onto the next turn of the microtask queue, enqueueing our poll // operation. We don't currently poll immediately as it turns out // `futures` crate adapters aren't compatible with it and it also // helps avoid blowing the stack by accident. let promise = - crate::polyfill::wait_async(value_location, 0).expect("Should create a Promise"); + crate::polyfill::wait_async(&package.waker.value).expect("Should create a Promise"); let closure = Closure::once(Box::new(move |_| { Package::poll(&me); }) as Box); @@ -284,9 +266,7 @@ fn _future_to_promise(future: Box>) match me.notified.replace(State::Polling) { // We received a notification while previously polling, or // this is the initial poll. We've got work to do below! - State::Notified => { - console_log!("Package::poll Notified"); - } + State::Notified => {} // We've gone through this loop once and no notification was // received while we were executing work. That means we got @@ -296,8 +276,6 @@ fn _future_to_promise(future: Box>) // When the notification comes in it'll notify our task, see // our `Waiting` state, and resume the polling process State::Polling => { - console_log!("Package::poll Polling"); - me.notified.set(State::Waiting(me.clone())); poll_again(me.clone()); @@ -306,8 +284,6 @@ fn _future_to_promise(future: Box>) } State::Waiting(_) => { - console_log!("Package::poll Waiting"); - panic!("shouldn't see waiting state!") } } diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index e21c94ea584..2b6278f651b 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -101,326 +101,28 @@ //! } //! 
``` -#![feature(stdsimd)] +#![cfg_attr(target_feature = "atomics", feature(stdsimd))] #![deny(missing_docs)] -#[cfg(feature = "futures_0_3")] -/// Contains a Futures 0.3 implementation of this crate. -pub mod futures_0_3; +use cfg_if::cfg_if; -#[cfg(target_feature = "atomics")] -/// Contains a thread-safe version of this crate, with Futures 0.1 -pub mod atomics; +cfg_if! { + if #[cfg(target_feature = "atomics")] { + /// Contains a thread-safe version of this crate, with Futures 0.1 + pub mod atomics; -#[cfg(target_feature = "atomics")] -/// Polyfill for `Atomics.waitAsync` function -mod polyfill; + /// Polyfill for `Atomics.waitAsync` function + mod polyfill; -use std::cell::{Cell, RefCell}; -use std::fmt; -use std::rc::Rc; -use std::sync::Arc; + pub use atomics::*; + } else if #[cfg(feature = "futures_0_3")] { + /// Contains a Futures 0.3 implementation of this crate. + pub mod futures_0_3; -use futures::executor::{self, Notify, Spawn}; -use futures::future; -use futures::prelude::*; -use futures::sync::oneshot; -use js_sys::{Function, Promise}; -use wasm_bindgen::prelude::*; - -/// A Rust `Future` backed by a JavaScript `Promise`. -/// -/// This type is constructed with a JavaScript `Promise` object and translates -/// it to a Rust `Future`. This type implements the `Future` trait from the -/// `futures` crate and will either succeed or fail depending on what happens -/// with the JavaScript `Promise`. -/// -/// Currently this type is constructed with `JsFuture::from`. -pub struct JsFuture { - rx: oneshot::Receiver>, -} - -impl fmt::Debug for JsFuture { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "JsFuture {{ ... }}") - } -} - -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. We're currently - // assuming that JS engines will unconditionally invoke precisely one of - // these callbacks, no matter what. - // - // Ideally we'd have a way to cancel the callbacks getting invoked and - // free up state ourselves when this `JsFuture` is dropped. We don't - // have that, though, and one of the callbacks is likely always going to - // be invoked. - // - // As a result we need to make sure that no matter when the callbacks - // are invoked they are valid to be called at any time, which means they - // have to be self-contained. Through the `Closure::once` and some - // `Rc`-trickery we can arrange for both instances of `Closure`, and the - // `Rc`, to all be destroyed once the first one is called. - let (tx, rx) = oneshot::channel(); - let state = Rc::new(RefCell::new(None)); - let state2 = state.clone(); - let resolve = Closure::once(move |val| finish(&state2, Ok(val))); - let state2 = state.clone(); - let reject = Closure::once(move |val| finish(&state2, Err(val))); - - js.then2(&resolve, &reject); - *state.borrow_mut() = Some((tx, resolve, reject)); - - return JsFuture { rx }; - - fn finish( - state: &RefCell< - Option<( - oneshot::Sender>, - Closure, - Closure, - )>, - >, - val: Result, - ) { - match state.borrow_mut().take() { - // We don't have any guarantee that anyone's still listening at this - // point (the Rust `JsFuture` could have been dropped) so simply - // ignore any errors here. 
- Some((tx, _, _)) => drop(tx.send(val)), - None => wasm_bindgen::throw_str("cannot finish twice"), - } - } - } -} - -impl Future for JsFuture { - type Item = JsValue; - type Error = JsValue; - - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(val)) => val.map(Async::Ready), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => wasm_bindgen::throw_str("cannot cancel"), - } - } -} - -/// Converts a Rust `Future` into a JavaScript `Promise`. -/// -/// This function will take any future in Rust and schedule it to be executed, -/// returning a JavaScript `Promise` which can then be passed back to JavaScript -/// to get plumbed into the rest of a system. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. The -/// returned `Promise` will be resolved or rejected when the future completes, -/// depending on whether it finishes with `Ok` or `Err`. -/// -/// # Panics -/// -/// Note that in wasm panics are currently translated to aborts, but "abort" in -/// this case means that a JavaScript exception is thrown. The wasm module is -/// still usable (likely erroneously) after Rust panics. -/// -/// If the `future` provided panics then the returned `Promise` **will not -/// resolve**. Instead it will be a leaked promise. This is an unfortunate -/// limitation of wasm currently that's hoped to be fixed one day! -pub fn future_to_promise(future: F) -> Promise -where - F: Future + 'static, -{ - _future_to_promise(Box::new(future)) -} - -// Implementation of actually transforming a future into a JavaScript `Promise`. -// -// The only primitive we have to work with here is `Promise::new`, which gives -// us two callbacks that we can use to either reject or resolve the promise. -// It's our job to ensure that one of those callbacks is called at the -// appropriate time. -// -// Now we know that JavaScript (in general) can't block and is largely -// notification/callback driven. That means that our future must either have -// synchronous computational work to do, or it's "scheduled a notification" to -// happen. These notifications are likely callbacks to get executed when things -// finish (like a different promise or something like `setTimeout`). The general -// idea here is thus to do as much synchronous work as we can and then otherwise -// translate notifications of a future's task into "let's poll the future!" -// -// This isn't necessarily the greatest future executor in the world, but it -// should get the job done for now hopefully. -fn _future_to_promise(future: Box>) -> Promise { - let mut future = Some(executor::spawn(future)); - return Promise::new(&mut |resolve, reject| { - Package::poll(&Arc::new(Package { - spawn: RefCell::new(future.take().unwrap()), - resolve, - reject, - notified: Cell::new(State::Notified), - })); - }); - - struct Package { - // Our "spawned future". This'll have everything we need to poll the - // future and continue to move it forward. - spawn: RefCell>>>, - - // The current state of this future, expressed in an enum below. This - // indicates whether we're currently polling the future, received a - // notification and need to keep polling, or if we're waiting for a - // notification to come in (and no one is polling). - notified: Cell, - - // Our two callbacks connected to the `Promise` that we returned to - // JavaScript. We'll be invoking one of these at the end. 
- resolve: Function, - reject: Function, - } - - // The possible states our `Package` (future) can be in, tracked internally - // and used to guide what happens when polling a future. - enum State { - // This future is currently and actively being polled. Attempting to - // access the future will result in a runtime panic and is considered a - // bug. - Polling, - - // This future has been notified, while it was being polled. This marker - // is used in the `Notify` implementation below, and indicates that a - // notification was received that the future is ready to make progress. - // If seen, however, it probably means that the future is also currently - // being polled. - Notified, - - // The future is blocked, waiting for something to happen. Stored here - // is a self-reference to the future itself so we can pull it out in - // `Notify` and continue polling. - // - // Note that the self-reference here is an Arc-cycle that will leak - // memory unless the future completes, but currently that should be ok - // as we'll have to stick around anyway while the future is executing! - // - // This state is removed as soon as a notification comes in, so the leak - // should only be "temporary" - Waiting(Arc), - } - - // No shared memory right now, wasm is single threaded, no need to worry - // about this! - unsafe impl Send for Package {} - unsafe impl Sync for Package {} - - impl Package { - // Move the future contained in `me` as far forward as we can. This will - // do as much synchronous work as possible to complete the future, - // ensuring that when it blocks we're scheduled to get notified via some - // callback somewhere at some point (vague, right?) - // - // TODO: this probably shouldn't do as much synchronous work as possible - // as it can starve other computations. Rather it should instead - // yield every so often with something like `setTimeout` with the - // timeout set to zero. - fn poll(me: &Arc) { - loop { - match me.notified.replace(State::Polling) { - // We received a notification while previously polling, or - // this is the initial poll. We've got work to do below! - State::Notified => {} - - // We've gone through this loop once and no notification was - // received while we were executing work. That means we got - // `NotReady` below and we're scheduled to receive a - // notification. Block ourselves and wait for later. - // - // When the notification comes in it'll notify our task, see - // our `Waiting` state, and resume the polling process - State::Polling => { - me.notified.set(State::Waiting(me.clone())); - break; - } - - State::Waiting(_) => panic!("shouldn't see waiting state!"), - } - - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { - // If the future is ready, immediately call the - // resolve/reject callback and then return as we're done. - Ok(Async::Ready(value)) => (value, &me.resolve), - Err(value) => (value, &me.reject), - - // Otherwise keep going in our loop, if we weren't notified - // we'll break out and start waiting. 
- Ok(Async::NotReady) => continue, - }; - - drop(f.call1(&JsValue::undefined(), &val)); - break; - } - } - } - - impl Notify for Package { - fn notify(&self, _id: usize) { - let me = match self.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => return, - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => return, - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - // - // Note that the `Rc`/`RefCell` trick here is basically to just - // ensure that our `Closure` gets cleaned up appropriately. - let promise = Promise::resolve(&JsValue::undefined()); - let slot = Rc::new(RefCell::new(None)); - let slot2 = slot.clone(); - let closure = Closure::wrap(Box::new(move |_| { - let myself = slot2.borrow_mut().take(); - debug_assert!(myself.is_some()); - Package::poll(&me); - }) as Box); - promise.then(&closure); - *slot.borrow_mut() = Some(closure); - } - } -} - -/// Converts a Rust `Future` on a local task queue. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. -/// -/// # Panics -/// -/// This function has the same panic behavior as `future_to_promise`. -pub fn spawn_local(future: F) -where - F: Future + 'static, -{ - future_to_promise( - future - .map(|()| JsValue::undefined()) - .or_else(|()| future::ok::(JsValue::undefined())), - ); + pub mod stable; + pub use stable::*; + } else { + pub mod stable; + pub use stable::*; + } } diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index b742a3c04f7..9951c85f327 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -37,27 +37,17 @@ */ use std::cell::RefCell; -use std::rc::Rc; +use std::sync::atomic::{AtomicI32, Ordering}; use js_sys::{ - encode_uri_component, Array, Atomics, Error, Function, Int32Array, JsString, Promise, Reflect, - SharedArrayBuffer, WebAssembly, + encode_uri_component, Array, Function, Int32Array, JsString, Promise, Reflect, + WebAssembly, }; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{MessageEvent, Worker}; -macro_rules! 
console_log { - ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) -} - -#[wasm_bindgen] -extern "C" { - #[wasm_bindgen(js_namespace = console)] - fn log(s: &str); -} - -const DEFAULT_TIMEOUT: f64 = 10.0; +const DEFAULT_TIMEOUT: f64 = std::f64::INFINITY; const HELPER_CODE: &'static str = " onmessage = function (ev) { @@ -65,7 +55,6 @@ onmessage = function (ev) { switch (ev.data[0]) { case 'wait': { let [_, ia, index, value, timeout] = ev.data; - console.log('wait event inside a worker'); let result = Atomics.wait(ia, index, value, timeout); postMessage(['ok', result]); break; @@ -75,17 +64,17 @@ onmessage = function (ev) { } } } catch (e) { - console.log('Exception in wait helper'); + console.log('Exception in wait helper', e); postMessage(['error', 'Exception']); } } "; thread_local! { - static HELPERS: RefCell>> = RefCell::new(vec![]); + static HELPERS: RefCell> = RefCell::new(vec![]); } -fn alloc_helper() -> Rc { +fn alloc_helper() -> Worker { HELPERS.with(|helpers| { if let Some(helper) = helpers.borrow_mut().pop() { return helper; @@ -95,18 +84,18 @@ fn alloc_helper() -> Rc { let encoded: String = encode_uri_component(HELPER_CODE).into(); initialization_string.push_str(&encoded); - return Rc::new(Worker::new(&initialization_string).expect("Should create a Worker")); + Worker::new(&initialization_string).expect("Should create a Worker") }) } -fn free_helper(helper: &Rc) { +fn free_helper(helper: Worker) { HELPERS.with(move |helpers| { helpers.borrow_mut().push(helper.clone()); }); } -pub fn wait_async(index: u32, value: i32) -> Result { - wait_async_with_timeout(index, value, DEFAULT_TIMEOUT) +pub fn wait_async(value: &AtomicI32) -> Result { + wait_async_with_timeout(value, DEFAULT_TIMEOUT) } fn get_array_item(array: &JsValue, index: u32) -> JsValue { @@ -118,7 +107,7 @@ fn get_array_item(array: &JsValue, index: u32) -> JsValue { // for parameter validation. The promise is resolved with a string as from // Atomics.wait, or, in the case something went completely wrong, it is // rejected with an error string. -pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result { +pub fn wait_async_with_timeout(value: &AtomicI32, timeout: f64) -> Result { let memory_buffer = wasm_bindgen::memory() .dyn_into::() .expect("Should cast a memory to WebAssembly::Memory") @@ -126,30 +115,20 @@ pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result

() { - console_log!("polyfill, not a SharedArrayBuffer"); - return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into()); - } - - // Optimization, avoid the helper thread in this common case. - if Atomics::load(&indexed_array, index)? != value { - console_log!("polyfill, not-equal"); - return Ok(Promise::resolve(&JsString::from("not-equal"))); - } + let index = value as *const AtomicI32 as u32 / 4; + let value_i32 = value.load(Ordering::SeqCst); // General case, we must wait. - console_log!("polyfill, general case"); - Ok(Promise::new( &mut move |resolve: Function, reject: Function| { let helper = alloc_helper(); let helper_ref = helper.clone(); - let onmessage_callback = Closure::once_into_js(Box::new(move |e: MessageEvent| { + let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| { // Free the helper early so that it can be reused if the resolution // needs a helper. - free_helper(&helper_ref); + free_helper(helper_ref); match String::from( get_array_item(&e.data(), 0) .as_string() @@ -172,8 +151,7 @@ pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result

(), } - }) - as Box); + }); helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); // onmessage_callback.forget(); @@ -196,7 +174,7 @@ pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result
<Result<JsValue, JsValue>
>, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. We're currently + // assuming that JS engines will unconditionally invoke precisely one of + // these callbacks, no matter what. + // + // Ideally we'd have a way to cancel the callbacks getting invoked and + // free up state ourselves when this `JsFuture` is dropped. We don't + // have that, though, and one of the callbacks is likely always going to + // be invoked. + // + // As a result we need to make sure that no matter when the callbacks + // are invoked they are valid to be called at any time, which means they + // have to be self-contained. Through the `Closure::once` and some + // `Rc`-trickery we can arrange for both instances of `Closure`, and the + // `Rc`, to all be destroyed once the first one is called. + let (tx, rx) = oneshot::channel(); + let state = Rc::new(RefCell::new(None)); + let state2 = state.clone(); + let resolve = Closure::once(move |val| finish(&state2, Ok(val))); + let state2 = state.clone(); + let reject = Closure::once(move |val| finish(&state2, Err(val))); + + js.then2(&resolve, &reject); + *state.borrow_mut() = Some((tx, resolve, reject)); + + return JsFuture { rx }; + + fn finish( + state: &RefCell< + Option<( + oneshot::Sender>, + Closure, + Closure, + )>, + >, + val: Result, + ) { + match state.borrow_mut().take() { + // We don't have any guarantee that anyone's still listening at this + // point (the Rust `JsFuture` could have been dropped) so simply + // ignore any errors here. + Some((tx, _, _)) => drop(tx.send(val)), + None => wasm_bindgen::throw_str("cannot finish twice"), + } + } + } +} + +impl Future for JsFuture { + type Item = JsValue; + type Error = JsValue; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(val)) => val.map(Async::Ready), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => wasm_bindgen::throw_str("cannot cancel"), + } + } +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise + where + F: Future + 'static, +{ + _future_to_promise(Box::new(future)) +} + +// Implementation of actually transforming a future into a JavaScript `Promise`. 
+// +// The only primitive we have to work with here is `Promise::new`, which gives +// us two callbacks that we can use to either reject or resolve the promise. +// It's our job to ensure that one of those callbacks is called at the +// appropriate time. +// +// Now we know that JavaScript (in general) can't block and is largely +// notification/callback driven. That means that our future must either have +// synchronous computational work to do, or it's "scheduled a notification" to +// happen. These notifications are likely callbacks to get executed when things +// finish (like a different promise or something like `setTimeout`). The general +// idea here is thus to do as much synchronous work as we can and then otherwise +// translate notifications of a future's task into "let's poll the future!" +// +// This isn't necessarily the greatest future executor in the world, but it +// should get the job done for now hopefully. +fn _future_to_promise(future: Box>) -> Promise { + let mut future = Some(executor::spawn(future)); + return Promise::new(&mut |resolve, reject| { + Package::poll(&Arc::new(Package { + spawn: RefCell::new(future.take().unwrap()), + resolve, + reject, + notified: Cell::new(State::Notified), + })); + }); + + struct Package { + // Our "spawned future". This'll have everything we need to poll the + // future and continue to move it forward. + spawn: RefCell>>>, + + // The current state of this future, expressed in an enum below. This + // indicates whether we're currently polling the future, received a + // notification and need to keep polling, or if we're waiting for a + // notification to come in (and no one is polling). + notified: Cell, + + // Our two callbacks connected to the `Promise` that we returned to + // JavaScript. We'll be invoking one of these at the end. + resolve: Function, + reject: Function, + } + + // The possible states our `Package` (future) can be in, tracked internally + // and used to guide what happens when polling a future. + enum State { + // This future is currently and actively being polled. Attempting to + // access the future will result in a runtime panic and is considered a + // bug. + Polling, + + // This future has been notified, while it was being polled. This marker + // is used in the `Notify` implementation below, and indicates that a + // notification was received that the future is ready to make progress. + // If seen, however, it probably means that the future is also currently + // being polled. + Notified, + + // The future is blocked, waiting for something to happen. Stored here + // is a self-reference to the future itself so we can pull it out in + // `Notify` and continue polling. + // + // Note that the self-reference here is an Arc-cycle that will leak + // memory unless the future completes, but currently that should be ok + // as we'll have to stick around anyway while the future is executing! + // + // This state is removed as soon as a notification comes in, so the leak + // should only be "temporary" + Waiting(Arc), + } + + // No shared memory right now, wasm is single threaded, no need to worry + // about this! + unsafe impl Send for Package {} + unsafe impl Sync for Package {} + + impl Package { + // Move the future contained in `me` as far forward as we can. This will + // do as much synchronous work as possible to complete the future, + // ensuring that when it blocks we're scheduled to get notified via some + // callback somewhere at some point (vague, right?) 
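As an aside on the `Polling`/`Notified`/`Waiting` states defined above: stripped of the promise plumbing, the coalescing they implement can be sketched on its own. The names below are illustrative, not code from the patch; the sketch only mirrors the transitions used by the `Package::poll` loop and `Notify` implementation that follow.

```rust
use std::cell::Cell;
use std::rc::Rc;

enum State {
    Polling,
    Notified,
    Waiting(Rc<Task>),
}

struct Task {
    notified: Cell<State>,
}

impl Task {
    // Executor side: called at the top of each poll iteration.
    fn start_poll(me: &Rc<Task>) -> bool {
        match me.notified.replace(State::Polling) {
            // A notification is pending, so there is work to do.
            State::Notified => true,
            // Nothing arrived while polling: park, and keep a self-reference
            // so a later notification can find the task again.
            State::Polling => {
                me.notified.set(State::Waiting(me.clone()));
                false
            }
            State::Waiting(_) => panic!("shouldn't see waiting state!"),
        }
    }

    // Notifier side: returns the parked task if polling must be rescheduled.
    fn notify(&self) -> Option<Rc<Task>> {
        match self.notified.replace(State::Notified) {
            State::Waiting(task) => Some(task),
            // Already notified, or a poll is in progress and will observe
            // the flag itself: repeated notifications coalesce into one.
            State::Notified | State::Polling => None,
        }
    }
}
```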
+ // + // TODO: this probably shouldn't do as much synchronous work as possible + // as it can starve other computations. Rather it should instead + // yield every so often with something like `setTimeout` with the + // timeout set to zero. + fn poll(me: &Arc) { + loop { + match me.notified.replace(State::Polling) { + // We received a notification while previously polling, or + // this is the initial poll. We've got work to do below! + State::Notified => {} + + // We've gone through this loop once and no notification was + // received while we were executing work. That means we got + // `NotReady` below and we're scheduled to receive a + // notification. Block ourselves and wait for later. + // + // When the notification comes in it'll notify our task, see + // our `Waiting` state, and resume the polling process + State::Polling => { + me.notified.set(State::Waiting(me.clone())); + break; + } + + State::Waiting(_) => panic!("shouldn't see waiting state!"), + } + + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { + // If the future is ready, immediately call the + // resolve/reject callback and then return as we're done. + Ok(Async::Ready(value)) => (value, &me.resolve), + Err(value) => (value, &me.reject), + + // Otherwise keep going in our loop, if we weren't notified + // we'll break out and start waiting. + Ok(Async::NotReady) => continue, + }; + + drop(f.call1(&JsValue::undefined(), &val)); + break; + } + } + } + + impl Notify for Package { + fn notify(&self, _id: usize) { + let me = match self.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = Promise::resolve(&JsValue::undefined()); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } + } +} + +/// Converts a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. 
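The `Notify` implementation above defers the next poll by bouncing through the microtask queue. In isolation that idiom looks roughly like the sketch below, assuming the same `js-sys` `Promise` and `Closure` APIs this file already uses; the `forget` call is a simpler, intentionally leaky stand-in for the `Rc`/`RefCell` slot the real code uses to reclaim the closure.

```rust
use js_sys::Promise;
use wasm_bindgen::prelude::*;

fn schedule_on_microtask(work: impl FnOnce() + 'static) {
    // An already-resolved promise settles on the next microtask tick, so
    // chaining a callback onto it is a cheap "poll again soon" scheduler.
    let promise = Promise::resolve(&JsValue::undefined());
    let closure = Closure::once(move |_: JsValue| work());
    let _ = promise.then(&closure);
    // Keep the closure alive until the microtask actually runs.
    closure.forget();
}
```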
+pub fn spawn_local(future: F) + where + F: Future + 'static, +{ + future_to_promise( + future + .map(|()| JsValue::undefined()) + .or_else(|()| future::ok::(JsValue::undefined())), + ); +} diff --git a/examples/raytrace-parallel/src/lib.rs b/examples/raytrace-parallel/src/lib.rs index 32d96d344be..8f99ed9236c 100644 --- a/examples/raytrace-parallel/src/lib.rs +++ b/examples/raytrace-parallel/src/lib.rs @@ -92,7 +92,7 @@ impl Scene { .map(move |_data| image_data(base, len, width, height).into()); Ok(RenderingScene { - promise: wasm_bindgen_futures::atomics::future_to_promise(done), + promise: wasm_bindgen_futures::future_to_promise(done), base, len, height, From cbaa1d302ab83676e273583369775b1169735e7c Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Fri, 28 Jun 2019 11:14:14 +0300 Subject: [PATCH 09/18] added documentation comment for a stable version of wasm-bindgen-futures --- crates/futures/src/lib.rs | 2 ++ crates/futures/src/polyfill.rs | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 2b6278f651b..e8a6103c022 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -119,9 +119,11 @@ cfg_if! { /// Contains a Futures 0.3 implementation of this crate. pub mod futures_0_3; + /// Contains stable version of the crate pub mod stable; pub use stable::*; } else { + /// Contains stable version of the crate pub mod stable; pub use stable::*; } diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index 9951c85f327..8a2c3424e8e 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -1,7 +1,7 @@ -/* - * The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async - * and ported to Rust - */ +//! +//! The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async +//! and ported to Rust +//! /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this From 45d2c7ce93a7626d6a7b52bf4e00c3333ff35722 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Wed, 17 Jul 2019 01:24:44 +0300 Subject: [PATCH 10/18] updated to the latest master --- crates/futures/src/atomics.rs | 92 +++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index 61481707d96..5a5e1dd49e5 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -1,5 +1,6 @@ use std::cell::{Cell, RefCell}; use std::fmt; +use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; @@ -19,9 +20,7 @@ use wasm_bindgen::prelude::*; /// /// Currently this type is constructed with `JsFuture::from`. pub struct JsFuture { - resolved: oneshot::Receiver, - rejected: oneshot::Receiver, - callbacks: Option<(Closure, Closure)>, + rx: oneshot::Receiver>, } impl fmt::Debug for JsFuture { @@ -33,28 +32,49 @@ impl fmt::Debug for JsFuture { impl From for JsFuture { fn from(js: Promise) -> JsFuture { // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. These two callbacks - // will be connected to oneshot channels which feed back into our - // future. + // resolved value and one for the rejected value. We're currently + // assuming that JS engines will unconditionally invoke precisely one of + // these callbacks, no matter what. 
// - // This may not be the speediest option today but it should work! - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - let mut tx1 = Some(tx1); - let resolve = Closure::wrap(Box::new(move |val| { - drop(tx1.take().unwrap().send(val)); - }) as Box); - let mut tx2 = Some(tx2); - let reject = Closure::wrap(Box::new(move |val| { - drop(tx2.take().unwrap().send(val)); - }) as Box); + // Ideally we'd have a way to cancel the callbacks getting invoked and + // free up state ourselves when this `JsFuture` is dropped. We don't + // have that, though, and one of the callbacks is likely always going to + // be invoked. + // + // As a result we need to make sure that no matter when the callbacks + // are invoked they are valid to be called at any time, which means they + // have to be self-contained. Through the `Closure::once` and some + // `Rc`-trickery we can arrange for both instances of `Closure`, and the + // `Rc`, to all be destroyed once the first one is called. + let (tx, rx) = oneshot::channel(); + let state = Rc::new(RefCell::new(None)); + let state2 = state.clone(); + let resolve = Closure::once(move |val| finish(&state2, Ok(val))); + let state2 = state.clone(); + let reject = Closure::once(move |val| finish(&state2, Err(val))); js.then2(&resolve, &reject); - - JsFuture { - resolved: rx1, - rejected: rx2, - callbacks: Some((resolve, reject)), + *state.borrow_mut() = Some((tx, resolve, reject)); + + return JsFuture { rx }; + + fn finish( + state: &RefCell< + Option<( + oneshot::Sender>, + Closure, + Closure, + )>, + >, + val: Result, + ) { + match state.borrow_mut().take() { + // We don't have any guarantee that anyone's still listening at this + // point (the Rust `JsFuture` could have been dropped) so simply + // ignore any errors here. + Some((tx, _, _)) => drop(tx.send(val)), + None => wasm_bindgen::throw_str("cannot finish twice"), + } } } } @@ -64,19 +84,11 @@ impl Future for JsFuture { type Error = JsValue; fn poll(&mut self) -> Poll { - // Test if either our resolved or rejected side is finished yet. Note - // that they will return errors if they're disconnected which can't - // happen until we drop the `callbacks` field, which doesn't happen - // till we're done, so we dont need to handle that. - if let Ok(Async::Ready(val)) = self.resolved.poll() { - drop(self.callbacks.take()); - return Ok(val.into()); + match self.rx.poll() { + Ok(Async::Ready(val)) => val.map(Async::Ready), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => wasm_bindgen::throw_str("cannot cancel"), } - if let Ok(Async::Ready(val)) = self.rejected.poll() { - drop(self.callbacks.take()); - return Err(val); - } - Ok(Async::NotReady) } } @@ -101,8 +113,8 @@ impl Future for JsFuture { /// resolve**. Instead it will be a leaked promise. This is an unfortunate /// limitation of wasm currently that's hoped to be fixed one day! pub fn future_to_promise(future: F) -> Promise -where - F: Future + 'static, + where + F: Future + 'static, { _future_to_promise(Box::new(future)) } @@ -283,9 +295,7 @@ fn _future_to_promise(future: Box>) break; } - State::Waiting(_) => { - panic!("shouldn't see waiting state!") - } + State::Waiting(_) => panic!("shouldn't see waiting state!"), } let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { @@ -315,8 +325,8 @@ fn _future_to_promise(future: Box>) /// /// This function has the same panic behavior as `future_to_promise`. 
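For a usage-level view of the futures 0.1 API this series keeps intact, the round trip from a JS promise into Rust and back might look like the sketch below; `some_js_promise` is a placeholder for any `js_sys::Promise` obtained from bindings, and the numeric doubling is purely illustrative.

```rust
use futures::Future;
use js_sys::Promise;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::{future_to_promise, JsFuture};

fn double_when_ready(some_js_promise: Promise) -> Promise {
    // Wrap the JS promise as a Rust future, transform its value, and hand
    // a new promise back to JS. Errors propagate as promise rejections.
    let future = JsFuture::from(some_js_promise).and_then(|val: JsValue| {
        val.as_f64()
            .map(|n| JsValue::from_f64(n * 2.0))
            .ok_or_else(|| JsValue::from_str("expected a number"))
    });
    future_to_promise(future)
}
```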
pub fn spawn_local(future: F) -where - F: Future + 'static, + where + F: Future + 'static, { future_to_promise( future From 02be3690cfb6cd5f32e7811b1cfc8d4589fe52c9 Mon Sep 17 00:00:00 2001 From: ibaryshnikov Date: Wed, 17 Jul 2019 01:52:55 +0300 Subject: [PATCH 11/18] removed AtomicBool from Waker struct --- crates/futures/src/atomics.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index 5a5e1dd49e5..b18b53093de 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -1,7 +1,7 @@ use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; +use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use futures::executor::{self, Notify, Spawn}; @@ -196,23 +196,18 @@ fn _future_to_promise(future: Box>) Waiting(Arc), } + #[derive(Default)] struct Waker { + // worker will be waiting on this value + // 0 by default, which means not notified value: AtomicI32, - notified: AtomicBool, }; - impl Default for Waker { - fn default() -> Self { - Waker { - value: AtomicI32::new(0), - notified: AtomicBool::new(false), - } - } - } - impl Notify for Waker { fn notify(&self, _id: usize) { - if !self.notified.swap(true, Ordering::SeqCst) { + // since we have only value field here + // let it be 1 if notified, 0 if not + if self.value.swap(1, Ordering::SeqCst) == 0 { let _ = unsafe { core::arch::wasm32::atomic_notify( &self.value as *const AtomicI32 as *mut i32, @@ -256,9 +251,9 @@ fn _future_to_promise(future: Box>) // helps avoid blowing the stack by accident. let promise = crate::polyfill::wait_async(&package.waker.value).expect("Should create a Promise"); - let closure = Closure::once(Box::new(move |_| { + let closure = Closure::once(move |_| { Package::poll(&me); - }) as Box); + }); promise.then(&closure); closure.forget(); } From d122bbca1365dd07327614f56496b88b11378f3c Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 10:13:05 -0700 Subject: [PATCH 12/18] Emit a compiler error with futures 0.3 and atomics Not implemented yet, and the one there doesn't work with atomics! (we'll get around to this soon-ish) --- crates/futures/src/lib.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index e8a6103c022..8ed8d173e08 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -109,22 +109,24 @@ use cfg_if::cfg_if; cfg_if! { if #[cfg(target_feature = "atomics")] { /// Contains a thread-safe version of this crate, with Futures 0.1 - pub mod atomics; + mod atomics; /// Polyfill for `Atomics.waitAsync` function mod polyfill; pub use atomics::*; - } else if #[cfg(feature = "futures_0_3")] { + } else { + mod stable; + pub use stable::*; + } +} + +#[cfg(feature = "futures_0_3")] +cfg_if! { + if #[cfg(target_feature = "atomics")] { + compile_error!("futures 0.3 support is not implemented with atomics yet"); + } else { /// Contains a Futures 0.3 implementation of this crate. 
pub mod futures_0_3; - - /// Contains stable version of the crate - pub mod stable; - pub use stable::*; - } else { - /// Contains stable version of the crate - pub mod stable; - pub use stable::*; } } From 9f77f8dd0057a13c0b4c811fcdba278634adcf55 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 10:13:34 -0700 Subject: [PATCH 13/18] Update parallel raytrace example to use futures Use the atomics support now implemented! --- examples/raytrace-parallel/src/lib.rs | 42 ++++++------ examples/raytrace-parallel/src/pool.rs | 89 ++------------------------ 2 files changed, 27 insertions(+), 104 deletions(-) diff --git a/examples/raytrace-parallel/src/lib.rs b/examples/raytrace-parallel/src/lib.rs index 8f99ed9236c..cfc05d61e19 100644 --- a/examples/raytrace-parallel/src/lib.rs +++ b/examples/raytrace-parallel/src/lib.rs @@ -1,3 +1,4 @@ +use futures::sync::oneshot; use futures::Future; use js_sys::{Promise, Uint8ClampedArray, WebAssembly}; use rayon::prelude::*; @@ -69,27 +70,28 @@ impl Scene { // threads so we don't lock up the main thread, so we ship off a thread // which actually does the whole rayon business. When our returned // future is resolved we can pull out the final version of the image. - let done = pool - .run_notify(move || { - thread_pool.install(|| { - rgb_data - .par_chunks_mut(4) - .enumerate() - .for_each(|(i, chunk)| { - let i = i as u32; - let x = i % width; - let y = i / width; - let ray = raytracer::Ray::create_prime(x, y, &scene); - let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); - chunk[0] = result.data[0]; - chunk[1] = result.data[1]; - chunk[2] = result.data[2]; - chunk[3] = result.data[3]; - }); - }); + let (tx, rx) = oneshot::channel(); + pool.run(move || { + thread_pool.install(|| { rgb_data - })? - .map(move |_data| image_data(base, len, width, height).into()); + .par_chunks_mut(4) + .enumerate() + .for_each(|(i, chunk)| { + let i = i as u32; + let x = i % width; + let y = i / width; + let ray = raytracer::Ray::create_prime(x, y, &scene); + let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); + chunk[0] = result.data[0]; + chunk[1] = result.data[1]; + chunk[2] = result.data[2]; + chunk[3] = result.data[3]; + }); + }); + drop(tx.send(rgb_data)); + })?; + let done = rx.map(move |_data| image_data(base, len, width, height).into()) + .map_err(|_| JsValue::undefined()); Ok(RenderingScene { promise: wasm_bindgen_futures::future_to_promise(done), diff --git a/examples/raytrace-parallel/src/pool.rs b/examples/raytrace-parallel/src/pool.rs index 921d3c1611f..b5cd4dd7994 100644 --- a/examples/raytrace-parallel/src/pool.rs +++ b/examples/raytrace-parallel/src/pool.rs @@ -1,13 +1,8 @@ //! A small module that's intended to provide an example of creating a pool of //! web workers which can be used to execute `rayon`-style work. -use futures::sync::oneshot; -use futures::Future; -use std::cell::{RefCell, UnsafeCell}; -use std::mem; +use std::cell::RefCell; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use std::sync::Arc; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; @@ -141,12 +136,11 @@ impl WorkerPool { /// whatn it's done the worker is ready to execute more work. This method is /// used for all spawned workers to ensure that when the work is finished /// the worker is reclaimed back into this pool. 
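The raytracing change above replaces the pool's bespoke `run_notify` with a plain `oneshot` channel: the worker closure sends its result, and the receiver is already a future. A generic sketch of that hand-off, where `run_on_pool` is an illustrative stand-in for `WorkerPool::run` or any other spawn function:

```rust
use futures::sync::oneshot;
use futures::Future;

fn offload<T, F>(
    run_on_pool: impl FnOnce(Box<dyn FnOnce() + Send>),
    work: F,
) -> impl Future<Item = T, Error = oneshot::Canceled>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    run_on_pool(Box::new(move || {
        // Ignore the error: the receiver may already have been dropped.
        drop(tx.send(work()));
    }));
    // The oneshot receiver is itself a futures 0.1 `Future`.
    rx
}
```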
- fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) { + fn reclaim_on_message(&self, worker: Worker) { let state = Rc::downgrade(&self.state); let worker2 = worker.clone(); let reclaim_slot = Rc::new(RefCell::new(None)); let slot2 = reclaim_slot.clone(); - let mut on_finish = Some(on_finish); let reclaim = Closure::wrap(Box::new(move |event: Event| { if let Some(error) = event.dyn_ref::() { console_log!("error in worker: {}", error.message()); @@ -155,11 +149,9 @@ impl WorkerPool { return; } - // If this is a completion event then we can execute our `on_finish` - // callback and we can also deallocate our own callback by clearing - // out `slot2` which contains our own closure. + // If this is a completion event then can deallocate our own + // callback by clearing out `slot2` which contains our own closure. if let Some(_msg) = event.dyn_ref::() { - on_finish.take().unwrap()(); if let Some(state) = state.upgrade() { state.push(worker2.clone()); } @@ -193,80 +185,9 @@ impl WorkerPool { /// a web worker, that error is returned. pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> { let worker = self.execute(f)?; - self.reclaim_on_message(worker, || {}); + self.reclaim_on_message(worker); Ok(()) } - - /// Executes the closure `f` in a web worker, returning a future of the - /// value that `f` produces. - /// - /// This method is the same as `run` execept that it allows recovering the - /// return value of the closure `f` in a nonblocking fashion with the future - /// returned. - /// - /// # Errors - /// - /// If an error happens while spawning a web worker or sending a message to - /// a web worker, that error is returned. - pub fn run_notify( - &self, - f: impl FnOnce() -> T + Send + 'static, - ) -> Result + 'static, JsValue> - where - T: Send + 'static, - { - // FIXME(#1379) we should just use the `oneshot` directly as the future, - // but we have to use JS callbacks to ensure we don't have futures cross - // threads as that's currently not safe to do so. - let (tx, rx) = oneshot::channel(); - let storage = Arc::new(AtomicValue::new(None)); - let storage2 = storage.clone(); - let worker = self.execute(move || { - assert!(storage2.replace(Some(f())).is_ok()); - })?; - self.reclaim_on_message(worker, move || match storage.replace(None) { - Ok(Some(val)) => drop(tx.send(val)), - _ => unreachable!(), - }); - - Ok(rx.map_err(|_| JsValue::undefined())) - } -} - -/// A small helper struct representing atomic access to an internal value `T` -/// -/// This struct only supports one API, `replace`, which will either succeed and -/// replace the internal value with another (returning the previous one), or it -/// will fail returning the value passed in. Failure happens when two threads -/// try to `replace` at the same time. -/// -/// This is only really intended to help safely transfer information between -/// threads, it doesn't provide any synchronization capabilities itself other -/// than a guaranteed safe API. 
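Both this pool and the polyfill's `HELPERS` list rely on the same idea: a finished worker goes back onto a thread-local free list instead of being torn down. Reduced to its core, with a `String` standing in for the `Worker` handle purely for illustration:

```rust
use std::cell::RefCell;

thread_local! {
    // Free list of idle resources (workers, in the real code).
    static FREE_LIST: RefCell<Vec<String>> = RefCell::new(Vec::new());
}

fn alloc(spawn_resource: impl FnOnce() -> String) -> String {
    // Reuse an idle resource when one is available, otherwise create one.
    FREE_LIST
        .with(|list| list.borrow_mut().pop())
        .unwrap_or_else(spawn_resource)
}

fn free(resource: String) {
    // Return the resource to the pool once its work is finished.
    FREE_LIST.with(|list| list.borrow_mut().push(resource));
}
```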
-struct AtomicValue { - modifying: AtomicBool, - slot: UnsafeCell, -} - -unsafe impl Send for AtomicValue {} -unsafe impl Sync for AtomicValue {} - -impl AtomicValue { - fn new(val: T) -> AtomicValue { - AtomicValue { - modifying: AtomicBool::new(false), - slot: UnsafeCell::new(val), - } - } - - fn replace(&self, val: T) -> Result { - if self.modifying.swap(true, SeqCst) { - return Err(val); - } - let ret = unsafe { mem::replace(&mut *self.slot.get(), val) }; - self.modifying.store(false, SeqCst); - Ok(ret) - } } impl PoolState { From cde9684e4bf7d578285a9b4ea07f1b81a0efaa36 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 13:47:57 -0700 Subject: [PATCH 14/18] Clean up atomics/futures + polyfill * Remove now-unneeded `State` enum * Remove timeout argument from polyfill since we don't need it * Call `Atomics.waitAsync` if it's available instead of using our polyfill * Remove some extraneous dead code from the polyfill * Add a `val: i32` argument to the polyfill * Simplify the flow of futures with `Package` since `waitAsync` handles all the heavy lifting for us. * Remove `Arc` and just use `Package` * Remove `RefCell` from inside of `Package` now that it is no longer needed. --- crates/futures/src/atomics.rs | 258 +++++++++++++-------------------- crates/futures/src/polyfill.rs | 151 +++++-------------- 2 files changed, 135 insertions(+), 274 deletions(-) diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index b18b53093de..9914fa156dc 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -1,4 +1,4 @@ -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::fmt; use std::rc::Rc; use std::sync::atomic::{AtomicI32, Ordering}; @@ -8,8 +8,9 @@ use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Function, Promise}; +use js_sys::Function; use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; /// A Rust `Future` backed by a JavaScript `Promise`. /// @@ -23,14 +24,28 @@ pub struct JsFuture { rx: oneshot::Receiver>, } +// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`. +#[wasm_bindgen] +extern "C" { + type Promise; + #[wasm_bindgen(method)] + fn then(this: &Promise, cb: &JsValue) -> Promise; + + type Atomics; + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)] + fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise; + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)] + fn get_wait_async() -> JsValue; +} + impl fmt::Debug for JsFuture { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "JsFuture {{ ... }}") } } -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { +impl From for JsFuture { + fn from(js: js_sys::Promise) -> JsFuture { // Use the `then` method to schedule two callbacks, one for the // resolved value and one for the rejected value. We're currently // assuming that JS engines will unconditionally invoke precisely one of @@ -112,205 +127,132 @@ impl Future for JsFuture { /// If the `future` provided panics then the returned `Promise` **will not /// resolve**. Instead it will be a leaked promise. This is an unfortunate /// limitation of wasm currently that's hoped to be fixed one day! 
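One bullet of the clean-up commit above is to call `Atomics.waitAsync` directly when the engine provides it and fall back to the polyfill otherwise. A sketch of that detection, reusing the `waitAsync` bindings this patch introduces; `polyfill_wait_async` is an illustrative stand-in for the crate's `polyfill::wait_async`:

```rust
use js_sys::Promise;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
extern "C" {
    type Atomics;
    // Same bindings as the patch: the static method plus a getter used only
    // to probe whether the engine implements `Atomics.waitAsync` at all.
    #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)]
    fn wait_async(buf: &JsValue, index: i32, value: i32) -> Promise;
    #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)]
    fn get_wait_async() -> JsValue;
}

fn wait_async_or_polyfill(
    buf: &JsValue,
    index: i32,
    value: i32,
    polyfill_wait_async: impl Fn(&JsValue, i32, i32) -> Promise,
) -> Promise {
    if Atomics::get_wait_async().is_undefined() {
        // Engine has no native support: fall back to the worker polyfill.
        polyfill_wait_async(buf, index, value)
    } else {
        Atomics::wait_async(buf, index, value)
    }
}
```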
-pub fn future_to_promise(future: F) -> Promise - where - F: Future + 'static, +pub fn future_to_promise(future: F) -> js_sys::Promise +where + F: Future + 'static, { _future_to_promise(Box::new(future)) } // Implementation of actually transforming a future into a JavaScript `Promise`. // -// The only primitive we have to work with here is `Promise::new`, which gives -// us two callbacks that we can use to either reject or resolve the promise. -// It's our job to ensure that one of those callbacks is called at the -// appropriate time. -// -// Now we know that JavaScript (in general) can't block and is largely -// notification/callback driven. That means that our future must either have -// synchronous computational work to do, or it's "scheduled a notification" to -// happen. These notifications are likely callbacks to get executed when things -// finish (like a different promise or something like `setTimeout`). The general -// idea here is thus to do as much synchronous work as we can and then otherwise -// translate notifications of a future's task into "let's poll the future!" +// The main primitives used here are `Promise::new` to actually create a JS +// promise to return as well as `Atomics.waitAsync` to create a promise that we +// can asynchronously wait on. The general idea here is that we'll create a +// promise to return and schedule work to happen in `Atomics.waitAsync` +// callbacks. // -// This isn't necessarily the greatest future executor in the world, but it -// should get the job done for now hopefully. -fn _future_to_promise(future: Box>) -> Promise { +// After we've created a promise we start polling a future, and whenever it's +// not ready we'll execute `Atomics.waitAsync`. When that resolves we'll keep +// polling the future, and this happens until the future is done. Finally +// when it's all finished we call either resolver or reject depending on the +// result of the future. +fn _future_to_promise(future: Box>) -> js_sys::Promise { let mut future = Some(executor::spawn(future)); - return Promise::new(&mut |resolve, reject| { - Package::poll(&Arc::new(Package { - spawn: RefCell::new(future.take().unwrap()), + return js_sys::Promise::new(&mut |resolve, reject| { + Package { + spawn: future.take().unwrap(), resolve, reject, - notified: Cell::new(State::Notified), - waker: Arc::new(Waker::default()), - })); + waker: Arc::new(Waker { + value: AtomicI32::new(1), // 1 == "notified, ready to poll" + }), + } + .poll(); }); struct Package { // Our "spawned future". This'll have everything we need to poll the // future and continue to move it forward. - spawn: RefCell>>>, - - // The current state of this future, expressed in an enum below. This - // indicates whether we're currently polling the future, received a - // notification and need to keep polling, or if we're waiting for a - // notification to come in (and no one is polling). - notified: Cell, + spawn: Spawn>>, // Our two callbacks connected to the `Promise` that we returned to // JavaScript. We'll be invoking one of these at the end. resolve: Function, reject: Function, - // Struct to wake a future + // Shared state used to communicate waking up this future, this is the + // `Send + Sync` piece needed by the async task system. waker: Arc, } - // The possible states our `Package` (future) can be in, tracked internally - // and used to guide what happens when polling a future. - enum State { - // This future is currently and actively being polled. 
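The `Option::take` trick used above to move the spawned future out of an `FnMut` promise constructor can be seen in isolation in this std-only sketch (illustrative, not part of the patch):

fn call_once_through_fnmut() {
    // `Promise::new` wants an `FnMut`, but the spawned future can only be
    // consumed once, so it is parked in an `Option` and taken on the first call.
    let mut slot = Some(String::from("only movable once"));
    let mut callback = move || {
        let value = slot.take().expect("callback must run at most once");
        drop(value); // the one-shot value is consumed here
    };
    callback(); // first call moves the value out of the Option
    // calling `callback()` again would hit the `expect`, mirroring the
    // single-resolution promise constructor above
}
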
Attempting to - // access the future will result in a runtime panic and is considered a - // bug. - Polling, - - // This future has been notified, while it was being polled. This marker - // is used in the `Notify` implementation below, and indicates that a - // notification was received that the future is ready to make progress. - // If seen, however, it probably means that the future is also currently - // being polled. - Notified, - - // The future is blocked, waiting for something to happen. Stored here - // is a self-reference to the future itself so we can pull it out in - // `Notify` and continue polling. - // - // Note that the self-reference here is an Arc-cycle that will leak - // memory unless the future completes, but currently that should be ok - // as we'll have to stick around anyway while the future is executing! - // - // This state is removed as soon as a notification comes in, so the leak - // should only be "temporary" - Waiting(Arc), - } - - #[derive(Default)] struct Waker { - // worker will be waiting on this value - // 0 by default, which means not notified value: AtomicI32, }; impl Notify for Waker { fn notify(&self, _id: usize) { - // since we have only value field here - // let it be 1 if notified, 0 if not - if self.value.swap(1, Ordering::SeqCst) == 0 { - let _ = unsafe { - core::arch::wasm32::atomic_notify( - &self.value as *const AtomicI32 as *mut i32, - std::u32::MAX, // number of threads to notify - ) - }; - } - } - } - - fn poll_again(package: Arc) { - let me = match package.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => { - me - } - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => { + // Attempt to notify us by storing 1. If we're already 1 then we + // were previously notified and there's nothing to do. Otherwise + // we execute the native `notify` instruction to wake up the + // corresponding `waitAsync` that was waiting for the transition + // from 0 to 1. + let prev = self.value.swap(1, Ordering::SeqCst); + if prev == 1 { return; } - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => { - return; + debug_assert_eq!(prev, 0); + unsafe { + core::arch::wasm32::atomic_notify( + &self.value as *const AtomicI32 as *mut i32, + 1, // number of threads to notify + ); } - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - let promise = - crate::polyfill::wait_async(&package.waker.value).expect("Should create a Promise"); - let closure = Closure::once(move |_| { - Package::poll(&me); - }); - promise.then(&closure); - closure.forget(); + } } impl Package { - // Move the future contained in `me` as far forward as we can. 
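The 0/1 handshake shared by `Waker::notify` and `Package::poll` can be sketched with plain std atomics; the real code additionally issues `atomic.notify` on the 0 -> 1 transition and waits via `Atomics.waitAsync` (or the polyfill) while the flag is 0 (sketch only, not part of the patch):

use std::sync::atomic::{AtomicI32, Ordering};

fn notify(flag: &AtomicI32) -> bool {
    // Returns true only on the 0 -> 1 transition, i.e. when a real wake-up
    // should be issued; repeated notifications coalesce into one.
    flag.swap(1, Ordering::SeqCst) == 0
}

fn consume_notification(flag: &AtomicI32) -> bool {
    // The polling side swaps 0 back in; reading 1 means a notification was
    // pending and the future should be polled again.
    flag.swap(0, Ordering::SeqCst) == 1
}

fn main() {
    let flag = AtomicI32::new(1); // start "notified" so the first poll runs
    assert!(consume_notification(&flag)); // first poll proceeds
    assert!(!consume_notification(&flag)); // now parked, waiting
    assert!(notify(&flag)); // first notify triggers a wake-up
    assert!(!notify(&flag)); // second notify coalesces, no extra wake-up
}
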
This will - // do as much synchronous work as possible to complete the future, - // ensuring that when it blocks we're scheduled to get notified via some - // callback somewhere at some point (vague, right?) - // - // TODO: this probably shouldn't do as much synchronous work as possible - // as it can starve other computations. Rather it should instead - // yield every so often with something like `setTimeout` with the - // timeout set to zero. - fn poll(me: &Arc) { - loop { - match me.notified.replace(State::Polling) { - // We received a notification while previously polling, or - // this is the initial poll. We've got work to do below! - State::Notified => {} - - // We've gone through this loop once and no notification was - // received while we were executing work. That means we got - // `NotReady` below and we're scheduled to receive a - // notification. Block ourselves and wait for later. - // - // When the notification comes in it'll notify our task, see - // our `Waiting` state, and resume the polling process - State::Polling => { - me.notified.set(State::Waiting(me.clone())); - - poll_again(me.clone()); - - break; - } - - State::Waiting(_) => panic!("shouldn't see waiting state!"), - } - - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { + fn poll(mut self) { + // Poll in a loop waiting for the future to become ready. Note that + // we probably shouldn't maximize synchronous work here but rather + // we should occasionally yield back to the runtime and schedule + // ourselves to resume this future later on. + // + // Note that 0 here means "need a notification" and 1 means "we got + // a notification". That means we're storing 0 into the `notified` + // slot and we're trying to read 1 to keep on going. + while self.waker.value.swap(0, Ordering::SeqCst) == 1 { + let (val, f) = match self.spawn.poll_future_notify(&self.waker, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. - Ok(Async::Ready(value)) => (value, &me.resolve), - Err(value) => (value, &me.reject), + Ok(Async::Ready(value)) => (value, &self.resolve), + Err(value) => (value, &self.reject), - // Otherwise keep going in our loop, if we weren't notified - // we'll break out and start waiting. - Ok(Async::NotReady) => continue, + // ... otherwise let's break out and wait + Ok(Async::NotReady) => break, }; + // Call the resolution function, and then when we're done + // destroy ourselves through `drop` since our future is no + // longer needed. drop(f.call1(&JsValue::undefined(), &val)); - break; + return; } + + // Create a `js_sys::Promise` using `Atomics.waitAsync` (or our + // polyfill) and then register its completion callback as simply + // calling this function again. + let promise = wait_async(&self.waker.value, 0).unchecked_into::(); + let closure = Closure::once_into_js(move || { + self.poll(); + }); + promise.then(&closure); } } } +fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise { + // If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today) + // then we use our fallback, otherwise we use the native function. + if Atomics::get_wait_async().is_undefined() { + crate::polyfill::wait_async(ptr, val) + } else { + let mem = wasm_bindgen::memory().unchecked_into::(); + Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val) + } + +} + /// Converts a Rust `Future` on a local task queue. 
/// /// The `future` provided must adhere to `'static` because it'll be scheduled @@ -320,8 +262,8 @@ fn _future_to_promise(future: Box>) /// /// This function has the same panic behavior as `future_to_promise`. pub fn spawn_local(future: F) - where - F: Future + 'static, +where + F: Future + 'static, { future_to_promise( future diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index 8a2c3424e8e..42dc84b6b95 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -36,38 +36,21 @@ * when possible. The worker communicates with its parent using postMessage. */ +use js_sys::{encode_uri_component, Array, Promise}; use std::cell::RefCell; -use std::sync::atomic::{AtomicI32, Ordering}; - -use js_sys::{ - encode_uri_component, Array, Function, Int32Array, JsString, Promise, Reflect, - WebAssembly, -}; +use std::sync::atomic::AtomicI32; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{MessageEvent, Worker}; -const DEFAULT_TIMEOUT: f64 = std::f64::INFINITY; - const HELPER_CODE: &'static str = " onmessage = function (ev) { - try { - switch (ev.data[0]) { - case 'wait': { - let [_, ia, index, value, timeout] = ev.data; - let result = Atomics.wait(ia, index, value, timeout); - postMessage(['ok', result]); - break; - } - default: { - throw new Error('Wrong message sent to wait helper: ' + ev.data.join(',')); - } - } - } catch (e) { - console.log('Exception in wait helper', e); - postMessage(['error', 'Exception']); - } -} + let [ia, index, value] = ev.data; + ia = new Int32Array(ia.buffer); + let result = Atomics.wait(ia, index, value); + console.log('done', result); + postMessage(result); +}; "; thread_local! { @@ -84,103 +67,39 @@ fn alloc_helper() -> Worker { let encoded: String = encode_uri_component(HELPER_CODE).into(); initialization_string.push_str(&encoded); - Worker::new(&initialization_string).expect("Should create a Worker") + Worker::new(&initialization_string).unwrap_or_else(|js| wasm_bindgen::throw_val(js)) }) } fn free_helper(helper: Worker) { HELPERS.with(move |helpers| { - helpers.borrow_mut().push(helper.clone()); + let mut helpers = helpers.borrow_mut(); + helpers.push(helper.clone()); + helpers.truncate(10); // random arbitrary limit chosen here }); } -pub fn wait_async(value: &AtomicI32) -> Result { - wait_async_with_timeout(value, DEFAULT_TIMEOUT) -} - -fn get_array_item(array: &JsValue, index: u32) -> JsValue { - Reflect::get(array, &JsValue::from(index)) - .expect(&format!("Array should contain the index {}", index)) -} - -// Atomics.waitAsync always returns a promise. Throws standard errors -// for parameter validation. The promise is resolved with a string as from -// Atomics.wait, or, in the case something went completely wrong, it is -// rejected with an error string. -pub fn wait_async_with_timeout(value: &AtomicI32, timeout: f64) -> Result { - let memory_buffer = wasm_bindgen::memory() - .dyn_into::() - .expect("Should cast a memory to WebAssembly::Memory") - .buffer(); - - let indexed_array = Int32Array::new(&memory_buffer); - - let index = value as *const AtomicI32 as u32 / 4; - let value_i32 = value.load(Ordering::SeqCst); - - // General case, we must wait. - - Ok(Promise::new( - &mut move |resolve: Function, reject: Function| { - let helper = alloc_helper(); - let helper_ref = helper.clone(); - - let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| { - // Free the helper early so that it can be reused if the resolution - // needs a helper. 
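From the Rust side, driving this polyfill might look roughly like the following sketch; it assumes the `wait_async(ptr, value)` signature introduced by this patch (shown just below) and leaks the callback with `forget` for simplicity:

use std::sync::atomic::AtomicI32;
use wasm_bindgen::prelude::*;

static FLAG: AtomicI32 = AtomicI32::new(0);

fn wait_for_flag() {
    // Ask the polyfill for a promise that settles once another thread changes
    // FLAG away from 0 and notifies that memory location.
    let promise = crate::polyfill::wait_async(&FLAG, 0);
    let done = Closure::once(|result: JsValue| {
        // `result` is whatever `Atomics.wait` returned in the helper worker,
        // e.g. the string "ok".
        let _ = result;
    });
    promise.then(&done);
    done.forget(); // keep the callback alive until the helper posts back
}
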
- free_helper(helper_ref); - match String::from( - get_array_item(&e.data(), 0) - .as_string() - .expect("data[0] should return a String"), - ) - .as_str() - { - "ok" => { - resolve - .call1(&JsValue::NULL, &get_array_item(&e.data(), 1)) - .expect("Should successfully call a resolve callback"); - } - "error" => { - // Note, rejection is not in the spec, it is an artifact of the polyfill. - // The helper already printed an error to the console. - reject - .call1(&JsValue::NULL, &get_array_item(&e.data(), 1)) - .expect("Should successfully call a reject callback"); - } - // it's not specified in the proposal yet - _ => (), - } - }); - helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); - - // onmessage_callback.forget(); - - // It's possible to do better here if the ia is already known to the - // helper. In that case we can communicate the other data through - // shared memory and wake the agent. And it is possible to make ia - // known to the helper by waking it with a special value so that it - // checks its messages, and then posting the ia to the helper. Some - // caching / decay scheme is useful no doubt, to improve performance - // and avoid leaks. - // - // In the event we wake the helper directly, we can micro-wait here - // for a quick result. We'll need to restructure some code to make - // that work out properly, and some synchronization is necessary for - // the helper to know that we've picked up the result and no - // postMessage is necessary. - - let data = Array::of5( - &JsString::from("wait"), - &indexed_array, - &JsValue::from(index), - &JsValue::from(value_i32), - &JsValue::from(timeout), - ); - - helper - .post_message(&data) - .expect("Should successfully post data to a Worker"); - }, - )) +pub fn wait_async(ptr: &AtomicI32, value: i32) -> Promise { + Promise::new(&mut |resolve, _reject| { + let helper = alloc_helper(); + let helper_ref = helper.clone(); + + let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| { + // Our helper is done waiting so it's available to wait on a + // different location, so return it to the free list. + free_helper(helper_ref); + drop(resolve.call1(&JsValue::NULL, &e.data())); + }); + helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + + let data = Array::of3( + &wasm_bindgen::memory(), + &JsValue::from(ptr as *const AtomicI32 as i32 / 4), + &JsValue::from(value), + ); + + helper + .post_message(&data) + .unwrap_or_else(|js| wasm_bindgen::throw_val(js)); + }) } From d590a9e053a87f5db00cf35c99407b5755e923e6 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 13:59:14 -0700 Subject: [PATCH 15/18] Deduplicate `JsFuture` definitions Turns out it's the exact same for both before and after atomics, so let's use the same definition! 
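As a usage illustration of the deduplicated `JsFuture` (a sketch for a downstream crate, assuming the public `wasm_bindgen_futures::JsFuture` path and futures 0.1 combinators):

use futures::Future;
use js_sys::Promise;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;

// Wrap an already-resolved JS promise and double the number it yields; the
// same `JsFuture` type now backs both the atomics and non-atomics builds.
fn double_from_js() -> impl Future<Item = f64, Error = JsValue> {
    let promise = Promise::resolve(&JsValue::from_f64(21.0));
    JsFuture::from(promise).map(|val| val.as_f64().unwrap_or(0.0) * 2.0)
}
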
--- crates/futures/src/atomics.rs | 85 -------------------------- crates/futures/src/legacy_js2rust.rs | 89 ++++++++++++++++++++++++++++ crates/futures/src/lib.rs | 3 + 3 files changed, 92 insertions(+), 85 deletions(-) create mode 100644 crates/futures/src/legacy_js2rust.rs diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index 9914fa156dc..524d25dba5d 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -1,29 +1,13 @@ -use std::cell::RefCell; -use std::fmt; -use std::rc::Rc; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; -use futures::sync::oneshot; use js_sys::Function; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; -/// A Rust `Future` backed by a JavaScript `Promise`. -/// -/// This type is constructed with a JavaScript `Promise` object and translates -/// it to a Rust `Future`. This type implements the `Future` trait from the -/// `futures` crate and will either succeed or fail depending on what happens -/// with the JavaScript `Promise`. -/// -/// Currently this type is constructed with `JsFuture::from`. -pub struct JsFuture { - rx: oneshot::Receiver>, -} - // Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`. #[wasm_bindgen] extern "C" { @@ -38,75 +22,6 @@ extern "C" { fn get_wait_async() -> JsValue; } -impl fmt::Debug for JsFuture { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "JsFuture {{ ... }}") - } -} - -impl From for JsFuture { - fn from(js: js_sys::Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. We're currently - // assuming that JS engines will unconditionally invoke precisely one of - // these callbacks, no matter what. - // - // Ideally we'd have a way to cancel the callbacks getting invoked and - // free up state ourselves when this `JsFuture` is dropped. We don't - // have that, though, and one of the callbacks is likely always going to - // be invoked. - // - // As a result we need to make sure that no matter when the callbacks - // are invoked they are valid to be called at any time, which means they - // have to be self-contained. Through the `Closure::once` and some - // `Rc`-trickery we can arrange for both instances of `Closure`, and the - // `Rc`, to all be destroyed once the first one is called. - let (tx, rx) = oneshot::channel(); - let state = Rc::new(RefCell::new(None)); - let state2 = state.clone(); - let resolve = Closure::once(move |val| finish(&state2, Ok(val))); - let state2 = state.clone(); - let reject = Closure::once(move |val| finish(&state2, Err(val))); - - js.then2(&resolve, &reject); - *state.borrow_mut() = Some((tx, resolve, reject)); - - return JsFuture { rx }; - - fn finish( - state: &RefCell< - Option<( - oneshot::Sender>, - Closure, - Closure, - )>, - >, - val: Result, - ) { - match state.borrow_mut().take() { - // We don't have any guarantee that anyone's still listening at this - // point (the Rust `JsFuture` could have been dropped) so simply - // ignore any errors here. 
- Some((tx, _, _)) => drop(tx.send(val)), - None => wasm_bindgen::throw_str("cannot finish twice"), - } - } - } -} - -impl Future for JsFuture { - type Item = JsValue; - type Error = JsValue; - - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(val)) => val.map(Async::Ready), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => wasm_bindgen::throw_str("cannot cancel"), - } - } -} - /// Converts a Rust `Future` into a JavaScript `Promise`. /// /// This function will take any future in Rust and schedule it to be executed, diff --git a/crates/futures/src/legacy_js2rust.rs b/crates/futures/src/legacy_js2rust.rs new file mode 100644 index 00000000000..a4fb52c4fe1 --- /dev/null +++ b/crates/futures/src/legacy_js2rust.rs @@ -0,0 +1,89 @@ +use std::cell::RefCell; +use std::fmt; +use std::rc::Rc; + +use futures::prelude::*; +use futures::sync::oneshot; +use js_sys::Promise; +use wasm_bindgen::prelude::*; + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + rx: oneshot::Receiver>, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. We're currently + // assuming that JS engines will unconditionally invoke precisely one of + // these callbacks, no matter what. + // + // Ideally we'd have a way to cancel the callbacks getting invoked and + // free up state ourselves when this `JsFuture` is dropped. We don't + // have that, though, and one of the callbacks is likely always going to + // be invoked. + // + // As a result we need to make sure that no matter when the callbacks + // are invoked they are valid to be called at any time, which means they + // have to be self-contained. Through the `Closure::once` and some + // `Rc`-trickery we can arrange for both instances of `Closure`, and the + // `Rc`, to all be destroyed once the first one is called. + let (tx, rx) = oneshot::channel(); + let state = Rc::new(RefCell::new(None)); + let state2 = state.clone(); + let resolve = Closure::once(move |val| finish(&state2, Ok(val))); + let state2 = state.clone(); + let reject = Closure::once(move |val| finish(&state2, Err(val))); + + js.then2(&resolve, &reject); + *state.borrow_mut() = Some((tx, resolve, reject)); + + return JsFuture { rx }; + + fn finish( + state: &RefCell< + Option<( + oneshot::Sender>, + Closure, + Closure, + )>, + >, + val: Result, + ) { + match state.borrow_mut().take() { + // We don't have any guarantee that anyone's still listening at this + // point (the Rust `JsFuture` could have been dropped) so simply + // ignore any errors here. 
+ Some((tx, _, _)) => drop(tx.send(val)), + None => wasm_bindgen::throw_str("cannot finish twice"), + } + } + } +} + +impl Future for JsFuture { + type Item = JsValue; + type Error = JsValue; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(val)) => val.map(Async::Ready), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => wasm_bindgen::throw_str("cannot cancel"), + } + } +} diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 8ed8d173e08..d04a49733a7 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -106,6 +106,9 @@ use cfg_if::cfg_if; +mod legacy_js2rust; +pub use legacy_js2rust::*; + cfg_if! { if #[cfg(target_feature = "atomics")] { /// Contains a thread-safe version of this crate, with Futures 0.1 From c8451d6f3ed36ae7bef80088a37d98787e72f237 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 14:00:45 -0700 Subject: [PATCH 16/18] Rename some files * Use "legacy" instead of "stable" since `futures 0.1` is quicly becoming "legacy" * Rename "atomics" to "legacy_atomics" to leave room for the libstd-based futures atomics version. * Rename "polyfill" to "wait_async_polyfill" to specify what it's polyfilling. --- crates/futures/src/{stable.rs => legacy.rs} | 0 crates/futures/src/{atomics.rs => legacy_atomics.rs} | 2 +- crates/futures/src/lib.rs | 10 +++++----- .../src/{polyfill.rs => wait_async_polyfill.rs} | 0 4 files changed, 6 insertions(+), 6 deletions(-) rename crates/futures/src/{stable.rs => legacy.rs} (100%) rename crates/futures/src/{atomics.rs => legacy_atomics.rs} (99%) rename crates/futures/src/{polyfill.rs => wait_async_polyfill.rs} (100%) diff --git a/crates/futures/src/stable.rs b/crates/futures/src/legacy.rs similarity index 100% rename from crates/futures/src/stable.rs rename to crates/futures/src/legacy.rs diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/legacy_atomics.rs similarity index 99% rename from crates/futures/src/atomics.rs rename to crates/futures/src/legacy_atomics.rs index 524d25dba5d..9981b6958e6 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/legacy_atomics.rs @@ -160,7 +160,7 @@ fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise { // If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today) // then we use our fallback, otherwise we use the native function. if Atomics::get_wait_async().is_undefined() { - crate::polyfill::wait_async(ptr, val) + crate::wait_async_polyfill::wait_async(ptr, val) } else { let mem = wasm_bindgen::memory().unchecked_into::(); Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val) diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index d04a49733a7..ed7aec0659b 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -112,15 +112,15 @@ pub use legacy_js2rust::*; cfg_if! 
{ if #[cfg(target_feature = "atomics")] { /// Contains a thread-safe version of this crate, with Futures 0.1 - mod atomics; + mod legacy_atomics; /// Polyfill for `Atomics.waitAsync` function - mod polyfill; + mod wait_async_polyfill; - pub use atomics::*; + pub use legacy_atomics::*; } else { - mod stable; - pub use stable::*; + mod legacy; + pub use legacy::*; } } diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/wait_async_polyfill.rs similarity index 100% rename from crates/futures/src/polyfill.rs rename to crates/futures/src/wait_async_polyfill.rs From be294c824872690150658255484186e9e46b912c Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 14:04:45 -0700 Subject: [PATCH 17/18] Remove a debugging statement --- crates/futures/src/wait_async_polyfill.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/futures/src/wait_async_polyfill.rs b/crates/futures/src/wait_async_polyfill.rs index 42dc84b6b95..14b8f0eadf5 100644 --- a/crates/futures/src/wait_async_polyfill.rs +++ b/crates/futures/src/wait_async_polyfill.rs @@ -48,7 +48,6 @@ onmessage = function (ev) { let [ia, index, value] = ev.data; ia = new Int32Array(ia.buffer); let result = Atomics.wait(ia, index, value); - console.log('done', result); postMessage(result); }; "; From b13f757e90d7c88d579cd4adbb018580a4520dea Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 14:11:59 -0700 Subject: [PATCH 18/18] Shared more betwee legacy with/without atomics --- crates/futures/src/legacy.rs | 114 +----------------- crates/futures/src/legacy_atomics.rs | 26 +--- .../{legacy_js2rust.rs => legacy_shared.rs} | 21 +++- crates/futures/src/lib.rs | 7 +- 4 files changed, 30 insertions(+), 138 deletions(-) rename crates/futures/src/{legacy_js2rust.rs => legacy_shared.rs} (85%) diff --git a/crates/futures/src/legacy.rs b/crates/futures/src/legacy.rs index 18fcde41219..a200d088c60 100644 --- a/crates/futures/src/legacy.rs +++ b/crates/futures/src/legacy.rs @@ -1,96 +1,11 @@ -use std::cell::{Cell, RefCell}; -use std::fmt; -use std::rc::Rc; -use std::sync::Arc; - use futures::executor::{self, Notify, Spawn}; -use futures::future; use futures::prelude::*; -use futures::sync::oneshot; use js_sys::{Function, Promise}; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::sync::Arc; use wasm_bindgen::prelude::*; -/// A Rust `Future` backed by a JavaScript `Promise`. -/// -/// This type is constructed with a JavaScript `Promise` object and translates -/// it to a Rust `Future`. This type implements the `Future` trait from the -/// `futures` crate and will either succeed or fail depending on what happens -/// with the JavaScript `Promise`. -/// -/// Currently this type is constructed with `JsFuture::from`. -pub struct JsFuture { - rx: oneshot::Receiver>, -} - -impl fmt::Debug for JsFuture { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "JsFuture {{ ... }}") - } -} - -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. We're currently - // assuming that JS engines will unconditionally invoke precisely one of - // these callbacks, no matter what. - // - // Ideally we'd have a way to cancel the callbacks getting invoked and - // free up state ourselves when this `JsFuture` is dropped. We don't - // have that, though, and one of the callbacks is likely always going to - // be invoked. 
- // - // As a result we need to make sure that no matter when the callbacks - // are invoked they are valid to be called at any time, which means they - // have to be self-contained. Through the `Closure::once` and some - // `Rc`-trickery we can arrange for both instances of `Closure`, and the - // `Rc`, to all be destroyed once the first one is called. - let (tx, rx) = oneshot::channel(); - let state = Rc::new(RefCell::new(None)); - let state2 = state.clone(); - let resolve = Closure::once(move |val| finish(&state2, Ok(val))); - let state2 = state.clone(); - let reject = Closure::once(move |val| finish(&state2, Err(val))); - - js.then2(&resolve, &reject); - *state.borrow_mut() = Some((tx, resolve, reject)); - - return JsFuture { rx }; - - fn finish( - state: &RefCell< - Option<( - oneshot::Sender>, - Closure, - Closure, - )>, - >, - val: Result, - ) { - match state.borrow_mut().take() { - // We don't have any guarantee that anyone's still listening at this - // point (the Rust `JsFuture` could have been dropped) so simply - // ignore any errors here. - Some((tx, _, _)) => drop(tx.send(val)), - None => wasm_bindgen::throw_str("cannot finish twice"), - } - } - } -} - -impl Future for JsFuture { - type Item = JsValue; - type Error = JsValue; - - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(val)) => val.map(Async::Ready), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => wasm_bindgen::throw_str("cannot cancel"), - } - } -} - /// Converts a Rust `Future` into a JavaScript `Promise`. /// /// This function will take any future in Rust and schedule it to be executed, @@ -112,8 +27,8 @@ impl Future for JsFuture { /// resolve**. Instead it will be a leaked promise. This is an unfortunate /// limitation of wasm currently that's hoped to be fixed one day! pub fn future_to_promise(future: F) -> Promise - where - F: Future + 'static, +where + F: Future + 'static, { _future_to_promise(Box::new(future)) } @@ -287,22 +202,3 @@ fn _future_to_promise(future: Box>) } } } - -/// Converts a Rust `Future` on a local task queue. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. -/// -/// # Panics -/// -/// This function has the same panic behavior as `future_to_promise`. -pub fn spawn_local(future: F) - where - F: Future + 'static, -{ - future_to_promise( - future - .map(|()| JsValue::undefined()) - .or_else(|()| future::ok::(JsValue::undefined())), - ); -} diff --git a/crates/futures/src/legacy_atomics.rs b/crates/futures/src/legacy_atomics.rs index 9981b6958e6..b09e8b7ced5 100644 --- a/crates/futures/src/legacy_atomics.rs +++ b/crates/futures/src/legacy_atomics.rs @@ -1,10 +1,8 @@ -use std::sync::atomic::{AtomicI32, Ordering}; -use std::sync::Arc; - use futures::executor::{self, Notify, Spawn}; -use futures::future; use futures::prelude::*; use js_sys::Function; +use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::Arc; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; @@ -165,24 +163,4 @@ fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise { let mem = wasm_bindgen::memory().unchecked_into::(); Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val) } - -} - -/// Converts a Rust `Future` on a local task queue. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. 
-/// -/// # Panics -/// -/// This function has the same panic behavior as `future_to_promise`. -pub fn spawn_local(future: F) -where - F: Future + 'static, -{ - future_to_promise( - future - .map(|()| JsValue::undefined()) - .or_else(|()| future::ok::(JsValue::undefined())), - ); } diff --git a/crates/futures/src/legacy_js2rust.rs b/crates/futures/src/legacy_shared.rs similarity index 85% rename from crates/futures/src/legacy_js2rust.rs rename to crates/futures/src/legacy_shared.rs index a4fb52c4fe1..a53e32e108e 100644 --- a/crates/futures/src/legacy_js2rust.rs +++ b/crates/futures/src/legacy_shared.rs @@ -1,7 +1,7 @@ use std::cell::RefCell; +use futures::future; use std::fmt; use std::rc::Rc; - use futures::prelude::*; use futures::sync::oneshot; use js_sys::Promise; @@ -87,3 +87,22 @@ impl Future for JsFuture { } } } + +/// Converts a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +pub fn spawn_local(future: F) +where + F: Future + 'static, +{ + crate::future_to_promise( + future + .map(|()| JsValue::undefined()) + .or_else(|()| future::ok::(JsValue::undefined())), + ); +} diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index ed7aec0659b..57da7b365d0 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -106,18 +106,17 @@ use cfg_if::cfg_if; -mod legacy_js2rust; -pub use legacy_js2rust::*; +mod legacy_shared; +pub use legacy_shared::*; cfg_if! { if #[cfg(target_feature = "atomics")] { /// Contains a thread-safe version of this crate, with Futures 0.1 mod legacy_atomics; + pub use legacy_atomics::*; /// Polyfill for `Atomics.waitAsync` function mod wait_async_polyfill; - - pub use legacy_atomics::*; } else { mod legacy; pub use legacy::*;
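Finally, a minimal downstream sketch of the `spawn_local` entry point that `legacy_shared.rs` now exports for both back ends; with `target_feature = "atomics"` enabled the same call is backed by `legacy_atomics` via the `cfg_if!` above (illustrative only):

use futures::future;
use wasm_bindgen_futures::spawn_local;

// Fire-and-forget a unit future on the local task queue; the promise created
// under the hood is never observed, so errors are simply dropped.
fn kick_off_background_work() {
    spawn_local(future::lazy(|| {
        // ... synchronous or future-producing work goes here ...
        Ok::<(), ()>(())
    }));
}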