From d6b5ddea5c0d6014c27d371b8c8c6d7a2436e7cf Mon Sep 17 00:00:00 2001 From: zetanumbers Date: Tue, 15 Dec 2020 01:18:59 +0300 Subject: [PATCH] Initial fix of #197 #215 #220 #222 for moxie crate --- src/lib.rs | 137 +++++++++++++++++++---------------------- src/runtime.rs | 95 +++++++++++++++++++++------- src/runtime/context.rs | 64 +++++++++---------- src/runtime/runloop.rs | 22 +++---- src/runtime/var.rs | 54 ++++++++++++---- 5 files changed, 217 insertions(+), 155 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2ef4e8d10..540910cf4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,7 +116,7 @@ pub use moxie_macros::updater; /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -124,7 +124,7 @@ pub use moxie_macros::updater; /// epoch.store(1, Ordering::Relaxed); // invalidates the cache /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 2, "reinitialized once after epoch changed"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -168,7 +168,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -214,7 +214,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -222,7 +222,7 @@ where /// epoch.store(1, Ordering::Relaxed); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 2, "reinitialized once after epoch changed"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -258,7 +258,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -285,22 +285,21 @@ where /// let mut rt = RunLoop::new(|| state(|| 0u64)); /// /// let track_wakes = BoolWaker::new(); -/// rt.set_state_change_waker(waker(track_wakes.clone())); /// -/// let (first_commit, first_key) = rt.run_once(); +/// let (first_commit, first_key) = rt.force_next_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(0); // this is a no-op -/// assert_eq!(*first_key, 0, "no updates yet"); +/// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(1); -/// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); +/// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// -/// let (second_commit, second_key) = rt.run_once(); // this commits the pending update -/// assert_eq!(*second_key, 1); 
+/// let (second_commit, second_key) = rt.force_next(); // this commits the pending update +/// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -331,22 +330,21 @@ where /// let mut rt = RunLoop::new(|| cache_state(&epoch.load(Ordering::Relaxed), |e| *e)); /// /// let track_wakes = BoolWaker::new(); -/// rt.set_state_change_waker(futures::task::waker(track_wakes.clone())); /// -/// let (first_commit, first_key) = rt.run_once(); +/// let (first_commit, first_key) = rt.force_next_with(futures::task::waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(0); // this is a no-op -/// assert_eq!(*first_key, 0, "no updates yet"); +/// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(1); -/// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); +/// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// -/// let (second_commit, second_key) = rt.run_once(); // this commits the pending update -/// assert_eq!(*second_key, 1); +/// let (second_commit, second_key) = rt.force_next(); // this commits the pending update +/// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -355,7 +353,7 @@ where /// // start the whole thing over again /// epoch.store(2, Ordering::Relaxed); /// -/// let (third_commit, third_key) = rt.run_once(); +/// let (third_commit, third_key) = rt.force_next(); /// assert_ne!(third_key, second_key, "different state variable"); /// /// // the rest is repeated from above with slight modifications @@ -363,15 +361,15 @@ where /// assert!(!track_wakes.is_woken()); /// /// third_key.set(2); -/// assert_eq!(*third_key, 2); +/// assert_eq!(**third_key.commit_at_root(), 2); /// assert!(!track_wakes.is_woken()); /// /// third_key.set(3); -/// assert_eq!(*third_key, 2); +/// assert_eq!(**third_key.commit_at_root(), 2); /// assert!(track_wakes.is_woken()); /// -/// let (fourth_commit, fourth_key) = rt.run_once(); -/// assert_eq!(*fourth_key, 3); +/// let (fourth_commit, fourth_key) = rt.force_next(); +/// assert_eq!(**fourth_key.commit_at_root(), 3); /// assert_eq!(*fourth_commit, 3); /// assert_eq!(*third_commit, 2); /// assert!(!track_wakes.is_woken()); @@ -428,9 +426,9 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// /// // resolve the future /// let sender = recv_futs.recv().unwrap(); @@ -439,12 +437,12 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// /// // force the future to be reinitialized /// epoch.store(1, Ordering::Relaxed); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// 
 /// // resolve the future
 /// let sender = recv_futs.recv().unwrap();
@@ -453,7 +451,7 @@ where
 /// sender.send(()).unwrap();
 ///
 /// exec.run();
-/// assert_eq!(rt.run_once(), Poll::Ready(Ok(())));
+/// assert_eq!(rt.force_next(), Poll::Ready(Ok(())));
 /// ```
 #[topo::nested]
 #[illicit::from_env(rt: &Context)]
@@ -488,15 +486,15 @@ where
 /// let mut exec = LocalPool::new();
 /// rt.set_task_executor(exec.spawner());
 ///
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 /// exec.run_until_stalled();
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 ///
 /// sender.send(()).unwrap();
 ///
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 /// exec.run();
-/// assert_eq!(rt.run_once(), Poll::Ready(Ok(())));
+/// assert_eq!(rt.force_next(), Poll::Ready(Ok(())));
 /// ```
 #[topo::nested]
 #[illicit::from_env(rt: &Context)]
@@ -529,15 +527,15 @@ where
 /// let mut exec = LocalPool::new();
 /// rt.set_task_executor(exec.spawner());
 ///
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 /// exec.run_until_stalled();
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 ///
 /// sender.send(()).unwrap();
 ///
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 /// exec.run();
-/// assert_eq!(rt.run_once(), Poll::Ready(Ok(())));
+/// assert_eq!(rt.force_next(), Poll::Ready(Ok(())));
 /// ```
 #[topo::nested]
 #[illicit::from_env(rt: &Context)]
@@ -581,9 +579,9 @@ where
 /// let mut exec = LocalPool::new();
 /// rt.set_task_executor(exec.spawner());
 ///
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 /// exec.run_until_stalled();
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 ///
 /// // resolve the future
 /// let (created_in_epoch, sender) = recv_futs.recv().unwrap();
@@ -593,12 +591,12 @@ where
 /// sender.send(()).unwrap();
 ///
 /// exec.run();
-/// assert_eq!(rt.run_once(), Poll::Ready(Ok(())));
+/// assert_eq!(rt.force_next(), Poll::Ready(Ok(())));
 ///
 /// // force the future to be reinitialized
 /// epoch.store(1, Ordering::Relaxed);
 ///
-/// assert_eq!(rt.run_once(), Poll::Pending);
+/// assert_eq!(rt.force_next(), Poll::Pending);
 ///
 /// // resolve the future
 /// let (created_in_epoch, sender) = recv_futs.recv().unwrap();
@@ -608,7 +606,7 @@ where
 /// sender.send(()).unwrap();
 ///
 /// exec.run();
-/// assert_eq!(rt.run_once(), Poll::Ready(Ok(())));
+/// assert_eq!(rt.force_next(), Poll::Ready(Ok(())));
 /// ```
 #[topo::nested]
 #[illicit::from_env(rt: &Context)]
@@ -691,6 +689,11 @@ impl<State> Key<State> {
         self.id
     }
 
+    /// Returns the [`Commit`] of the state variable captured when this key
+    /// was rooted at the current `Revision`.
+    pub fn commit_at_root(&self) -> &Commit<State> {
+        &self.commit_at_root
+    }
+
     /// Runs `updater` with a reference to the state variable's latest value,
     /// and enqueues a commit to the variable if `updater` returns `Some`.
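
The `commit_at_root` accessor above replaces reads that previously went through `Key`'s `Deref` impl (that impl is removed below). For illustration, a minimal sketch of the new access pattern, adapted from the doc-tests in this patch (crate paths assumed):

```
use moxie::{runtime::RunLoop, state};

fn main() {
    let mut rt = RunLoop::new(|| state(|| 0u8));

    // rooting the variable captures a Commit of its current value
    let (commit, key) = rt.force_next();
    assert_eq!(*commit, 0);

    // with Deref on Key gone, reads go through the commit captured at root
    assert_eq!(**key.commit_at_root(), 0);

    // updates stay enqueued until the next revision commits them
    key.set(7);
    assert_eq!(**key.commit_at_root(), 0);
    let (commit, key) = rt.force_next();
    assert_eq!(*commit, 7);
    assert_eq!(**key.commit_at_root(), 7);
}
```
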
 /// Returns the `Revision` at which the state variable was last rooted.
@@ -717,22 +720,21 @@ impl<State> Key<State> {
     /// let mut rt = RunLoop::new(|| state(|| 0u64));
     ///
     /// let track_wakes = BoolWaker::new();
-    /// rt.set_state_change_waker(waker(track_wakes.clone()));
     ///
-    /// let (first_commit, first_key) = rt.run_once();
+    /// let (first_commit, first_key) = rt.force_next_with(waker(track_wakes.clone()));
     /// assert_eq!(*first_commit, 0, "no updates yet");
     /// assert!(!track_wakes.is_woken(), "no updates yet");
     ///
     /// first_key.update(|_| None); // this is a no-op
-    /// assert_eq!(*first_key, 0, "no updates yet");
+    /// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet");
     /// assert!(!track_wakes.is_woken(), "no updates yet");
     ///
     /// first_key.update(|prev| Some(prev + 1));
-    /// assert_eq!(*first_key, 0, "update only enqueued, not yet committed");
+    /// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
     /// assert!(track_wakes.is_woken());
    ///
-    /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update
-    /// assert_eq!(*second_key, 1);
+    /// let (second_commit, second_key) = rt.force_next(); // this commits the pending update
+    /// assert_eq!(**second_key.commit_at_root(), 1);
     /// assert_eq!(*second_commit, 1);
     /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer");
     /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars");
@@ -749,11 +751,6 @@ impl<State> Key<State> {
     fn force(&self, new: State) {
         self.var.lock().enqueue_commit(new);
     }
-
-    // TODO(#197) delete this and remove the Deref impl
-    fn refresh(&mut self) {
-        self.commit_at_root = runtime::Var::root(self.var.clone()).0;
-    }
 }
 
 impl<State> Key<State>
 where
@@ -792,14 +789,6 @@ impl<State> Clone for Key<State> {
     }
 }
 
-impl<State> Deref for Key<State> {
-    type Target = State;
-
-    fn deref(&self) -> &Self::Target {
-        self.commit_at_root.deref()
-    }
-}
-
 impl<State> Debug for Key<State>
 where
     State: Debug,
@@ -885,7 +874,7 @@ mod tests {
 
         for i in 0..5 {
             assert_eq!(rt.revision().0, i);
-            rt.run_once();
+            rt.force_next();
             assert_eq!(rt.revision().0, i + 1);
         }
 
@@ -909,7 +898,7 @@ mod tests {
             }
             assert_eq!(ids.len(), 10);
         });
-        rt.run_once();
+        rt.force_next();
     });
 }
 
@@ -925,10 +914,10 @@ mod tests {
             counts
         });
 
-        let first_counts = rt.run_once();
+        let first_counts = rt.force_next();
         assert_eq!(first_counts.len(), num_iters, "each mutation must be called exactly once");
 
-        let second_counts = rt.run_once();
+        let second_counts = rt.force_next();
         assert_eq!(
             second_counts.len(),
             0,
@@ -965,8 +954,8 @@ mod tests {
                 "runtime's root block should run exactly twice per loop_ct value"
             );
 
-            rt.run_once();
-            rt.run_once();
+            rt.force_next();
+            rt.force_next();
         }
     })
 }
@@ -991,14 +980,14 @@ mod tests {
         });
         rt.set_task_executor(pool.spawner());
 
-        assert_eq!(rt.run_once(), Poll::Pending, "no values received when nothing sent");
-        assert_eq!(rt.run_once(), Poll::Pending, "no values received, and we aren't blocking");
+        assert_eq!(rt.force_next(), Poll::Pending, "no values received when nothing sent");
+        assert_eq!(rt.force_next(), Poll::Pending, "no values received, and we aren't blocking");
 
         send.send(5u8).unwrap();
         pool.run_until_stalled();
-        assert_eq!(rt.run_once(), Poll::Ready(5), "we need to receive the value we sent");
+        assert_eq!(rt.force_next(), Poll::Ready(5), "we need to receive the value we sent");
         assert_eq!(
-            rt.run_once(),
+            rt.force_next(),
             Poll::Ready(5),
             "the value we sent must be cached because its from a oneshot channel"
         );
@@ -1028,19 +1017,19 @@
         rt.set_task_executor(pool.spawner());
         pool.run_until_stalled();
-        assert_eq!(rt.run_once(), Some(Poll::Pending));
+        assert_eq!(rt.force_next(), Some(Poll::Pending));
         assert!(!send.is_canceled(), "interest expressed, receiver must be live");
 
         pool.run_until_stalled();
-        assert_eq!(rt.run_once(), Some(Poll::Pending));
+        assert_eq!(rt.force_next(), Some(Poll::Pending));
         assert!(!send.is_canceled(), "interest still expressed, receiver must be live");
 
         pool.run_until_stalled();
-        assert_eq!(rt.run_once(), None);
+        assert_eq!(rt.force_next(), None);
         assert!(!send.is_canceled(), "interest dropped, task live for another revision");
 
         pool.run_until_stalled();
-        assert_eq!(rt.run_once(), None);
+        assert_eq!(rt.force_next(), None);
         assert!(send.is_canceled(), "interest dropped, task dropped");
 
         assert!(
diff --git a/src/runtime.rs b/src/runtime.rs
index 417b2dcc5..0bae87029 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -11,10 +11,12 @@ use futures::{
     task::{noop_waker, LocalSpawn, SpawnError},
 };
 use illicit::AsContext;
+use parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
 use std::{
     fmt::{Debug, Formatter, Result as FmtResult},
     rc::Rc,
-    task::Waker,
+    sync::{atomic::AtomicBool, Arc},
+    task::{Poll, Waker},
 };
 
 pub(crate) use context::Context;
@@ -41,6 +43,14 @@ impl std::fmt::Debug for Revision {
     }
 }
 
+/// Tracks the runtime's current [`Revision`], the [`Waker`] used to notify
+/// the embedder of state changes, and whether any changes are pending for
+/// the next revision.
+#[derive(Debug)]
+pub struct RevisionControlSystem {
+    revision: Revision,
+    waker: Waker,
+    pending_changes: AtomicBool,
+}
+
 /// A [`Runtime`] is the primary integration point between moxie and an
 /// embedder. Each independent instance is responsible for an event loop and
 /// tracks time using a [`Revision`] which it increments on each iteration of
@@ -112,17 +122,16 @@
 /// let mut rt = Runtime::new();
 /// assert_eq!(rt.revision().0, 0);
 /// for i in 1..10 {
-///     rt.run_once(|| ());
+///     rt.force_once(|| ());
 ///     assert_eq!(rt.revision(), Revision(i));
 /// }
 /// ```
 ///
 /// [dyn-cache]: https://docs.rs/dyn-cache
 pub struct Runtime {
-    revision: Revision,
+    rcs: Arc<RwLock<RevisionControlSystem>>,
     cache: SharedLocalCache,
     spawner: Spawner,
-    wk: Waker,
 }
 
 impl Default for Runtime {
@@ -137,23 +146,71 @@ impl Runtime {
     pub fn new() -> Self {
         Self {
             spawner: Spawner(Rc::new(JunkSpawner)),
-            revision: Revision(0),
             cache: SharedLocalCache::default(),
-            wk: noop_waker(),
+            rcs: Arc::new(RwLock::new(RevisionControlSystem {
+                revision: Revision(0),
+                waker: noop_waker(),
+                pending_changes: AtomicBool::new(true),
+            })),
         }
     }
 
     /// The current revision of the runtime, or how many times `run_once` has
     /// been invoked.
     pub fn revision(&self) -> Revision {
-        self.revision
+        self.rcs.read().revision
     }
+
+    /// Marks the runtime as having pending changes so that the next call to
+    /// [`Runtime::poll_once`] runs the root closure even if no state variable
+    /// has received a commit in the meantime.
+    pub fn force(&self) {
+        self.rcs.read().pending_changes.store(true, std::sync::atomic::Ordering::Relaxed);
+    }
+
+    /// Unconditionally runs the root closure once with access to the runtime
+    /// context, incrementing the runtime's `Revision` and dropping any cached
+    /// values which were not marked alive.
+    pub fn force_once<Out>(&mut self, op: impl FnOnce() -> Out) -> Out {
+        self.execute(op, self.rcs.write())
+    }
+
+    /// Like [`Runtime::force_once`], but first installs `waker` as the waker
+    /// notified when state variables receive commits.
+    pub fn force_once_with<Out>(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Out {
+        let mut rcs_write = self.rcs.write();
+        rcs_write.waker = waker;
+        self.execute(op, rcs_write)
+    }
+
+    /// Returns `Poll::Pending` without running the root closure if no changes
+    /// are pending; when `waker` is supplied it replaces the runtime's waker.
+    /// Otherwise:
     /// Runs the root closure once with access to the runtime context,
     /// increments the runtime's `Revision`, and drops any cached values
     /// which were not marked alive.
-    pub fn run_once<Out>(&mut self, op: impl FnOnce() -> Out) -> Out {
-        self.revision.0 += 1;
+    pub fn poll_once<Out>(&mut self, op: impl FnOnce() -> Out, waker: Option<Waker>) -> Poll<Out> {
+        // a new waker requires the write lock either way; otherwise take an
+        // upgradable read and only upgrade when changes are actually pending
+        if let Some(waker) = waker {
+            let mut rcs_write = self.rcs.write();
+            rcs_write.waker = waker;
+            if !rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) {
+                Poll::Pending
+            } else {
+                Poll::Ready(self.execute(op, rcs_write))
+            }
+        } else {
+            let rcs_read = self.rcs.upgradable_read();
+            if !rcs_read.pending_changes.load(std::sync::atomic::Ordering::Relaxed) {
+                Poll::Pending
+            } else {
+                let rcs_write = RwLockUpgradableReadGuard::upgrade(rcs_read);
+                Poll::Ready(self.execute(op, rcs_write))
+            }
+        }
+    }
+
+    fn execute<Out>(
+        &self,
+        op: impl FnOnce() -> Out,
+        mut rcs_write: RwLockWriteGuard<'_, RevisionControlSystem>,
+    ) -> Out {
+        rcs_write.revision.0 += 1;
+        rcs_write.pending_changes.store(false, std::sync::atomic::Ordering::Relaxed);
+        drop(rcs_write);
 
         let ret = self.context_handle().offer(|| topo::call(op));
 
@@ -161,14 +218,6 @@ impl Runtime {
         ret
     }
 
-    /// Sets the [`std::task::Waker`] which will be called when state variables
-    /// receive commits. By default the runtime no-ops on a state change,
-    /// which is probably the desired behavior if the embedding system will
-    /// call `Runtime::run_once` on a regular interval regardless.
-    pub fn set_state_change_waker(&mut self, wk: Waker) {
-        self.wk = wk;
-    }
-
     /// Sets the executor that will be used to spawn normal priority tasks.
     pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) {
         self.spawner = Spawner(Rc::new(sp));
@@ -210,7 +259,7 @@ mod tests {
         assert!(illicit::get::<Context>().is_err());
         first_byte.offer(|| {
-            topo::call(|| runtime.run_once());
+            topo::call(|| runtime.force_next());
         });
         assert!(illicit::get::<Context>().is_err());
     }
 
     #[test]
     fn tick_a_few_times() {
         let mut rt = RunLoop::new(Revision::current);
-        assert_eq!(rt.run_once(), Revision(1));
-        assert_eq!(rt.run_once(), Revision(2));
-        assert_eq!(rt.run_once(), Revision(3));
-        assert_eq!(rt.run_once(), Revision(4));
-        assert_eq!(rt.run_once(), Revision(5));
+        assert_eq!(rt.force_next(), Revision(1));
+        assert_eq!(rt.force_next(), Revision(2));
+        assert_eq!(rt.force_next(), Revision(3));
+        assert_eq!(rt.force_next(), Revision(4));
+        assert_eq!(rt.force_next(), Revision(5));
     }
 }
diff --git a/src/runtime/context.rs b/src/runtime/context.rs
index f14a0e3eb..d53a70b98 100644
--- a/src/runtime/context.rs
+++ b/src/runtime/context.rs
@@ -1,28 +1,24 @@
-use super::{Revision, Spawner, Var};
+use super::{Revision, RevisionControlSystem, Spawner, Var};
 use crate::{Commit, Key};
 use dyn_cache::local::SharedLocalCache;
 use futures::future::abortable;
-use std::{
-    borrow::Borrow,
-    future::Future,
-    task::{Poll, Waker},
-};
+use parking_lot::RwLock;
+use std::{borrow::Borrow, future::Future, sync::Arc, task::Poll};
 
 /// A handle to the current [`Runtime`] which is offered via [`illicit`]
 /// contexts and provides access to the current revision, cache storage,
 /// task spawning, and the waker for the loop.
 #[derive(Debug)]
 pub(crate) struct Context {
-    revision: Revision,
+    rcs: Arc<RwLock<RevisionControlSystem>>,
     pub cache: SharedLocalCache,
     spawner: Spawner,
-    waker: Waker,
 }
 
 impl Context {
     /// Returns the revision for which this context was created.
     pub fn revision(&self) -> Revision {
-        self.revision
+        self.rcs.read().revision
     }
 
     /// Load a [`crate::state::Var`] with the provided argument and initializer.
@@ -40,7 +36,7 @@ impl Context {
     {
         let var = self
             .cache
-            .cache(id, arg, |arg| Var::new(topo::CallId::current(), self.waker.clone(), init(arg)));
+            .cache(id, arg, |arg| Var::new(topo::CallId::current(), self.rcs.clone(), init(arg)));
         Var::root(var)
     }
 
@@ -68,28 +64,31 @@ impl Context {
         Output: 'static,
         Ret: 'static,
     {
-        let (_, set_result): (_, Key<Poll<Output>>) = self.cache_state(id, &(), |()| Poll::Pending);
-        let mut set_result2 = set_result.clone();
-        self.cache.hold(id, arg, |arg| {
-            // before we spawn the new task we need to mark it pending
-            set_result.force(Poll::Pending);
+        let var = self.cache.cache_with(
+            id,
+            arg,
+            |arg| {
+                // before we spawn the new task we need to mark it pending
+                let var = Var::new(topo::CallId::current(), self.rcs.clone(), Poll::Pending);
 
-            let (fut, aborter) = abortable(init(arg));
-            let task = async move {
-                if let Ok(to_store) = fut.await {
-                    set_result.update(|_| Some(Poll::Ready(to_store)));
-                }
-            };
-            self.spawner
-                .0
-                .spawn_local_obj(Box::pin(task).into())
-                .expect("that set_task_executor has been called");
-            scopeguard::guard(aborter, |a| a.abort())
-        });
+                let (fut, aborter) = abortable(init(arg));
 
-        set_result2.refresh();
+                let var2 = var.clone();
+                let task = async move {
+                    if let Ok(to_store) = fut.await {
+                        var2.lock().enqueue_commit(Poll::Ready(to_store));
+                    }
+                };
+                self.spawner
+                    .0
+                    .spawn_local_obj(Box::pin(task).into())
+                    .expect("that set_task_executor has been called");
+                (var, scopeguard::guard(aborter, |a| a.abort()))
+            },
+            |(var, _)| var.clone(),
+        );
 
-        match &*set_result2 {
+        match *Var::root(var).0 {
             Poll::Ready(ref stored) => Poll::Ready(with(stored)),
             Poll::Pending => Poll::Pending,
         }
@@ -98,11 +97,6 @@ impl Context {
 
 impl super::Runtime {
     pub(crate) fn context_handle(&self) -> Context {
-        Context {
-            revision: self.revision,
-            spawner: self.spawner.clone(),
-            cache: self.cache.clone(),
-            waker: self.wk.clone(),
-        }
+        Context { rcs: self.rcs.clone(), spawner: self.spawner.clone(), cache: self.cache.clone() }
     }
 }
diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs
index b8e648d2d..08532b523 100644
--- a/src/runtime/runloop.rs
+++ b/src/runtime/runloop.rs
@@ -43,12 +43,6 @@ where
         self.inner.revision()
     }
 
-    /// Sets the [`std::task::Waker`] which will be called when state variables
-    /// change.
-    pub fn set_state_change_waker(&mut self, wk: Waker) {
-        self.inner.set_state_change_waker(wk);
-    }
-
     /// Sets the executor that will be used to spawn normal priority tasks.
     pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) {
         self.inner.set_task_executor(sp);
@@ -56,8 +50,14 @@ where
 
     /// Run the root function once within this runtime's context, returning the
     /// result.
-    pub fn run_once(&mut self) -> Out {
-        self.inner.run_once(&mut self.root)
+    pub fn force_next(&mut self) -> Out {
+        self.inner.force_once(&mut self.root)
+    }
+
+    /// Run the root function once within this runtime's context after setting
+    /// the state change waker, returning the result.
+    pub fn force_next_with(&mut self, waker: Waker) -> Out {
+        self.inner.force_once_with(&mut self.root, waker)
     }
 
     /// Poll this runtime without exiting. Discards any value returned from the
@@ -79,14 +79,12 @@ impl<Root, Out> Stream for RunLoop<Root, Out>
 where
     Root: FnMut() -> Out + Unpin,
 {
-    type Item = (Revision, Out);
+    type Item = Out;
 
     /// This `Stream` implementation runs a single revision for each call to
     /// `poll_next`, always returning `Poll::Ready(Some(...))`.
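
With `Item = Out` and `poll_next` delegating to the waker-aware `poll_once`, the stream now only yields when changes are pending. A hand-rolled poll illustrating the contract this patch establishes (a sketch; it assumes a fresh runtime starts with `pending_changes` set, per `Runtime::new` above):

```
use futures::{task::noop_waker, Stream};
use moxie::{runtime::RunLoop, state};
use std::task::{Context, Poll};

fn main() {
    // the root closure's output is now the Stream's Item, without a Revision
    let rt = RunLoop::new(|| state(|| 0u8).0);
    let mut rt = Box::pin(rt);

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // a fresh runtime reports pending changes, so the first poll runs a revision
    match rt.as_mut().poll_next(&mut cx) {
        Poll::Ready(Some(commit)) => assert_eq!(*commit, 0),
        _ => unreachable!("first revision always runs"),
    }

    // nothing enqueued since, so the loop parks until a state change wakes it
    assert!(rt.as_mut().poll_next(&mut cx).is_pending());
}
```
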
     fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        this.inner.set_state_change_waker(cx.waker().clone());
-        let out = this.run_once();
-        Poll::Ready(Some((this.inner.revision, out)))
+        this.inner.poll_once(&mut this.root, Some(cx.waker().clone())).map(|o| Some(o))
     }
 }
diff --git a/src/runtime/var.rs b/src/runtime/var.rs
index e9334464b..ab1ffc97a 100644
--- a/src/runtime/var.rs
+++ b/src/runtime/var.rs
@@ -1,20 +1,28 @@
 use crate::{Commit, Key};
-use parking_lot::Mutex;
-use std::{sync::Arc, task::Waker};
+use parking_lot::{Mutex, RwLock};
+use std::sync::Arc;
+
+use super::{Revision, RevisionControlSystem};
 
 /// The underlying container of state variables. Vends copies of the latest
 /// [`Commit`] for [`Key`]s.
 pub(crate) struct Var<State> {
     current: Commit<State>,
     id: topo::CallId,
-    pending: Option<Commit<State>>,
-    waker: Waker,
+    // can only contain commits from previous revisions
+    staged: Option<Commit<State>>,
+    pending: Option<(Revision, Commit<State>)>,
+    rcs: Arc<RwLock<RevisionControlSystem>>,
 }
 
 impl<State> Var<State> {
-    pub fn new(id: topo::CallId, waker: Waker, inner: State) -> Arc<Mutex<Self>> {
+    pub fn new(
+        id: topo::CallId,
+        rcs: Arc<RwLock<RevisionControlSystem>>,
+        inner: State,
+    ) -> Arc<Mutex<Self>> {
         let current = Commit { id, inner: Arc::new(inner) };
-        Arc::new(Mutex::new(Var { id, current, waker, pending: None }))
+        Arc::new(Mutex::new(Var { id, current, rcs, staged: None, pending: None }))
     }
 
     /// Attach this `Var` to its callsite, performing any pending commit and
@@ -22,9 +30,21 @@
     pub fn root(var: Arc<Mutex<Self>>) -> (Commit<State>, Key<State>) {
         let (id, commit_at_root) = {
             let mut var = var.lock();
-            if let Some(pending) = var.pending.take() {
-                var.current = pending;
+            let Revision(current) = Revision::current();
+
+            // stage the pending commit if it's from a previous revision
+            match var.pending {
+                Some((Revision(pending), _)) if pending < current => {
+                    var.staged = Some(var.pending.take().unwrap().1)
+                }
+                _ => (),
             }
+
+            // perform the staged commit
+            if let Some(staged) = var.staged.take() {
+                var.current = staged;
+            }
+
             (var.id, var.current.clone())
         };
 
@@ -33,14 +53,26 @@
 
     /// Returns a reference to the latest value, pending or committed.
     pub fn latest(&self) -> &State {
-        &self.pending.as_ref().unwrap_or(&self.current)
+        self.pending
+            .as_ref()
+            .map(|(_revision, ref commit)| commit)
+            .or(self.staged.as_ref())
+            .unwrap_or(&self.current)
     }
 
     /// Initiate a commit to the state variable. The commit will actually
     /// complete asynchronously when the state variable is next rooted in a
     /// topological function, flushing the pending commit.
     pub fn enqueue_commit(&mut self, state: State) {
-        self.pending = Some(Commit { inner: Arc::new(state), id: self.id });
-        self.waker.wake_by_ref();
+        let rcs_read = self.rcs.read();
+        let rev = rcs_read.revision;
+        if let Some(pending) = self.pending.take() {
+            if pending.0 < rev {
+                self.staged = Some(pending.1);
+            }
+        }
+        self.pending = Some((rev, Commit { inner: Arc::new(state), id: self.id }));
+        rcs_read.pending_changes.store(true, std::sync::atomic::Ordering::Relaxed);
+        rcs_read.waker.wake_by_ref();
     }
 }
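
Taken together, `enqueue_commit` raising `pending_changes` and `poll_once` consuming it give embedders a poll-driven loop. A sketch of driving a bare `Runtime` under the new API (the root closure here is a hypothetical stand-in for a real root block):

```
use futures::task::noop_waker;
use moxie::runtime::Runtime;
use std::task::Poll;

fn main() {
    let mut rt = Runtime::new();
    let root = || 42u8; // stand-in for a real root block

    // a new runtime reports pending changes, so the first poll runs the root
    let waker = noop_waker(); // embedders would pass their event loop's waker
    assert_eq!(rt.poll_once(root, Some(waker)), Poll::Ready(42));

    // no state has changed since, so polling again is a cheap no-op
    assert_eq!(rt.poll_once(root, None), Poll::Pending);

    // force() marks changes pending, e.g. when inputs outside state vars change
    rt.force();
    assert_eq!(rt.poll_once(root, None), Poll::Ready(42));
}
```

The upgradable read in `poll_once` keeps idle polls cheap: the write lock is only taken on revisions that actually run.
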