Skip to content

Commit

Permalink
StateMonitor: Remove on_change from Mutex
Browse files Browse the repository at this point in the history
Improves syncing speed by about 10-15% on Android
  • Loading branch information
inetic committed Aug 12, 2024
1 parent 8a2ac5a commit 53cc9d0
Showing 1 changed file with 17 additions and 31 deletions.
48 changes: 17 additions & 31 deletions state_monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,12 @@ struct StateMonitorShared {
id: MonitorId,
parent: Option<StateMonitor>,
inner: BlockingMutex<StateMonitorInner>,
on_change: watch::Sender<()>,
}

struct StateMonitorInner {
values: IndexMap<String, MonitoredValueHandle>,
children: IndexMap<MonitorId, ChildEntry>,
// TODO: Why is this in mutex?
on_change: watch::Sender<()>,
}

struct ChildEntry {
Expand Down Expand Up @@ -175,8 +174,8 @@ impl StateMonitor {
inner: BlockingMutex::new(StateMonitorInner {
values: Default::default(),
children: Default::default(),
on_change: watch::channel(()).0,
}),
on_change: watch::channel(()).0,
});

e.insert(ChildEntry {
Expand All @@ -201,7 +200,7 @@ impl StateMonitor {
// of this function because given that `self` exists it must be that `refcount` doesn't
// drop to zero anywhere in this function (and thus won't be removed from parent).
self.shared.increment_refcount();
self.shared.changed(self.shared.lock_inner());
self.shared.changed();
}

Self { shared: child }
Expand Down Expand Up @@ -249,7 +248,9 @@ impl StateMonitor {
}
};

self.shared.changed(lock);
drop(lock); // Drop ASAP

self.shared.changed();

MonitoredValue {
name,
Expand Down Expand Up @@ -316,7 +317,7 @@ impl Drop for StateMonitor {
}

entry.remove();
parent.shared.changed(parent_inner);
parent.shared.changed();
}
}

Expand All @@ -337,8 +338,8 @@ impl StateMonitorShared {
inner: BlockingMutex::new(StateMonitorInner {
values: Default::default(),
children: Default::default(),
on_change: watch::channel(()).0,
}),
on_change: watch::channel(()).0,
})
}

Expand All @@ -361,19 +362,11 @@ impl StateMonitorShared {
}

fn subscribe(self: &Arc<Self>) -> watch::Receiver<()> {
self.lock_inner().on_change.subscribe()
self.on_change.subscribe()
}

// TODO: Does this need the lock?
fn changed(&self, lock: BlockingMutexGuard<'_, StateMonitorInner>) {
lock.on_change.send(()).unwrap_or(());

// Let's not lock self and parent at the same time to avoid potential deadlocks.
drop(lock);

if let Some(parent) = &self.parent {
parent.shared.changed(parent.shared.lock_inner());
}
fn changed(&self) {
self.on_change.send(()).unwrap_or(());
}

fn lock_inner(&self) -> BlockingMutexGuard<'_, StateMonitorInner> {
Expand Down Expand Up @@ -433,40 +426,33 @@ impl<T> MonitoredValue<T> {
pub fn get(&self) -> MutexGuardWrap<'_, T> {
MutexGuardWrap {
monitor: self.monitor.clone(),
guard: Some(self.value.lock().unwrap()),
guard: self.value.lock().unwrap(),
}
}
}

pub struct MutexGuardWrap<'a, T> {
monitor: StateMonitor,
// This is only None in the destructor.
guard: Option<BlockingMutexGuard<'a, T>>,
guard: BlockingMutexGuard<'a, T>,
}

impl<'a, T> core::ops::Deref for MutexGuardWrap<'a, T> {
type Target = T;

fn deref(&self) -> &T {
self.guard.as_ref().unwrap()
&*self.guard
}
}

impl<'a, T> core::ops::DerefMut for MutexGuardWrap<'a, T> {
fn deref_mut(&mut self) -> &mut T {
&mut *(self.guard.as_mut().unwrap())
&mut *self.guard
}
}

impl<'a, T> Drop for MutexGuardWrap<'a, T> {
fn drop(&mut self) {
{
// Unlock this before we try to lock the parent monitor.
self.guard.take();
}
self.monitor
.shared
.changed(self.monitor.shared.lock_inner());
self.monitor.shared.changed();
}
}

Expand All @@ -481,7 +467,7 @@ impl<T> Drop for MonitoredValue<T> {
v.refcount -= 1;
if v.refcount == 0 {
e.remove();
self.monitor.shared.changed(lock);
self.monitor.shared.changed();
}
}
Entry::Vacant(_) => unreachable!(),
Expand Down

0 comments on commit 53cc9d0

Please sign in to comment.