From 6278bc0eef30d7d1a4982cc96b2d8e69d931c688 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 31 Aug 2023 14:32:32 +0200 Subject: [PATCH] feat(ui): `TimelineInnerStateLock::lock` is replaced by `read` and `write`. This patch splits the `TimelineInnerStateLock::lock` method into `::read` and `::write`. The idea is that a read-only lock doesn't hold a clone of the lock release observer (`lock_release_ob: SharedObservable<()>`), it will not notify the observer. Then it's only the write lock that holds a clone of the lock release observer, and will notify it. This patch updates the code accordingly as best as possible. --- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 59 ++++++++++--------- .../matrix-sdk-ui/src/timeline/inner/state.rs | 42 +++++++------ .../tests/integration/timeline/subscribe.rs | 12 +--- 3 files changed, 56 insertions(+), 57 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 1fdd4f40598..5ebcc11ec10 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -68,7 +68,7 @@ use super::{ mod state; pub(super) use self::state::TimelineInnerState; -use self::state::{TimelineInnerStateLock, TimelineInnerStateLockGuard}; +use self::state::{TimelineInnerStateLock, TimelineInnerStateWriteGuard}; #[derive(Clone, Debug)] pub(super) struct TimelineInner { @@ -145,7 +145,7 @@ impl TimelineInner

{ /// /// Cheap because `im::Vector` is cheap to clone. pub(super) async fn items(&self) -> Vector> { - self.state.lock().await.items.clone() + self.state.read().await.items.clone() } pub(super) fn subscribe(&self) -> impl Stream>> { @@ -158,7 +158,7 @@ impl TimelineInner

{ loop { let (timeline_items, timeline_stream) = { - let state = state.lock().await; + let state = state.read().await; (state.items.clone(), state.items.subscribe()) }; @@ -175,7 +175,7 @@ impl TimelineInner

{ let _ = ignore_user_list_stream.next().await; // A user has been ignored/unignored? Let's clear the timeline. - state.lock().await.clear(); + state.write().await.clear(); } } .switch() @@ -196,7 +196,7 @@ impl TimelineInner

{ F: Fn(Arc) -> Option, { trace!("Creating timeline items signal"); - let state = self.state.lock().await; + let state = self.state.read().await; state.items.subscribe_filter_map(f) } @@ -204,7 +204,7 @@ impl TimelineInner

{ &self, annotation: &Annotation, ) -> Result { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let user_id = self.room_data_provider.own_user_id(); @@ -325,7 +325,7 @@ impl TimelineInner

{ ) { let own_user_id = self.room_data_provider.own_user_id().to_owned(); self.state - .lock() + .write() .await .users_read_receipts .entry(own_user_id) @@ -341,7 +341,7 @@ impl TimelineInner

{ debug!("Adding {} initial events", events.len()); - let mut state = self.state.lock().await; + let mut state = self.state.write().await; for event in events { state .handle_remote_event( @@ -356,12 +356,12 @@ impl TimelineInner

{ pub(super) async fn clear(&self) { trace!("Clearing timeline"); - self.state.lock().await.clear(); + self.state.write().await.clear(); } #[instrument(skip_all)] pub(super) async fn handle_joined_room_update(&self, update: JoinedRoom) { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; state.handle_sync_timeline(update.timeline, &self.room_data_provider, &self.settings).await; trace!("Handling account data"); @@ -396,7 +396,7 @@ impl TimelineInner

{ pub(super) async fn handle_sync_timeline(&self, timeline: Timeline) { self.state - .lock() + .write() .await .handle_sync_timeline(timeline, &self.room_data_provider, &self.settings) .await; @@ -405,7 +405,7 @@ impl TimelineInner

{ #[cfg(test)] pub(super) async fn handle_live_event(&self, event: SyncTimelineEvent) { self.state - .lock() + .write() .await .handle_live_event(event, &self.room_data_provider, &self.settings) .await; @@ -421,7 +421,7 @@ impl TimelineInner

{ let sender = self.room_data_provider.own_user_id().to_owned(); let profile = self.room_data_provider.profile(&sender).await; - let mut state = self.state.lock().await; + let mut state = self.state.write().await; state.handle_local_event(sender, profile, txn_id, content, &self.settings); } @@ -436,7 +436,7 @@ impl TimelineInner

{ let sender = self.room_data_provider.own_user_id().to_owned(); let profile = self.room_data_provider.profile(&sender).await; - let mut state = self.state.lock().await; + let mut state = self.state.write().await; state.handle_local_redaction(sender, profile, txn_id, to_redact, content, &self.settings); } @@ -449,7 +449,7 @@ impl TimelineInner

{ txn_id: &TransactionId, send_state: EventSendState, ) { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let new_event_id: Option<&EventId> = match &send_state { EventSendState::Sent { event_id } => Some(event_id), @@ -550,7 +550,7 @@ impl TimelineInner

{ annotation: &Annotation, result: &ReactionToggleResult, ) -> Result { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let user_id = self.room_data_provider.own_user_id(); let annotation_key: AnnotationKey = annotation.into(); @@ -592,7 +592,7 @@ impl TimelineInner

{ &self, txn_id: &TransactionId, ) -> Option { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let (idx, item) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))?; let local_item = item.as_local()?; @@ -618,7 +618,8 @@ impl TimelineInner

{ } pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; + if let Some((idx, _)) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id)) { @@ -639,7 +640,7 @@ impl TimelineInner

{ &self, events: Vec, ) -> Option { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let mut total = HandleManyEventsResult::default(); for event in events { @@ -660,7 +661,7 @@ impl TimelineInner

{ } pub(super) async fn set_fully_read_event(&self, fully_read_event_id: OwnedEventId) { - self.state.lock().await.set_fully_read_event(fully_read_event_id) + self.state.write().await.set_fully_read_event(fully_read_event_id) } #[cfg(feature = "e2e-encryption")] @@ -691,7 +692,7 @@ impl TimelineInner

{ ) { use super::EncryptedMessage; - let mut state = self.state.clone().lock_owned().await; + let mut state = self.state.clone().write_owned().await; let should_retry = move |session_id: &str| { if let Some(session_ids) = &session_ids { @@ -821,7 +822,7 @@ impl TimelineInner

{ } async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails) { - self.state.lock().await.items.for_each(|mut entry| { + self.state.write().await.items.for_each(|mut entry| { let Some(event_item) = entry.as_event() else { return }; if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) { let new_item = entry.with_kind(TimelineItemKind::Event( @@ -835,7 +836,7 @@ impl TimelineInner

{ pub(super) async fn update_sender_profiles(&self) { trace!("Updating sender profiles"); - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let mut entries = state.items.entries(); while let Some(mut entry) = entries.next() { let Some(event_item) = entry.as_event() else { continue }; @@ -875,7 +876,7 @@ impl TimelineInner

{ #[cfg(test)] pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) { let own_user_id = self.room_data_provider.own_user_id(); - self.state.lock().await.handle_explicit_read_receipts(receipt_event_content, own_user_id); + self.state.write().await.handle_explicit_read_receipts(receipt_event_content, own_user_id); } } @@ -914,7 +915,7 @@ impl TimelineInner { &self, event_id: &EventId, ) -> Result<(), super::Error> { - let state = self.state.lock().await; + let state = self.state.write().await; let (index, item) = rfind_event_by_id(&state.items, event_id) .ok_or(super::Error::RemoteEventNotInTimeline)?; let remote_item = item.as_remote().ok_or(super::Error::RemoteEventNotInTimeline)?.clone(); @@ -949,7 +950,7 @@ impl TimelineInner { // We need to be sure to have the latest position of the event as it might have // changed while waiting for the request. - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id) .ok_or(super::Error::RemoteEventNotInTimeline)?; @@ -985,7 +986,7 @@ impl TimelineInner { &self, user_id: &UserId, ) -> Option<(OwnedEventId, Receipt)> { - let state = self.state.lock().await; + let state = self.state.read().await; let room = self.room(); state.latest_user_read_receipt(user_id, room).await @@ -1006,7 +1007,7 @@ impl TimelineInner { } let own_user_id = self.room().own_user_id(); - let state = self.state.lock().await; + let state = self.state.read().await; let room = self.room(); match receipt_type { @@ -1060,7 +1061,7 @@ pub(super) struct HandleManyEventsResult { } async fn fetch_replied_to_event( - mut state: TimelineInnerStateLockGuard<'_>, + mut state: TimelineInnerStateWriteGuard<'_>, index: usize, item: &EventTimelineItem, message: &Message, diff --git a/crates/matrix-sdk-ui/src/timeline/inner/state.rs b/crates/matrix-sdk-ui/src/timeline/inner/state.rs index 41a930ee859..75a198cf524 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/state.rs @@ -34,7 +34,7 @@ use ruma::{ MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId, UserId, }; -use tokio::sync::{Mutex, MutexGuard, OwnedMutexGuard}; +use tokio::sync::{OwnedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, error, instrument, trace, warn}; use super::{ReactionState, TimelineInnerSettings}; @@ -58,29 +58,33 @@ use crate::{ #[derive(Clone)] pub(in crate::timeline) struct TimelineInnerStateLock { - inner: Arc>, + inner: Arc>, lock_release_ob: SharedObservable<()>, } impl TimelineInnerStateLock { pub(super) fn new(state: TimelineInnerState) -> Self { - Self { inner: Arc::new(Mutex::new(state)), lock_release_ob: Default::default() } + Self { inner: Arc::new(RwLock::new(state)), lock_release_ob: Default::default() } } pub(super) fn subscribe_lock_release(&self) -> Subscriber<()> { self.lock_release_ob.subscribe() } - pub async fn lock(&self) -> TimelineInnerStateLockGuard<'_> { - TimelineInnerStateLockGuard { - inner: self.inner.lock().await, + pub async fn read(&self) -> RwLockReadGuard<'_, TimelineInnerState> { + self.inner.read().await + } + + pub async fn write(&self) -> TimelineInnerStateWriteGuard<'_> { + TimelineInnerStateWriteGuard { + inner: self.inner.write().await, lock_release_ob: self.lock_release_ob.clone(), } } - pub async fn lock_owned(&self) -> TimelineInnerStateOwnedLockGuard { - TimelineInnerStateOwnedLockGuard { - inner: self.inner.clone().lock_owned().await, + pub async fn write_owned(&self) -> TimelineInnerStateOwnedWriteGuard { + TimelineInnerStateOwnedWriteGuard { + inner: self.inner.clone().write_owned().await, lock_release_ob: self.lock_release_ob.clone(), } } @@ -477,12 +481,12 @@ impl TimelineInnerState { } } -pub(in crate::timeline) struct TimelineInnerStateLockGuard<'a> { - inner: MutexGuard<'a, TimelineInnerState>, +pub(in crate::timeline) struct TimelineInnerStateWriteGuard<'a> { + inner: RwLockWriteGuard<'a, TimelineInnerState>, lock_release_ob: SharedObservable<()>, } -impl Deref for TimelineInnerStateLockGuard<'_> { +impl Deref for TimelineInnerStateWriteGuard<'_> { type Target = TimelineInnerState; fn deref(&self) -> &Self::Target { @@ -490,24 +494,24 @@ impl Deref for TimelineInnerStateLockGuard<'_> { } } -impl DerefMut for TimelineInnerStateLockGuard<'_> { +impl DerefMut for TimelineInnerStateWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } -impl Drop for TimelineInnerStateLockGuard<'_> { +impl Drop for TimelineInnerStateWriteGuard<'_> { fn drop(&mut self) { self.lock_release_ob.set(()); } } -pub(in crate::timeline) struct TimelineInnerStateOwnedLockGuard { - inner: OwnedMutexGuard, +pub(in crate::timeline) struct TimelineInnerStateOwnedWriteGuard { + inner: OwnedRwLockWriteGuard, lock_release_ob: SharedObservable<()>, } -impl Deref for TimelineInnerStateOwnedLockGuard { +impl Deref for TimelineInnerStateOwnedWriteGuard { type Target = TimelineInnerState; fn deref(&self) -> &Self::Target { @@ -515,13 +519,13 @@ impl Deref for TimelineInnerStateOwnedLockGuard { } } -impl DerefMut for TimelineInnerStateOwnedLockGuard { +impl DerefMut for TimelineInnerStateOwnedWriteGuard { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } -impl Drop for TimelineInnerStateOwnedLockGuard { +impl Drop for TimelineInnerStateOwnedWriteGuard { fn drop(&mut self) { self.lock_release_ob.set(()); } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/subscribe.rs b/crates/matrix-sdk-ui/tests/integration/timeline/subscribe.rs index 680da116723..9ef0d2b6702 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/subscribe.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/subscribe.rs @@ -48,16 +48,10 @@ async fn batched() { let hdl = tokio::spawn(async move { pin_mut!(timeline_stream); + let next_batch = timeline_stream.next().await.unwrap(); - // One `VectorDiff::Reset`… - assert_eq!(next_batch.len(), 1); - assert_matches!(&next_batch[0], VectorDiff::Reset { values } => { - // … which is empty. - assert_eq!(values.len(), 0); - }); - let next_batch = timeline_stream.next().await.unwrap(); - // One day divider, and three event items. - assert_eq!(next_batch.len(), 4); + // One `VectorDiff::Reset` + one day divider + three event items. + assert_eq!(next_batch.len(), 5); }); ev_builder.add_joined_room(