Skip to content

Commit

Permalink
Fixed anp#215
Browse files Browse the repository at this point in the history
  • Loading branch information
zetanumbers committed Nov 29, 2020
1 parent 74fb3e7 commit f87b753
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! [`Runtime`]s are the primary integration point between moxie and
//! embedding environments.

mod bind_stream;
mod context;
mod runloop;
mod var;
Expand All @@ -17,6 +18,7 @@ use std::{
task::Waker,
};

pub use bind_stream::BindStream;
pub(crate) use context::Context;
pub use runloop::RunLoop;
pub(crate) use var::Var;
Expand Down
177 changes: 177 additions & 0 deletions src/runtime/bind_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
mod waker;
use waker::*;

use super::{Revision, Runtime};
use futures::{
channel::mpsc,
stream::{Stream, StreamExt},
task::{noop_waker, waker, LocalSpawn},
};
use std::{
pin::Pin,
task::{Context as FutContext, Poll},
};

/// A [`Runtime`] that is bound with a particular root function.
///
/// If running in a context with an async executor, can be consumed as a
/// [`futures::Stream`] of [`crate::runtime::Revision`]s in order to provide
/// the [`super::Runtime`] with a [`std::task::Waker`].
pub struct BindStream<Root> {
inner: Runtime,
root: Root,
changes_receiver: mpsc::Receiver<()>,
}

impl super::Runtime {
/// Returns this runtime bound with a specific root function as
/// ([`futures::stream::Stream`]).
pub fn bind<Root, Out>(self, root: Root) -> BindStream<Root>
where
Root: FnMut() -> Out + Unpin,
{
BindStream::with_runtime(self, root)
}

/// Returns this runtime bound with a specific root function as
/// ([`futures::stream::Stream`]).
pub fn bind_init<Root, Out>(self, root: Root) -> (BindStream<Root>, Out)
where
Root: FnMut() -> Out + Unpin,
{
let mut bound = self.bind(root);
let out = bound.try_next().unwrap();
(bound, out)
}
}

impl<Root, Out> BindStream<Root>
where
Root: FnMut() -> Out + Unpin,
{
/// Creates a new `Runtime` attached to the provided root function.
pub fn new(root: Root) -> Self {
let rt = Runtime::new();
Self::with_runtime(rt, root)
}

/// Binds `Runtime` to the provided root function.
pub fn with_runtime(mut rt: super::Runtime, root: Root) -> Self {
let (mut changes_sender, changes_receiver) = mpsc::channel(0);
changes_sender.try_send(()).unwrap();
rt.set_state_change_waker(waker(BindStreamWaker::new(changes_sender)));
Self { inner: rt, root, changes_receiver }
}

/// Returns output of the next revision if at least state change exists
pub fn try_next(&mut self) -> Option<Out> {
let wk = noop_waker();
let mut cx = FutContext::from_waker(&wk);
match Pin::new(self).poll_next(&mut cx) {
Poll::Ready(out) => out,
Poll::Pending => None,
}
}

/// Creates a new `Runtime` attached to the provided root function, which
/// then runs once.
pub fn init(root: Root) -> (Self, Out) {
let mut bound = Self::new(root);
let out = bound.try_next().unwrap();
(bound, out)
}

/// Binds `Runtime` to the provided root function, then runs it once.
pub fn init_with_runtime(rt: super::Runtime, root: Root) -> (Self, Out) {
let mut bound = Self::with_runtime(rt, root);
let out = bound.try_next().unwrap();
(bound, out)
}

/// Returns the runtime's current Revision.
pub fn revision(&self) -> Revision {
self.inner.revision()
}

/// Sets the executor that will be used to spawn normal priority tasks.
pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) {
self.inner.set_task_executor(sp);
}

/// Poll this runtime without exiting. Discards any value returned from the
/// root function. The future yields in between revisions and is woken on
/// state changes.
pub async fn run_on_state_changes(mut self) {
loop {
self.next().await;
}
}

/// Unbinds the runtime from its current root function, returning both.
/// Resets waker to `noop_waker`.
pub fn unbind(self) -> (Runtime, Root) {
let Self { mut inner, root, .. } = self;
inner.set_state_change_waker(noop_waker());
(inner, root)
}
}

impl<Root, Out> Stream for BindStream<Root>
where
Root: FnMut() -> Out + Unpin,
{
type Item = Out;

/// This `Stream` implementation, if change present, runs a single revision
/// for a call to `poll_next`, returning `Poll::Ready(Some(...))`, otherwise
/// returns `Poll::Pending`
fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Pin::new(&mut this.changes_receiver)
.poll_next(cx)
.map(|received| received.map(|_| this.inner.run_once(&mut this.root)))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn pending_without_change() {
use futures::{
executor::{block_on, LocalPool},
stream::StreamExt,
task::LocalSpawnExt,
};

let mut brt = BindStream::new(|| ());
block_on(brt.next()).expect("BindStream should yield first revision immediately");
let mut pool = LocalPool::new();
pool.spawner()
.spawn_local(async move {
brt.next().await.unwrap();
unreachable!()
})
.unwrap();
assert!(!pool.try_run_one());
}

#[test]
fn has_changes() {
use futures::{executor::LocalPool, task::LocalSpawnExt};

let (mut brt, key) = BindStream::init(|| crate::state(|| 0).1);
let mut pool = LocalPool::new();
pool.spawner()
.spawn_local(async move {
brt.next().await;
})
.unwrap();
assert!(!pool.try_run_one());
key.set(0);
assert!(!pool.try_run_one());
key.set(1);
assert!(pool.try_run_one());
}
}
22 changes: 22 additions & 0 deletions src/runtime/bind_stream/waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use futures::channel::mpsc;
use std::sync::{Arc, Mutex};

pub use futures::task::ArcWake;

pub struct BindStreamWaker {
inner: Mutex<mpsc::Sender<()>>,
}

impl BindStreamWaker {
pub fn new(inner: mpsc::Sender<()>) -> Arc<Self> {
Arc::new(Self { inner: Mutex::new(inner) })
}
}

impl ArcWake for BindStreamWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
let this = Arc::as_ref(arc_self);
// Ignore error on disconnected or full
drop(this.inner.try_lock().unwrap().try_send(()));
}
}
2 changes: 2 additions & 0 deletions src/runtime/runloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ where
/// Poll this runtime without exiting. Discards any value returned from the
/// root function. The future yields in between revisions and is woken on
/// state changes.
#[deprecated(note = "Blocks indefinitely. Use BindStream instead.")]
pub async fn run_on_state_changes(mut self) {
loop {
self.next().await;
Expand All @@ -75,6 +76,7 @@ where
}
}

#[deprecated(note = "Does never yields. Will clog task pool. Use BindStream instead.")]
impl<Root, Out> Stream for RunLoop<Root>
where
Root: FnMut() -> Out + Unpin,
Expand Down

0 comments on commit f87b753

Please sign in to comment.