diff --git a/src/runtime.rs b/src/runtime.rs index 417b2dcc5..addadf01f 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,6 +1,7 @@ //! [`Runtime`]s are the primary integration point between moxie and //! embedding environments. +mod bind_stream; mod context; mod runloop; mod var; @@ -17,6 +18,7 @@ use std::{ task::Waker, }; +pub use bind_stream::BindStream; pub(crate) use context::Context; pub use runloop::RunLoop; pub(crate) use var::Var; diff --git a/src/runtime/bind_stream.rs b/src/runtime/bind_stream.rs new file mode 100644 index 000000000..1204bdab5 --- /dev/null +++ b/src/runtime/bind_stream.rs @@ -0,0 +1,177 @@ +mod waker; +use waker::*; + +use super::{Revision, Runtime}; +use futures::{ + channel::mpsc, + stream::{Stream, StreamExt}, + task::{noop_waker, waker, LocalSpawn}, +}; +use std::{ + pin::Pin, + task::{Context as FutContext, Poll}, +}; + +/// A [`Runtime`] that is bound with a particular root function. +/// +/// If running in a context with an async executor, can be consumed as a +/// [`futures::Stream`] of [`crate::runtime::Revision`]s in order to provide +/// the [`super::Runtime`] with a [`std::task::Waker`]. +pub struct BindStream { + inner: Runtime, + root: Root, + changes_receiver: mpsc::Receiver<()>, +} + +impl super::Runtime { + /// Returns this runtime bound with a specific root function as + /// ([`futures::stream::Stream`]). + pub fn bind(self, root: Root) -> BindStream + where + Root: FnMut() -> Out + Unpin, + { + BindStream::with_runtime(self, root) + } + + /// Returns this runtime bound with a specific root function as + /// ([`futures::stream::Stream`]). + pub fn bind_init(self, root: Root) -> (BindStream, Out) + where + Root: FnMut() -> Out + Unpin, + { + let mut bound = self.bind(root); + let out = bound.try_next().unwrap(); + (bound, out) + } +} + +impl BindStream +where + Root: FnMut() -> Out + Unpin, +{ + /// Creates a new `Runtime` attached to the provided root function. + pub fn new(root: Root) -> Self { + let rt = Runtime::new(); + Self::with_runtime(rt, root) + } + + /// Binds `Runtime` to the provided root function. + pub fn with_runtime(mut rt: super::Runtime, root: Root) -> Self { + let (mut changes_sender, changes_receiver) = mpsc::channel(0); + changes_sender.try_send(()).unwrap(); + rt.set_state_change_waker(waker(BindStreamWaker::new(changes_sender))); + Self { inner: rt, root, changes_receiver } + } + + /// Returns output of the next revision if at least state change exists + pub fn try_next(&mut self) -> Option { + let wk = noop_waker(); + let mut cx = FutContext::from_waker(&wk); + match Pin::new(self).poll_next(&mut cx) { + Poll::Ready(out) => out, + Poll::Pending => None, + } + } + + /// Creates a new `Runtime` attached to the provided root function, which + /// then runs once. + pub fn init(root: Root) -> (Self, Out) { + let mut bound = Self::new(root); + let out = bound.try_next().unwrap(); + (bound, out) + } + + /// Binds `Runtime` to the provided root function, then runs it once. + pub fn init_with_runtime(rt: super::Runtime, root: Root) -> (Self, Out) { + let mut bound = Self::with_runtime(rt, root); + let out = bound.try_next().unwrap(); + (bound, out) + } + + /// Returns the runtime's current Revision. + pub fn revision(&self) -> Revision { + self.inner.revision() + } + + /// Sets the executor that will be used to spawn normal priority tasks. + pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) { + self.inner.set_task_executor(sp); + } + + /// Poll this runtime without exiting. Discards any value returned from the + /// root function. The future yields in between revisions and is woken on + /// state changes. + pub async fn run_on_state_changes(mut self) { + loop { + self.next().await; + } + } + + /// Unbinds the runtime from its current root function, returning both. + /// Resets waker to `noop_waker`. + pub fn unbind(self) -> (Runtime, Root) { + let Self { mut inner, root, .. } = self; + inner.set_state_change_waker(noop_waker()); + (inner, root) + } +} + +impl Stream for BindStream +where + Root: FnMut() -> Out + Unpin, +{ + type Item = Out; + + /// This `Stream` implementation, if change present, runs a single revision + /// for a call to `poll_next`, returning `Poll::Ready(Some(...))`, otherwise + /// returns `Poll::Pending` + fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.changes_receiver) + .poll_next(cx) + .map(|received| received.map(|_| this.inner.run_once(&mut this.root))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn pending_without_change() { + use futures::{ + executor::{block_on, LocalPool}, + stream::StreamExt, + task::LocalSpawnExt, + }; + + let mut brt = BindStream::new(|| ()); + block_on(brt.next()).expect("BindStream should yield first revision immediately"); + let mut pool = LocalPool::new(); + pool.spawner() + .spawn_local(async move { + brt.next().await.unwrap(); + unreachable!() + }) + .unwrap(); + assert!(!pool.try_run_one()); + } + + #[test] + fn has_changes() { + use futures::{executor::LocalPool, task::LocalSpawnExt}; + + let (mut brt, key) = BindStream::init(|| crate::state(|| 0).1); + let mut pool = LocalPool::new(); + pool.spawner() + .spawn_local(async move { + brt.next().await; + }) + .unwrap(); + assert!(!pool.try_run_one()); + key.set(0); + assert!(!pool.try_run_one()); + key.set(1); + assert!(pool.try_run_one()); + } +} diff --git a/src/runtime/bind_stream/waker.rs b/src/runtime/bind_stream/waker.rs new file mode 100644 index 000000000..a28bc4d3c --- /dev/null +++ b/src/runtime/bind_stream/waker.rs @@ -0,0 +1,22 @@ +use futures::channel::mpsc; +use std::sync::{Arc, Mutex}; + +pub use futures::task::ArcWake; + +pub struct BindStreamWaker { + inner: Mutex>, +} + +impl BindStreamWaker { + pub fn new(inner: mpsc::Sender<()>) -> Arc { + Arc::new(Self { inner: Mutex::new(inner) }) + } +} + +impl ArcWake for BindStreamWaker { + fn wake_by_ref(arc_self: &Arc) { + let this = Arc::as_ref(arc_self); + // Ignore error on disconnected or full + drop(this.inner.try_lock().unwrap().try_send(())); + } +} diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs index b8e648d2d..ccdae7ed7 100644 --- a/src/runtime/runloop.rs +++ b/src/runtime/runloop.rs @@ -63,6 +63,7 @@ where /// Poll this runtime without exiting. Discards any value returned from the /// root function. The future yields in between revisions and is woken on /// state changes. + #[deprecated(note = "Blocks indefinitely. Use BindStream instead.")] pub async fn run_on_state_changes(mut self) { loop { self.next().await; @@ -75,6 +76,7 @@ where } } +#[deprecated(note = "Does never yields. Will clog task pool. Use BindStream instead.")] impl Stream for RunLoop where Root: FnMut() -> Out + Unpin,